diff --git a/ch-13/5-eliminating-poses-numpy-way/robot/code.py b/ch-13/5-eliminating-poses-numpy-way/robot/code.py deleted file mode 100644 index 0a26152..0000000 --- a/ch-13/5-eliminating-poses-numpy-way/robot/code.py +++ /dev/null @@ -1,204 +0,0 @@ -import asyncio -import json -from guassian import get_gaussian_sample -from ulab import numpy as np - -import arena -import robot - - -class Simulation: - def __init__(self): - self.population_size = 50 - - # poses can be a list of x[pop size], y[pop size], heading[pop size] - # while less "pythonic" it is more "numpyish" - self.poses = np.empty((3, 0), dtype=np.float) - self.regenerate_poses() - - def regenerate_poses(self): - # determine the number - number_to_make = self.population_size - len(self.poses[0]) - new_poses = np.empty((3, number_to_make), dtype = np.float) - # determine the mean and std for new poses - if len(self.poses[0]) > 0: - mean = np.mean(self.poses, 0) - std = np.std(self.poses, 0) - else: - mean = (arena.width/2, arena.height/2, 180) - std = (arena.width/4, arena.height/4, 180) - # generate new poses - print(f"Generating {number_to_make} new poses.") - for n in range(number_to_make): - new_poses[0, n] = get_gaussian_sample(mean[0], std[0]) - new_poses[1, n] = get_gaussian_sample(mean[1], std[1]) - new_poses[2, n] = get_gaussian_sample(mean[2], std[2]) - # set poses to concatenation of new poses and remaining poses - self.poses = np.concatenate((self.poses, new_poses), axis=1) - -# hmm - cannot expand, or resize np arrays. -# maybe we use normal py arrays apart from the mean/std bit? -# a real numpy whizz may have a better way. - - async def move_robot(self): - starting_heading = robot.imu.euler[0] - encoder_left = robot.left_encoder.read() - encoder_right = robot.right_encoder.read() - robot.set_left(1) - robot.set_right(0.5) - await asyncio.sleep(0.1) - # record sensor changes - left_movement = robot.left_encoder.read() - encoder_left - right_movement = robot.right_encoder.read() - encoder_right - speed_in_mm = robot.ticks_to_m * ((left_movement + right_movement) / 2) * 1000 - new_heading = robot.imu.euler[0] - heading_change = starting_heading - new_heading - - # move poses - radians = np.radians(self.poses[2]) - self.poses[0] += speed_in_mm * np.cos(radians) - self.poses[1] += speed_in_mm * np.sin(radians) - self.poses[2] += np.full(self.poses[2].shape, heading_change) - self.poses[2] = np.vectorize(lambda n: n % 360)(self.poses[2]) - -# ``` -# >>> first = np.array([[1, 2, 3, 4], [5,6,7,8]]) -# >>> second = np.array([3, 6, 9, 12]) -# >>> first + second -# array([[ 4, 8, 12, 16], -# [ 8, 12, 16, 20]]) -# >>> second = np.array([[3, 6, 9, 12], [1,1,1,1]]) -# >>> first + second -# array([[ 4, 8, 12, 16], -# [ 6, 7, 8, 9]]) -# >>> test=np.arange(10) -# >>> test -# array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int16) -# >>> mask = np.empty(len(test), dtype=np.bool) -# >>> mask -# array([False, False, False, False, False, False, False, False, False], dtype=bool) -# >>> mask[3] = [True] -# >>> mask[5] = [True] -# >>> mask[6] = [True] -# >>> test[mask] -# array([4, 6, 7], dtype=int16) -# ``` - def eliminate_poses(self, left_distance, right_distance): - poses_to_keep = np.empty(len(self.poses[0]), dtype=np.bool) - # first eliminate those outside the arena - for position, pose in enumerate(self.poses.transpose()): - # first those outside the arena - poses_to_keep[position] = [arena.point_is_inside_arena(pose[0], pose[1])] - # apply the keep as a mask to the poses using ulab. - self.poses = self.poses.transpose()[poses_to_keep].transpose() - - # Then deal with sensors - # todo: would reorganising these pose arrays let us better use ulab tools? - distance_sensors = np.array((4, len(self.poses[0])), np.float) - # sensors - they are facing forward, either side of the robot. Project them out to the sides - distance_sensors[0] = self.poses[0] + robot.distance_sensor_from_middle * np.cos(np.radians(self.poses[2] + 90)) - distance_sensors[1] = self.poses[1] + robot.distance_sensor_from_middle * np.sin(np.radians(self.poses[2] + 90)) - # sensor right - distance_sensors[2] = self.poses[0] + robot.distance_sensor_from_middle * np.cos(np.radians(self.poses[2] - 90)) - distance_sensors[3] = self.poses[1] + robot.distance_sensor_from_middle * np.sin(np.radians(self.poses[2] - 90)) - # now project these sensors forward based on their distance read - distance_sensors[0] += np.cos(np.radians(self.poses[2])) * left_distance - distance_sensors[1] += np.sin(np.radians(self.poses[2])) * left_distance - distance_sensors[2] += np.cos(np.radians(self.poses[2])) * right_distance - distance_sensors[3] += np.sin(np.radians(self.poses[2])) * right_distance - # now eliminate those sensors that are too far outside the boundary - # extension (extra layer) - those too far inside the boundary - # extension (if needed) - make the boundary fuzzier - 20cm error? - - for position, distance_sensor in enumerate(distance_sensors.transpose()): - poses_to_keep[position] = [arena.point_is_inside_arena(distance_sensor[0], distance_sensor[1]) and arena.point_is_inside_arena(distance_sensor[2], distance_sensor[3])] - # apply the keep as a mask to the poses using ulab. - self.poses = self.poses.T[poses_to_keep].transpose() - -# steps: -# regenerate poses -# move robot -# eliminate poses -# send poses - - async def run(self): - try: - robot.left_distance.start_ranging() - robot.right_distance.start_ranging() - for _ in range(15): - self.regenerate_poses() - await self.move_robot() - if robot.left_distance.data_ready and robot.right_distance.data_ready: - left_distance = robot.left_distance.distance * 10 # convert to mm - right_distance = robot.right_distance.distance * 10 - self.eliminate_poses(left_distance, right_distance) - robot.left_distance.clear_interrupt() - robot.right_distance.clear_interrupt() - finally: - robot.stop() - -def send_json(data): - robot.uart.write((json.dumps(data)+"\n").encode()) - -def read_command(): - data = robot.uart.readline() - try: - decoded = data.decode() - except UnicodeError: - print("UnicodeError decoding :") - print(data) - return None - try: - request = json.loads(decoded) - except ValueError: - print("ValueError reading json from:") - print(decoded) - return None - return request - -async def updater(simulation): - print("starting updater") - while True: - sys_status, gyro, accel, mag = robot.imu.calibration_status - if sys_status != 3: - send_json({"imu_calibration": { - "gyro": gyro, - "accel": accel, - "mag": mag, - "sys": sys_status, - }}) - send_json({ - "poses": simulation.poses.transpose().tolist(), - }) - await asyncio.sleep(0.5) - -async def command_handler(simulation): - update_task = asyncio.create_task(updater(simulation)) - print("Starting handler") - simulation_task = None - while True: - if robot.uart.in_waiting: - print("Receiving data...") - request = read_command() - if not request: - print("no request") - continue - # {"command": "arena"} - if request["command"] == "arena": - send_json({ - "arena": arena.boundary_lines, - "target_zone": arena.target_zone, - }) - elif request["command"] == "start": - print("Starting simulation") - if simulation_task is None or simulation_task.done(): - simulation_task = asyncio.create_task(simulation.run()) - elif request["command"] == "stop": - robot.stop() - if simulation_task and not simulation_task.done(): - simulation_task.cancel() - simulation_task = None - await asyncio.sleep(0.1) - -simulation= Simulation() -asyncio.run(command_handler(simulation)) diff --git a/ch-13/5-eliminating-poses-numpy-way/computer/display_from_robot.py b/ch-13/7-regenerating-full-numpy/computer/display_from_robot.py similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/computer/display_from_robot.py rename to ch-13/7-regenerating-full-numpy/computer/display_from_robot.py diff --git a/ch-13/5-eliminating-poses-numpy-way/computer/find_devices.py b/ch-13/7-regenerating-full-numpy/computer/find_devices.py similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/computer/find_devices.py rename to ch-13/7-regenerating-full-numpy/computer/find_devices.py diff --git a/ch-13/5-eliminating-poses-numpy-way/computer/poetry.lock b/ch-13/7-regenerating-full-numpy/computer/poetry.lock similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/computer/poetry.lock rename to ch-13/7-regenerating-full-numpy/computer/poetry.lock diff --git a/ch-13/5-eliminating-poses-numpy-way/computer/pyproject.toml b/ch-13/7-regenerating-full-numpy/computer/pyproject.toml similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/computer/pyproject.toml rename to ch-13/7-regenerating-full-numpy/computer/pyproject.toml diff --git a/ch-13/5-eliminating-poses-numpy-way/computer/requirements.txt b/ch-13/7-regenerating-full-numpy/computer/requirements.txt similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/computer/requirements.txt rename to ch-13/7-regenerating-full-numpy/computer/requirements.txt diff --git a/ch-13/5-eliminating-poses-numpy-way/computer/robot_ble_connection.py b/ch-13/7-regenerating-full-numpy/computer/robot_ble_connection.py similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/computer/robot_ble_connection.py rename to ch-13/7-regenerating-full-numpy/computer/robot_ble_connection.py diff --git a/ch-13/5-eliminating-poses-numpy-way/robot/arena.py b/ch-13/7-regenerating-full-numpy/robot/arena.py similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/robot/arena.py rename to ch-13/7-regenerating-full-numpy/robot/arena.py diff --git a/ch-13/7-regenerating-full-numpy/robot/code.py b/ch-13/7-regenerating-full-numpy/robot/code.py new file mode 100644 index 0000000..53b6065 --- /dev/null +++ b/ch-13/7-regenerating-full-numpy/robot/code.py @@ -0,0 +1,261 @@ +import asyncio +import json +from guassian import get_gaussian_sample +from ulab import numpy as np + +import arena +import robot + + +class Simulation: + def __init__(self): + self.population_size = 50 + self.left_distance = 100 + self.right_distance = 100 + + # poses can be a list of x[pop size], y[pop size], heading[pop size] + # while less "pythonic" it is more "numpyish" + self.poses = np.empty((3, 0), dtype=np.float) + self.regenerate_poses() + + def regenerate_poses(self): + # determine the number + number_to_make = self.population_size - len(self.poses[0]) + new_poses = np.empty((3, number_to_make), dtype=np.float) + # determine the mean and std for new poses + if len(self.poses[0]) > 0: + mean = np.mean(self.poses, 0) + std = np.std(self.poses, 0) + else: + mean = np.array((arena.width / 2, arena.height / 2, 180)) + std = np.array((arena.width / 4, arena.height / 4, 180)) + print("mean.shape :", mean.shape) + print("std.shape :", std.shape) + + # generate new poses + print(f"Generating {number_to_make} new poses.") + for n in range(number_to_make): + new_poses[0, n] = get_gaussian_sample(mean[0], std[0]) + new_poses[1, n] = get_gaussian_sample(mean[1], std[1]) + new_poses[2, n] = get_gaussian_sample(mean[2], std[2]) + # set poses to concatenation of new poses and remaining poses + self.poses = np.concatenate((self.poses, new_poses), axis=1) + + # hmm - cannot expand, or resize np arrays. + # maybe we use normal py arrays apart from the mean/std bit? + # a real numpy whizz may have a better way. + + async def move_robot(self): + starting_heading = robot.imu.euler[0] + encoder_left = robot.left_encoder.read() + encoder_right = robot.right_encoder.read() + robot.set_left(1) + robot.set_right(0.5) + await asyncio.sleep(0.1) + # record sensor changes + left_movement = robot.left_encoder.read() - encoder_left + right_movement = robot.right_encoder.read() - encoder_right + speed_in_mm = robot.ticks_to_m * ((left_movement + right_movement) / 2) * 1000 + new_heading = robot.imu.euler[0] + heading_change = starting_heading - new_heading + + # move poses + radians = np.radians(self.poses[2]) + self.poses[0] += speed_in_mm * np.cos(radians) + self.poses[1] += speed_in_mm * np.sin(radians) + self.poses[2] += np.full(self.poses[2].shape, heading_change) + self.poses[2] = np.vectorize(lambda n: n % 360)(self.poses[2]) + + # ``` + # >>> first = np.array([[1, 2, 3, 4], [5,6,7,8]]) + # >>> second = np.array([3, 6, 9, 12]) + # >>> first + second + # array([[ 4, 8, 12, 16], + # [ 8, 12, 16, 20]]) + # >>> second = np.array([[3, 6, 9, 12], [1,1,1,1]]) + # >>> first + second + # array([[ 4, 8, 12, 16], + # [ 6, 7, 8, 9]]) + # >>> test=np.arange(10) + # >>> test + # array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int16) + # >>> mask = np.empty(len(test), dtype=np.bool) + # >>> mask + # array([False, False, False, False, False, False, False, False, False], dtype=bool) + # >>> mask[3] = [True] + # >>> mask[5] = [True] + # >>> mask[6] = [True] + # >>> test[mask] + # array([4, 6, 7], dtype=int16) + # ``` + + def eliminate_poses(self, left_distance, right_distance): + poses_to_keep = np.empty(len(self.poses[0]), dtype=np.bool) + # first eliminate those outside the arena + for position, pose in enumerate(self.poses.transpose()): + # first those outside the arena + poses_to_keep[position] = [arena.point_is_inside_arena(pose[0], pose[1])] + # apply the keep as a mask to the poses using ulab. + self.poses = np.array( + [ + self.poses[0][poses_to_keep], + self.poses[1][poses_to_keep], + self.poses[2][poses_to_keep], + ] + ) + + # Then deal with sensors + # todo: would reorganising these pose arrays let us better use ulab tools? + distance_sensors = np.empty((4, len(self.poses[0])), dtype=np.float) + # sensors - they are facing forward, either side of the robot. Project them out to the sides + distance_sensors[0] = self.poses[ + 0 + ] + robot.distance_sensor_from_middle * np.cos(np.radians(self.poses[2] + 90)) + distance_sensors[1] = self.poses[ + 1 + ] + robot.distance_sensor_from_middle * np.sin(np.radians(self.poses[2] + 90)) + # sensor right + distance_sensors[2] = self.poses[ + 0 + ] + robot.distance_sensor_from_middle * np.cos(np.radians(self.poses[2] - 90)) + distance_sensors[3] = self.poses[ + 1 + ] + robot.distance_sensor_from_middle * np.sin(np.radians(self.poses[2] - 90)) + # now project these sensors forward based on their distance read + distance_sensors[0] += np.cos(np.radians(self.poses[2])) * left_distance + distance_sensors[1] += np.sin(np.radians(self.poses[2])) * left_distance + distance_sensors[2] += np.cos(np.radians(self.poses[2])) * right_distance + distance_sensors[3] += np.sin(np.radians(self.poses[2])) * right_distance + # now eliminate those sensors that are too far outside the boundary + # extension (extra layer) - those too far inside the boundary + # extension (if needed) - make the boundary fuzzier - 20cm error? + # poses to keep must be redefined - it's now shorter + poses_to_keep = np.empty(len(self.poses[0]), dtype=np.bool) + for position, distance_sensor in enumerate(distance_sensors.transpose()): + poses_to_keep[position] = [ + arena.point_is_inside_arena(distance_sensor[0], distance_sensor[1]) + and arena.point_is_inside_arena(distance_sensor[2], distance_sensor[3]) + ] + # apply the keep as a mask to the poses using ulab. + self.poses = np.array( + [ + self.poses[0][poses_to_keep], + self.poses[1][poses_to_keep], + self.poses[2][poses_to_keep], + ] + ) + print(self.poses.shape) + + # Helpful note - when debugging this numpy code, use prints with the serial console + # printing the object shape is often a very helpful way to debug what is going on. + # Eg: + # print("poses_to_keep.shape:", poses_to_keep.shape) + # print("self.poses[0].shape:", self.poses[0].shape) + + # steps: + # regenerate poses + # move robot + # eliminate poses + # send poses + async def distance_sensor_updater(self): + robot.left_distance.start_ranging() + robot.right_distance.start_ranging() + while True: + if robot.left_distance.data_ready: + self.left_distance = robot.left_distance.distance * 10 # convert to mm + robot.left_distance.clear_interrupt() + if robot.right_distance.data_ready: + self.right_distance = robot.right_distance.distance * 10 + robot.right_distance.clear_interrupt() + await asyncio.sleep_ms(10) + + async def run(self): + asyncio.create_task(self.distance_sensor_updater()) + try: + for _ in range(15): + self.regenerate_poses() + await self.move_robot() + self.eliminate_poses(self.left_distance, self.right_distance) + finally: + robot.stop() + + +def send_json(data): + robot.uart.write((json.dumps(data) + "\n").encode()) + + +def read_command(): + data = robot.uart.readline() + try: + decoded = data.decode() + except UnicodeError: + print("UnicodeError decoding :") + print(data) + return None + try: + request = json.loads(decoded) + except ValueError: + print("ValueError reading json from:") + print(decoded) + return None + return request + + +async def updater(simulation): + print("starting updater") + while True: + sys_status, gyro, accel, mag = robot.imu.calibration_status + if sys_status != 3: + send_json( + { + "imu_calibration": { + "gyro": gyro, + "accel": accel, + "mag": mag, + "sys": sys_status, + } + } + ) + send_json( + { + "poses": simulation.poses.transpose().tolist(), + } + ) + await asyncio.sleep(0.5) + + +async def command_handler(simulation): + update_task = asyncio.create_task(updater(simulation)) + print("Starting handler") + simulation_task = None + # This line - helpful to debug - rapid iteration on connected robot. + simulation_task = asyncio.create_task(simulation.run()) + while True: + if robot.uart.in_waiting: + print("Receiving data...") + request = read_command() + if not request: + print("no request") + continue + # {"command": "arena"} + if request["command"] == "arena": + send_json( + { + "arena": arena.boundary_lines, + "target_zone": arena.target_zone, + } + ) + elif request["command"] == "start": + print("Starting simulation") + if simulation_task is None or simulation_task.done(): + simulation_task = asyncio.create_task(simulation.run()) + elif request["command"] == "stop": + robot.stop() + if simulation_task and not simulation_task.done(): + simulation_task.cancel() + simulation_task = None + await asyncio.sleep(0.1) + + +simulation = Simulation() +asyncio.run(command_handler(simulation)) diff --git a/ch-13/5-eliminating-poses-numpy-way/robot/guassian.py b/ch-13/7-regenerating-full-numpy/robot/guassian.py similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/robot/guassian.py rename to ch-13/7-regenerating-full-numpy/robot/guassian.py diff --git a/ch-13/5-eliminating-poses-numpy-way/robot/pid_controller.py b/ch-13/7-regenerating-full-numpy/robot/pid_controller.py similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/robot/pid_controller.py rename to ch-13/7-regenerating-full-numpy/robot/pid_controller.py diff --git a/ch-13/5-eliminating-poses-numpy-way/robot/pio_encoder.py b/ch-13/7-regenerating-full-numpy/robot/pio_encoder.py similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/robot/pio_encoder.py rename to ch-13/7-regenerating-full-numpy/robot/pio_encoder.py diff --git a/ch-13/5-eliminating-poses-numpy-way/robot/robot.py b/ch-13/7-regenerating-full-numpy/robot/robot.py similarity index 100% rename from ch-13/5-eliminating-poses-numpy-way/robot/robot.py rename to ch-13/7-regenerating-full-numpy/robot/robot.py