From 66738da6d9433be998c21379595174ffa1884947 Mon Sep 17 00:00:00 2001 From: Danny Staple Date: Sun, 18 Dec 2022 15:06:45 +0000 Subject: [PATCH] Resampling and angle calculation fixes. --- .../computer/display_with_motion_graph.py | 19 ++---- ch-13/full-version/robot/code.py | 63 ++++++++++--------- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/ch-13/full-version/computer/display_with_motion_graph.py b/ch-13/full-version/computer/display_with_motion_graph.py index 1c5464c..6486518 100644 --- a/ch-13/full-version/computer/display_with_motion_graph.py +++ b/ch-13/full-version/computer/display_with_motion_graph.py @@ -13,8 +13,8 @@ class RobotDisplay: self.line = "" self.arena = {} self.display_closed = False - self.fig, (self.ax1, self.ax2) = plt.subplots(nrows=2) - self.poses = np.zeros([200, 3], dtype=np.int16) + self.fig, self.ax = plt.subplots() + self.poses = np.zeros([200, 2], dtype=np.int16) self.motion = np.zeros([200, 3], dtype=float) def handle_close(self, _): @@ -35,24 +35,17 @@ class RobotDisplay: if "arena" in message: self.arena = message if "poses" in message: - print(message) incoming_poses = np.array(message["poses"], dtype=np.int16) - self.poses[message["offset"]: message["offset"] + incoming_poses.shape[0]] = incoming_poses - if "motion" in message: - self.motion = np.roll(self.motion, 1, axis=0) - self.motion[0] = [message["motion"]["rot1"], message["motion"]["trans"], message["motion"]["rot2"]] - print(self.motion[1]) + self.poses = incoming_poses def draw(self): - self.ax1.clear() + self.ax.clear() if self.arena: for line in self.arena["arena"]: - self.ax1.plot( + self.ax.plot( [line[0][0], line[1][0]], [line[0][1], line[1][1]], color="black" ) - self.ax1.scatter(self.poses[:,0], self.poses[:,1], color="blue") - self.ax2.clear() - self.ax2.plot(np.arange(200), self.motion) + self.ax.scatter(self.poses[:,0], self.poses[:,1], color="blue") async def send_command(self, command): diff --git a/ch-13/full-version/robot/code.py b/ch-13/full-version/robot/code.py index cc4ae85..0caabdd 100644 --- a/ch-13/full-version/robot/code.py +++ b/ch-13/full-version/robot/code.py @@ -31,12 +31,28 @@ def get_triangular_sample(mean, standard_deviation): return mean + base +def convert_to_standard_position(true_bearing): + standard_position = 90 - true_bearing + if standard_position > 180: + standard_position -= 360 + elif standard_position < -180: + standard_position += 360 + return standard_position + + def send_json(data): robot.uart.write((json.dumps(data) + "\n").encode()) + +def send_poses(samples): + send_json({ + "poses": samples[:,:2].tolist(), + }) + + class Simulation: def __init__(self): - self.population_size = 100 + self.population_size = 300 self.left_distance = 100 self.right_distance = 100 self.imu_mix = 0.3 * 0.5 @@ -86,39 +102,36 @@ class Simulation: await asyncio.sleep(0) # weighted poses a numpy array of weights for each pose - weights = np.empty(self.poses.shape[0], dtype=np.float) + weights = np.zeros(self.poses.shape[0], dtype=np.float) for index in range(self.poses.shape[0]): # remove any that are outside the arena if not arena.point_is_inside_arena(self.poses[index,0], self.poses[index,1]): weights[index] = 0 - print("pose outside arena") continue # difference between this distance and the distance sensed is the error # weight is the inverse of the error weights[index] = arena.get_distance_grid_at_point(distance_sensor_left[index,0], distance_sensor_left[index,1]) - weights[index] += arena.get_distance_grid_at_point(distance_sensor_left[index,0], distance_sensor_left[index,1]) + weights[index] += arena.get_distance_grid_at_point(distance_sensor_right[index,0], distance_sensor_right[index,1]) await asyncio.sleep(0) #normalise the weights weights = weights / np.sum(weights) return weights - def resample(self, weights): - # based on the weights, resample the poses - # weights is a numpy array of weights - # resample is a numpy array of indices into the poses array + def resample(self, weights, sample_count): + """Return sample_count number of samples from the + poses, based on the weights array. + Uses low variance resampling""" samples = [] - # use low variance resampling - start = random.uniform(0, 1 / self.population_size) + start = random.uniform(0, 1 / sample_count) cumulative_weights = weights[0] source_index = 0 - for current_index in range(self.population_size): - sample_index = start + current_index / self.population_size - while sample_index > cumulative_weights: + for current_index in range(sample_count): + weight_index = start + current_index / sample_count + while weight_index > cumulative_weights: source_index += 1 cumulative_weights += weights[source_index] samples.append(source_index) - # set poses to the resampled poses - self.poses = np.array([self.poses[n] for n in samples]) + return np.array([self.poses[n] for n in samples]) def convert_odometry_to_motion(self, left_encoder_delta, right_encoder_delta): # convert odometry to motion @@ -139,6 +152,7 @@ class Simulation: # calculate the ICC radius = (robot.wheelbase_mm / 2) * (left_mm + right_mm) / (right_mm - left_mm) + ## arc length/radius = angle theta = (right_mm - left_mm) / robot.wheelbase_mm # For a small enough motion, assume that the chord length = arc length arc_length = theta * radius @@ -156,11 +170,6 @@ class Simulation: rot1, trans, rot2 = self.convert_odometry_to_motion( new_encoder_left - self.last_encoder_left, new_encoder_right - self.last_encoder_right) - send_json({"motion": { - "rot1": rot1, - "trans": trans, - "rot2": rot2 - }}) self.last_encoder_left = new_encoder_left self.last_encoder_right = new_encoder_right try: @@ -170,6 +179,8 @@ class Simulation: if new_heading: heading_change = self.last_heading - new_heading self.last_heading = new_heading + # convert heading from true bearing to standard position + heading_change = convert_to_standard_position(heading_change) # blend with the encoder heading changes rot1 = rot1 * self.encoder_mix + heading_change * self.imu_mix rot2 = rot2 * self.encoder_mix + heading_change * self.imu_mix @@ -181,8 +192,8 @@ class Simulation: rot2_model = np.array([get_triangular_sample(rot2, self.rotation_standard_dev) for _ in range(self.poses.shape[0])]) self.poses[:,2] += rot1_model rot1_radians = np.radians(self.poses[:,2]) - self.poses[:,0] += trans_model * np.sin(rot1_radians) - self.poses[:,1] += trans_model * np.cos(rot1_radians) + self.poses[:,0] += trans_model * np.cos(rot1_radians) + self.poses[:,1] += trans_model * np.sin(rot1_radians) self.poses[:,2] += rot2_model self.poses[:,2] = np.vectorize(lambda n: float(n % 360))(self.poses[:,2]) self.poses = np.array(self.poses, dtype=np.int16) @@ -214,13 +225,13 @@ class Simulation: try: while True: weights = await self.apply_sensor_model() + send_poses(self.resample(weights, 20)) self.poses = self.resample(weights, self.population_size) await self.motion_model() finally: robot.stop() - def read_command(): data = robot.uart.readline() try: @@ -253,12 +264,6 @@ async def updater(simulation): } } ) - for n in range(0, simulation.poses.shape[0], 10): - send_json({ - "poses": simulation.poses[n:n+10].tolist(), - "offset": n, - }) - await asyncio.sleep(0.01) await asyncio.sleep(0.5)