Add amended version of 4.3 with performance reporting and some fixes.

This commit is contained in:
Danny Staple 2023-01-22 22:56:12 +00:00
parent 172daae295
commit dd3fd206dc
12 changed files with 3716 additions and 0 deletions

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,74 @@
import asyncio
import json
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.widgets import Button
from robot_ble_connection import BleConnection
class RobotDisplay:
def __init__(self):
self.ble_connection = BleConnection(self.handle_data)
self.buffer = ""
self.arena = {}
self.closed = False
self.fig, self.axes = plt.subplots()
self.poses = None
def handle_close(self, _):
self.closed = True
def handle_data(self, data):
self.buffer += data.decode()
# print(f"Received raw data: {data}")
while "\n" in self.buffer:
line, self.buffer = self.buffer.split("\n", 1)
print(f"Received data: {line}")
try:
message = json.loads(line)
except ValueError:
print("Error parsing JSON")
return
if "arena" in message:
self.arena = message
if "poses" in message:
self.poses = np.array(message["poses"], dtype=np.int16)
def draw(self):
self.axes.clear()
if self.arena:
for line in self.arena["arena"]:
self.axes.plot(
[line[0][0], line[1][0]], [line[0][1], line[1][1]], color="black"
)
if self.poses is not None:
self.axes.scatter(self.poses[:,0], self.poses[:,1], color="blue")
async def send_command(self, command):
request = (json.dumps({"command": command}) ).encode()
print(f"Sending request: {request}")
await self.ble_connection.send_uart_data(request)
def start(self, _):
self.button_task = asyncio.create_task(self.send_command("start"))
async def main(self):
plt.ion()
await self.ble_connection.connect()
try:
await self.send_command("arena")
self.fig.canvas.mpl_connect("close_event", self.handle_close)
start_button = Button(plt.axes([0.7, 0.05, 0.1, 0.075]), "Start")
start_button.on_clicked(self.start)
while not self.closed:
self.draw()
plt.draw()
plt.pause(0.05)
await asyncio.sleep(0.01)
finally:
await self.ble_connection.close()
robot_display = RobotDisplay()
asyncio.run(robot_display.main())

2572
ch-13/4.3-monte-carlo_perf/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,22 @@
[tool.poetry]
name = "modelling-space"
version = "0.1.0"
description = ""
authors = ["Danny Staple <danny@orionrobots.co.uk>"]
readme = "README.md"
packages = [{include = "modelling_space"}]
[tool.poetry.dependencies]
python = "^3.9"
matplotlib = "3.6.1"
numpy = "1.23.4"
bleak = "0.19.0"
jupyter = "^1.0.0"
[tool.poetry.group.dev.dependencies]
black = "^22.10.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@ -0,0 +1,3 @@
matplotlib==3.6.1
numpy==1.23.4
bleak==0.19.0

View File

@ -0,0 +1,97 @@
"""Represent the lines of the arena"""
try:
from ulab import numpy as np
except ImportError:
import numpy as np
width = 1500
height = 1500
cutout_width = 500
cutout_height = 500
boundary_lines = [
[(0,0), (0, height)],
[(0, height), (width, height)],
[(width, height), (width, cutout_height)],
[(width, cutout_height), (width - cutout_width, cutout_height)],
[(width - cutout_width, cutout_height), (width - cutout_width, 0)],
[(width - cutout_width, 0), (0, 0)],
]
def contains(x, y):
"""Return True if the point is inside the arena.
if the point is inside the rectangle, but not inside the cutout, it's inside the arena.
"""
# is it inside the rectangle?
if x < 0 or x > width \
or y < 0 or y > height:
return False
# is it inside the cutout?
if x > (width - cutout_width) and y < cutout_height:
return False
return True
grid_cell_size = 50
overscan = 10 # 10 each way
def get_distance_to_segment(x, y, segment):
"""Return the distance from the point to the segment.
Segment -> ((x1, y1), (x2, y2))
All segments are horizontal or vertical.
"""
x1, y1 = segment[0]
x2, y2 = segment[1]
# if the segment is horizontal, the point will be closest to the y value of the segment
if y1 == y2 and x >= min(x1, x2) and x <= max(x1, x2):
return abs(y - y1)
# if the segment is vertical, the point will be closest to the x value of the segment
if x1 == x2 and y >= min(y1, y2) and y <= max(y1, y2):
return abs(x - x1)
# the point will be closest to one of the end points
return np.sqrt(
min(
(x - x1) ** 2 + (y - y1) ** 2,
(x - x2) ** 2 + (y - y2) ** 2
)
)
def get_distance_likelihood(x, y):
"""Return the distance from the point to the nearest segment as a decay function."""
min_distance = None
for segment in boundary_lines:
distance = get_distance_to_segment(x, y, segment)
if min_distance is None or distance < min_distance:
min_distance = distance
return 1.0 / (1 + min_distance/250) ** 2
# beam endpoint model
def make_distance_grid():
"""Take the boundary lines. With and overscan of 10 cells, and grid cell size of 5cm (50mm),
make a grid of the distance to the nearest boundary line.
"""
grid = np.zeros((
width // grid_cell_size + 2 * overscan,
height // grid_cell_size + 2 * overscan
), dtype=np.float)
for x in range(grid.shape[0]):
column_x = x * grid_cell_size - (overscan * grid_cell_size)
for y in range(grid.shape[1]):
row_y = y * grid_cell_size - (overscan * grid_cell_size)
grid[x, y] = get_distance_likelihood(
column_x, row_y
)
return grid
distance_grid = make_distance_grid()
def get_distance_grid_at_point(x, y):
"""Return the distance grid value at the given point."""
grid_x = int(x // grid_cell_size + overscan)
grid_y = int(y // grid_cell_size + overscan)
if grid_x < 0 or grid_x >= distance_grid.shape[0] or grid_y < 0 or grid_y >= distance_grid.shape[1]:
return 0
return distance_grid[grid_x, grid_y]

View File

@ -0,0 +1,290 @@
import asyncio
import json
import random
from ulab import numpy as np
import arena
import robot
import traceback
from performance_counter import PerformanceCounter
class DistanceSensorTracker:
def __init__(self):
robot.left_distance.distance_mode = 2
robot.right_distance.distance_mode = 2
self.left = 300
self.right = 300
async def main(self):
robot.left_distance.start_ranging()
robot.right_distance.start_ranging()
while True:
if robot.left_distance.data_ready and robot.left_distance.distance:
self.left = robot.left_distance.distance * 10 # convert to mm
robot.left_distance.clear_interrupt()
if robot.right_distance.data_ready and robot.right_distance.distance:
self.right = robot.right_distance.distance * 10
robot.right_distance.clear_interrupt()
await asyncio.sleep(0.01)
class CollisionAvoid:
def __init__(self, distance_sensors):
self.speed = 0.6
self.distance_sensors = distance_sensors
async def main(self):
try:
while True:
robot.set_right(self.speed)
while self.distance_sensors.left < 300 or \
self.distance_sensors.right < 300:
robot.set_left(-self.speed)
await asyncio.sleep(0.3)
robot.set_left(self.speed)
await asyncio.sleep(0)
except Exception as e:
send_json({"exception": traceback.format_exception(e)})
raise
def get_random_sample(mean, scale):
return mean + (random.uniform(-scale, scale) + random.uniform(-scale, scale)) / 2
def send_json(data):
robot.uart.write((json.dumps(data) + "\n").encode())
def read_json():
try:
data = robot.uart.readline()
decoded = data.decode()
return json.loads(decoded)
except (UnicodeError, ValueError):
print("Invalid data")
return None
def send_poses(samples):
print(f"sending {len(samples)} poses")
send_json({
"poses": np.array(samples[:,:2], dtype=np.int16).tolist(),
})
class Simulation:
def __init__(self):
self.population_size = 200
self.poses = np.array(
[(
int(random.uniform(0, arena.width)),
int(random.uniform(0, arena.height)),
int(random.uniform(0, 360))) for _ in range(self.population_size)],
dtype=np.float,
)
self.distance_sensors = DistanceSensorTracker()
self.collision_avoider = CollisionAvoid(self.distance_sensors)
self.last_encoder_left = robot.left_encoder.read()
self.last_encoder_right = robot.right_encoder.read()
self.alpha_rot = 0.05
self.alpha_rot_trans = 0.01
self.alpha_trans = 0.05
self.alpha_trans_rot = 0.01
# profiling
self.pc_resample = PerformanceCounter()
self.pc_odometry = PerformanceCounter()
self.pc_move_poses = PerformanceCounter() # superset of odometry time
self.pc_randomise_motion = PerformanceCounter()
self.pc_motion_model = PerformanceCounter()
self.pc_observe_distance_sensors = PerformanceCounter()
self.pc_observation_model = PerformanceCounter()
def resample(self, weights, sample_count):
"""Return sample_count number of samples from the
poses, based on the weights array.
Uses low variance resampling"""
self.pc_resample.start()
samples = np.zeros((sample_count, 3))
interval = 1 / sample_count
shift = random.uniform(0, interval)
cumulative_weights = weights[0]
source_index = 0
for current_index in range(sample_count):
weight_index = shift + current_index * interval
while weight_index >= cumulative_weights:
source_index += 1
source_index = min(len(weights), source_index)
cumulative_weights += weights[source_index]
samples[current_index] = self.poses[source_index]
self.pc_resample.stop()
return samples
def convert_odometry_to_motion(self, left_encoder_delta, right_encoder_delta):
"""
left_encoder is the change in the left encoder
right_encoder is the change in the right encoder
returns rot1, trans, rot2
rot1 is the rotation of the robot in degrees before the translation
trans is the distance the robot has moved in mm
rot2 is the rotation of the robot in degrees
"""
self.pc_odometry.start()
left_mm = left_encoder_delta * robot.ticks_to_mm
right_mm = right_encoder_delta * robot.ticks_to_mm
if left_mm == right_mm:
return 0, left_mm, 0
# calculate the radius of the arc
radius = (robot.wheelbase_mm / 2) * (left_mm + right_mm) / (right_mm - left_mm)
## angle = difference in steps / wheelbase
d_theta = (right_mm - left_mm) / robot.wheelbase_mm
# For a small enough motion, assume that the chord length = arc length
arc_length = d_theta * radius
rot1 = np.degrees(d_theta/2)
rot2 = rot1
self.pc_odometry.stop()
return rot1, arc_length, rot2
def move_poses(self, rot1, trans, rot2):
self.pc_move_poses.start()
self.poses[:,2] += rot1
rot1_radians = np.radians(self.poses[:,2])
self.poses[:,0] += trans * np.cos(rot1_radians)
self.poses[:,1] += trans * np.sin(rot1_radians)
self.poses[:,2] += rot2
self.poses[:,2] = np.array([float(theta % 360) for theta in self.poses[:,2]])
self.pc_move_poses.stop()
def randomise_motion(self, rot1, trans, rot2):
self.pc_randomise_motion.start()
rot1_scale = self.alpha_rot * abs(rot1) + self.alpha_rot_trans * abs(trans)
trans_scale = self.alpha_trans * abs(trans) + self.alpha_trans_rot * (abs(rot1) + abs(rot2))
rot2_scale = self.alpha_rot * abs(rot2) + self.alpha_rot_trans * abs(trans)
rot1_model = np.array([get_random_sample(rot1, rot1_scale) for _ in range(self.poses.shape[0])])
trans_model = np.array([get_random_sample(trans, trans_scale) for _ in range(self.poses.shape[0])])
rot2_model = np.array([get_random_sample(rot2, rot2_scale) for _ in range(self.poses.shape[0])])
self.pc_randomise_motion.stop()
return rot1_model, trans_model, rot2_model
def motion_model(self):
"""Apply the motion model"""
self.pc_motion_model.start()
new_encoder_left = robot.left_encoder.read()
new_encoder_right = robot.right_encoder.read()
rot1, trans, rot2 = self.convert_odometry_to_motion(
new_encoder_left - self.last_encoder_left,
new_encoder_right - self.last_encoder_right)
self.last_encoder_left = new_encoder_left
self.last_encoder_right = new_encoder_right
rot1_model, trans_model, rot2_model = self.randomise_motion(rot1, trans, rot2)
self.move_poses(rot1_model, trans_model, rot2_model)
#
# print(
# json.dumps(
# [self.poses.tolist(), rot1, trans, rot2]
# )
# )
self.pc_motion_model.stop()
def observe_distance_sensors(self, weights):
self.pc_observe_distance_sensors.start()
# Sensor triangle left
opposite = self.distance_sensors.left + robot.dist_forward_mm
adjacent = robot.dist_side_mm
left_angle = np.atan(opposite / adjacent)
left_hypotenuse = np.sqrt(opposite**2 + adjacent**2)
# Sensor triangle right
opposite = self.distance_sensors.right + robot.dist_forward_mm
adjacent = robot.dist_side_mm
right_angle = np.atan(opposite / adjacent)
right_hypotenuse = np.sqrt(opposite**2 + adjacent**2)
# modify the current weights based on the distance sensors
left_sensor = np.zeros((self.poses.shape[0], 2), dtype=np.float)
right_sensor = np.zeros((self.poses.shape[0], 2), dtype=np.float)
# left sensor
poses_left_angle = np.radians(self.poses[:, 2]) + left_angle
left_sensor[:, 0] = self.poses[:, 0] + np.cos(poses_left_angle) * left_hypotenuse
left_sensor[:, 1] = self.poses[:, 1] + np.sin(poses_left_angle) * left_hypotenuse
# right sensor
poses_right_angle = np.radians(self.poses[:, 2]) - right_angle
right_sensor[:, 0] = self.poses[:, 0] + np.cos(poses_right_angle) * right_hypotenuse
right_sensor[:, 1] = self.poses[:, 1] + np.sin(poses_right_angle) * right_hypotenuse
# Look up the distance in the arena
for index in range(self.poses.shape[0]):
sensor_weight = arena.get_distance_grid_at_point(left_sensor[index,0], left_sensor[index,1])
sensor_weight += arena.get_distance_grid_at_point(right_sensor[index,0], right_sensor[index,1])
weights[index] *= sensor_weight
self.pc_observe_distance_sensors.stop()
return weights
def observation_model(self):
self.pc_observation_model.start()
weights = np.ones(self.poses.shape[0], dtype=np.float)
for index, pose in enumerate(self.poses):
if not arena.contains(pose[:1], pose[:2]):
weights[index] = 0.01
weights = self.observe_distance_sensors(weights)
weights = weights / np.sum(weights)
self.pc_observation_model.stop()
return weights
def print_pc_lines(self):
if self.pc_odometry.count % 10 != 0:
return
print("Resample: ", self.pc_resample.performance_line())
print("Odometry: ", self.pc_odometry.performance_line())
print("Move Poses: ", self.pc_move_poses.performance_line())
print("Randomise Motion: ", self.pc_randomise_motion.performance_line())
print("Motion Model: ", self.pc_motion_model.performance_line())
print("Observe Distance Sensors: ", self.pc_observe_distance_sensors.performance_line())
print("Observation Model: ", self.pc_observation_model.performance_line())
async def main(self):
asyncio.create_task(self.distance_sensors.main())
collision_avoider = asyncio.create_task(self.collision_avoider.main())
try:
while True:
weights = self.observation_model()
print("Weights: ", weights)
send_poses(self.resample(weights, 20))
self.poses = self.resample(weights, self.population_size)
await asyncio.sleep(0.05)
self.motion_model()
self.print_pc_lines()
except Exception as e:
send_json({"exception": traceback.format_exception(e)})
raise
finally:
collision_avoider.cancel()
robot.stop()
async def command_handler(simulation):
print("Starting handler")
simulation_task = None
while True:
if robot.uart.in_waiting:
request = read_json()
if not request:
continue
print("Received: ", request)
if request["command"] == "arena":
send_json({
"arena": arena.boundary_lines,
})
elif request["command"] == "start":
if not simulation_task:
simulation_task = asyncio.create_task(simulation.main())
await asyncio.sleep(0.1)
simulation = Simulation()
asyncio.run(command_handler(simulation))

View File

@ -0,0 +1,85 @@
import time
class PerformanceCounter:
def __init__(self):
self.count = 0
self.total_time = 0
self.start_time = 0
def start(self):
self.start_time = time.monotonic()
def stop(self):
end = time.monotonic()
self.total_time += (end - self.start_time)
self.count += 1
def per_call(self):
return self.total_time / self.count
def total_call_time(self):
return self.total_time
def call_count(self):
return self.count
def performance_line(self):
return f"Calls: {self.count}, Total time: {self.total_time} s, Per call: {self.per_call()}"
# class PerformanceCounterDecorator:
# def __init__(self, fn):
# self.count = 0
# self.total_time = 0
# self.fn = fn
# def __call__(self, *args, **kwargs):
# start = time.monotonic()
# try:
# print(f"Calling method with {args}, {kwargs}")
# return self.fn(*args, **kwargs)
# finally:
# end = time.monotonic()
# self.total_time += (end - start)
# self.count += 1
# def per_call(self):
# return self.total_time / self.count
# def total_call_time(self):
# return self.total_time
# def call_count(self):
# return self.count
# def performance_line(self):
# return f"Calls: {self.count}, Total time: {self.total_time} s, Per call: {self.per_call()}"
# @PerformanceCounter
# def slow_printer(stuff):
# print(f"Hello, {stuff}")
# time.sleep(0.05)
# return "hello"
# class SomeClass:
# def __init__(self, stuff):
# self.stuff = stuff
# @PerformanceCounter
# def test(self, more_stuff):
# print(self.stuff, more_stuff)
# return 5
# result = slow_printer("first_thing")
# print(f"result is {result}")
# # for n in range(1000):
# # slow_printer("first_thing")
# # slow_printer("second_thing")
# print(slow_printer.performance_line())
# some_instance = SomeClass("foo")
# result = some_instance.test("bar")
# print(result)
# print(SomeClass.test.performance_line())

View File

@ -0,0 +1,27 @@
class PIDController:
def __init__(self, kp, ki, kd, d_filter_gain=0.1, imax=None, imin=None):
self.kp = kp
self.ki = ki
self.kd = kd
self.d_filter_gain = d_filter_gain
self.imax = imax
self.imin = imin
self.reset()
def reset(self):
self.integral = 0
self.error_prev = 0
self.derivative = 0
def calculate(self, error, dt):
self.integral += error * dt
if self.imax is not None and self.integral > self.imax:
self.integral = self.imax
if self.imin is not None and self.integral < self.imin:
self.integral = self.imin
# Add a low pass filter to the difference
difference = (error - self.error_prev) * self.d_filter_gain
self.error_prev += difference
self.derivative = difference / dt
return self.kp * error + self.ki * self.integral + self.kd * self.derivative

View File

@ -0,0 +1,84 @@
import rp2pio
import adafruit_pioasm
import array
import asyncio
program = """
; use the osr for count
; input pins c1 c2
set y, 0 ; clear y
mov osr, y ; and clear osr
read:
; x will be the old value
; y the new values
mov x, y ; store old Y in x
in null, 32 ; Clear ISR - using y
in pins, 2 ; read two pins into y
mov y, isr
jmp x!=y, different ; Jump if its different
jmp read ; otherwise loop back to read
different:
; x has old value, y has new.
; extract the upper bit of X.
in x, 31 ; get bit 31 - old p1 (remember which direction it came in)
in null, 31 ; keep only 1 bit
mov x, isr ; put this back in x
jmp !x, c1_old_zero
c1_old_not_zero:
jmp pin, count_up
jmp count_down
c1_old_zero:
jmp pin, count_down
; fall through
count_up:
; for a clockwise move - we'll add 1 by inverting
mov x, ~ osr ; store inverted OSR on x
jmp x--, fake ; use jump to take off 1
fake:
mov x, ~ x ; invert back
jmp send
count_down:
; for a clockwise move, just take one off
mov x, osr ; store osr in x
jmp x--, send ; dec and send
send:
; send x.
mov isr, x ; send it
push noblock ; put ISR into input FIFO
mov osr, x ; put X back in OSR
jmp read ; loop back
"""
assembled = adafruit_pioasm.assemble(program)
class QuadratureEncoder:
def __init__(self, first_pin, second_pin, reversed=False):
"""Encoder with 2 pins. Must use sequential pins on the board"""
self.sm = rp2pio.StateMachine(
assembled,
frequency=0,
first_in_pin=first_pin,
jmp_pin=second_pin,
in_pin_count=2,
)
self.reversed = reversed
self._buffer = array.array("i", [0])
asyncio.create_task(self.poll_loop())
async def poll_loop(self):
while True:
await asyncio.sleep(0)
while self.sm.in_waiting:
self.sm.readinto(self._buffer)
def read(self):
if self.reversed:
return -self._buffer[0]
else:
return self._buffer[0]

View File

@ -0,0 +1,80 @@
import board
import pwmio
import pio_encoder
import busio
import adafruit_vl53l1x
import math
import adafruit_bno055
uart = busio.UART(board.GP12, board.GP13, baudrate=9600, timeout=0.1)
wheel_diameter_mm = 69.5
wheel_circumference_mm = math.pi * wheel_diameter_mm
gear_ratio = 298
encoder_poles = 28
ticks_per_revolution = encoder_poles * gear_ratio
ticks_to_mm = wheel_circumference_mm / ticks_per_revolution
ticks_to_m = ticks_to_mm / 1000
m_to_ticks = 1 / ticks_to_m
wheelbase_mm = 170
dist_side_mm = 37 # approx mm
dist_forward_mm = 66 # approx mm
motor_A2 = pwmio.PWMOut(board.GP17, frequency=100)
motor_A1 = pwmio.PWMOut(board.GP16, frequency=100)
motor_B2 = pwmio.PWMOut(board.GP18, frequency=100)
motor_B1 = pwmio.PWMOut(board.GP19, frequency=100)
right_motor = motor_A1, motor_A2
left_motor = motor_B1, motor_B2
right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21)
left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27, reversed=True)
i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1)
i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3)
left_distance = adafruit_vl53l1x.VL53L1X(i2c0)
right_distance = adafruit_vl53l1x.VL53L1X(i2c1)
imu = adafruit_bno055.BNO055_I2C(i2c0)
def stop():
motor_A1.duty_cycle = 0
motor_A2.duty_cycle = 0
motor_B1.duty_cycle = 0
motor_B2.duty_cycle = 0
def set_speed(motor, speed):
# Swap motor pins if we reverse the speed
if abs(speed) < 0.1:
motor[0].duty_cycle = 0
motor[1].duty_cycle = 1
return
if speed < 0:
direction = motor[1], motor[0]
speed = -speed
else:
direction = motor
speed = min(speed, 1) # limit to 1.0
max_speed = 2 ** 16 - 1
direction[0].duty_cycle = int(max_speed * speed)
direction[1].duty_cycle = 0
def set_left(speed):
set_speed(left_motor, speed)
def set_right(speed):
set_speed(right_motor, speed)
def send_line(message):
uart.write(f"{message}\n".encode())
def check_imu_status():
sys_status, gyro, accel, mag = imu.calibration_status
send_line(f"Sys: {sys_status}, Gyro: {gyro}, Accel: {accel}, Mag: {mag}")
return sys_status == 3

View File

@ -0,0 +1,38 @@
import asyncio
import bleak
class BleConnection:
# See https://learn.adafruit.com/introducing-adafruit-ble-bluetooth-low-energy-friend/uart-service
ble_uuid = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
rx_gatt = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
tx_gatt = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
ble_name = "Adafruit Bluefruit LE"
def __init__(self, receive_handler):
self.ble_client = None
self.receive_handler = receive_handler
def _uart_handler(self, _, data: bytes):
self.receive_handler(data)
async def connect(self):
print("Scanning for devices...")
devices = await bleak.BleakScanner.discover(service_uuids=[self.ble_uuid])
print(f"Found {len(devices)} devices")
print([device.name for device in devices])
ble_device_info = [device for device in devices if device.name==self.ble_name][0]
print(f"Connecting to {ble_device_info.name}...")
self.ble_client = bleak.BleakClient(ble_device_info.address)
await self.ble_client.connect()
print("Connected to {}".format(ble_device_info.name))
self.notify_task = asyncio.create_task(
self.ble_client.start_notify(self.rx_gatt, self._uart_handler)
)
async def close(self):
await self.ble_client.disconnect()
async def send_uart_data(self, data):
await self.ble_client.write_gatt_char(self.tx_gatt, data)