diff --git a/ch-13/full-version/computer/display_from_robot.py b/ch-13/full-version/computer/display_from_robot.py index 4533a8f..0e5cd37 100644 --- a/ch-13/full-version/computer/display_from_robot.py +++ b/ch-13/full-version/computer/display_from_robot.py @@ -15,6 +15,7 @@ class RobotDisplay: self.display_closed = False self.fig, self.ax = plt.subplots() self.poses = np.zeros([200, 3], dtype=np.int16) + self.motion = np.zeros([200, 3], dtype=float) def handle_close(self, _): self.display_closed = True @@ -39,6 +40,9 @@ class RobotDisplay: print("Incoming poses shape", incoming_poses.shape) print("Existing poses shape", self.poses.shape) self.poses[message["offset"]: message["offset"] + incoming_poses.shape[0]] = incoming_poses + if "motion" in message: + np.roll(self.motion, 1, axis=0) + self.motion[0] = [message["motion"]["rot1"], message["motion"]["trans"], message["motion"]["rot2"]] def draw(self): self.ax.clear() diff --git a/ch-13/full-version/computer/display_with_motion_graph.py b/ch-13/full-version/computer/display_with_motion_graph.py new file mode 100644 index 0000000..1c5464c --- /dev/null +++ b/ch-13/full-version/computer/display_with_motion_graph.py @@ -0,0 +1,90 @@ +import asyncio +import json +import numpy as np +import matplotlib.pyplot as plt +from matplotlib.widgets import Button + +from robot_ble_connection import BleConnection + + +class RobotDisplay: + def __init__(self): + self.ble_connection = BleConnection(self.handle_data) + self.line = "" + self.arena = {} + self.display_closed = False + self.fig, (self.ax1, self.ax2) = plt.subplots(nrows=2) + self.poses = np.zeros([200, 3], dtype=np.int16) + self.motion = np.zeros([200, 3], dtype=float) + + def handle_close(self, _): + self.display_closed = True + + def handle_data(self, data): + self.line += data.decode("utf-8") + # print(f"Received data: {data.decode('utf-8')}") + # print(f"Line is now: {self.line}") + while "\n" in self.line: + line, self.line = self.line.split("\n", 1) + print(f"Received line: {line}") + try: + message = json.loads(line) + except ValueError: + print("Error parsing JSON") + return + if "arena" in message: + self.arena = message + if "poses" in message: + print(message) + incoming_poses = np.array(message["poses"], dtype=np.int16) + self.poses[message["offset"]: message["offset"] + incoming_poses.shape[0]] = incoming_poses + if "motion" in message: + self.motion = np.roll(self.motion, 1, axis=0) + self.motion[0] = [message["motion"]["rot1"], message["motion"]["trans"], message["motion"]["rot2"]] + print(self.motion[1]) + + def draw(self): + self.ax1.clear() + if self.arena: + for line in self.arena["arena"]: + self.ax1.plot( + [line[0][0], line[1][0]], [line[0][1], line[1][1]], color="black" + ) + self.ax1.scatter(self.poses[:,0], self.poses[:,1], color="blue") + self.ax2.clear() + self.ax2.plot(np.arange(200), self.motion) + + + async def send_command(self, command): + #+ "\n" - why does adding this (which sounds right) cause the ble stack (on the robot or computer? ) not to work any more? + request = (json.dumps({"command": command}) ).encode() + print(f"Sending request: {request}") + await self.ble_connection.send_uart_data(request) + + def start(self, _): + self.button_task = asyncio.create_task(self.send_command("start")) + + # def stop(self, _): + # self.button_task = asyncio.create_task(self.send_command("stop")) + + async def main(self): + plt.ion() + await self.ble_connection.connect() + try: + await self.send_command("arena") + self.fig.canvas.mpl_connect("close_event", self.handle_close) + start_button = Button(plt.axes([0.7, 0.05, 0.1, 0.075]), "Start") + start_button.on_clicked(self.start) + # stop_button = Button(plt.axes([0.81, 0.05, 0.1, 0.075]), "Stop") + # stop_button.on_clicked(self.stop) + while not self.display_closed: + self.draw() + plt.draw() + plt.pause(0.05) + await asyncio.sleep(0.01) + finally: + await self.ble_connection.close() + + +robot_display = RobotDisplay() +asyncio.run(robot_display.main()) diff --git a/ch-13/full-version/robot/code.py b/ch-13/full-version/robot/code.py index 3aa6558..cc4ae85 100644 --- a/ch-13/full-version/robot/code.py +++ b/ch-13/full-version/robot/code.py @@ -5,28 +5,35 @@ from ulab import numpy as np import arena import robot -class VaryingWallAvoid: +class CollisionAvoid: def __init__(self): self.speed = 0.6 - - def speed_from_distance(self, distance): - limited_error = min(distance, 300) * self.speed - motor_speed = limited_error / 300 - if motor_speed < 0.2: - motor_speed = -0.3 - return motor_speed + self.left_distance = 300 + self.right_distance = 300 def update(self, left_distance, right_distance): - left = self.speed_from_distance(left_distance) - right = self.speed_from_distance(right_distance) - robot.set_left(left) - robot.set_right(right) + self.left_distance = left_distance + self.right_distance = right_distance + + async def run(self): + while True: + robot.set_right(self.speed) + while self.left_distance < 300 or self.right_distance < 300: + robot.set_left(-0.6) + await asyncio.sleep(0.3) + robot.set_left(self.speed) + await asyncio.sleep(0) + triangular_proportion = np.sqrt(6) / 2 def get_triangular_sample(mean, standard_deviation): base = triangular_proportion * (random.uniform(-standard_deviation, standard_deviation) + random.uniform(-standard_deviation, standard_deviation)) return mean + base + +def send_json(data): + robot.uart.write((json.dumps(data) + "\n").encode()) + class Simulation: def __init__(self): self.population_size = 100 @@ -45,7 +52,7 @@ class Simulation: int(random.uniform(0, 360))) for _ in range(self.population_size)], dtype=np.int16, ) - self.collision_avoider = VaryingWallAvoid() + self.collision_avoider = CollisionAvoid() async def apply_sensor_model(self): # Based on vl53l1x sensor readings, create weight for each pose. @@ -84,6 +91,7 @@ class Simulation: # remove any that are outside the arena if not arena.point_is_inside_arena(self.poses[index,0], self.poses[index,1]): weights[index] = 0 + print("pose outside arena") continue # difference between this distance and the distance sensed is the error # weight is the inverse of the error @@ -140,26 +148,34 @@ class Simulation: async def motion_model(self): """move forward, apply the motion model""" - starting_heading = robot.imu.euler[0] - encoder_left = robot.left_encoder.read() - encoder_right = robot.right_encoder.read() + new_heading = robot.imu.euler[0] + new_encoder_left = robot.left_encoder.read() + new_encoder_right = robot.right_encoder.read() - await asyncio.sleep(0.01) + await asyncio.sleep(0) rot1, trans, rot2 = self.convert_odometry_to_motion( - robot.left_encoder.read() - encoder_left, - robot.right_encoder.read() - encoder_right) - + new_encoder_left - self.last_encoder_left, + new_encoder_right - self.last_encoder_right) + send_json({"motion": { + "rot1": rot1, + "trans": trans, + "rot2": rot2 + }}) + self.last_encoder_left = new_encoder_left + self.last_encoder_right = new_encoder_right try: new_heading = robot.imu.euler[0] except OSError: new_heading = None if new_heading: - heading_change = starting_heading - new_heading + heading_change = self.last_heading - new_heading + self.last_heading = new_heading # blend with the encoder heading changes rot1 = rot1 * self.encoder_mix + heading_change * self.imu_mix rot2 = rot2 * self.encoder_mix + heading_change * self.imu_mix else: print("Failed to get heading") + await asyncio.sleep(0) rot1_model = np.array([get_triangular_sample(rot1, self.rotation_standard_dev) for _ in range(self.poses.shape[0])]) trans_model = np.array([get_triangular_sample(trans, self.speed_standard_dev) for _ in range(self.poses.shape[0])]) rot2_model = np.array([get_triangular_sample(rot2, self.rotation_standard_dev) for _ in range(self.poses.shape[0])]) @@ -190,18 +206,20 @@ class Simulation: async def run(self): asyncio.create_task(self.distance_sensor_updater()) + asyncio.create_task(self.collision_avoider.run()) + self.last_heading = robot.imu.euler[0] + self.last_encoder_left = robot.left_encoder.read() + self.last_encoder_right = robot.right_encoder.read() + try: while True: weights = await self.apply_sensor_model() - self.resample(weights) + self.poses = self.resample(weights, self.population_size) await self.motion_model() finally: robot.stop() -def send_json(data): - robot.uart.write((json.dumps(data) + "\n").encode()) - def read_command(): data = robot.uart.readline()