Resampling and angle calculation fixes.

This commit is contained in:
Danny Staple 2022-12-18 15:06:45 +00:00
parent d896e2a80a
commit 66738da6d9
2 changed files with 40 additions and 42 deletions

View File

@ -13,8 +13,8 @@ class RobotDisplay:
self.line = ""
self.arena = {}
self.display_closed = False
self.fig, (self.ax1, self.ax2) = plt.subplots(nrows=2)
self.poses = np.zeros([200, 3], dtype=np.int16)
self.fig, self.ax = plt.subplots()
self.poses = np.zeros([200, 2], dtype=np.int16)
self.motion = np.zeros([200, 3], dtype=float)
def handle_close(self, _):
@ -35,24 +35,17 @@ class RobotDisplay:
if "arena" in message:
self.arena = message
if "poses" in message:
print(message)
incoming_poses = np.array(message["poses"], dtype=np.int16)
self.poses[message["offset"]: message["offset"] + incoming_poses.shape[0]] = incoming_poses
if "motion" in message:
self.motion = np.roll(self.motion, 1, axis=0)
self.motion[0] = [message["motion"]["rot1"], message["motion"]["trans"], message["motion"]["rot2"]]
print(self.motion[1])
self.poses = incoming_poses
def draw(self):
self.ax1.clear()
self.ax.clear()
if self.arena:
for line in self.arena["arena"]:
self.ax1.plot(
self.ax.plot(
[line[0][0], line[1][0]], [line[0][1], line[1][1]], color="black"
)
self.ax1.scatter(self.poses[:,0], self.poses[:,1], color="blue")
self.ax2.clear()
self.ax2.plot(np.arange(200), self.motion)
self.ax.scatter(self.poses[:,0], self.poses[:,1], color="blue")
async def send_command(self, command):

View File

@ -31,12 +31,28 @@ def get_triangular_sample(mean, standard_deviation):
return mean + base
def convert_to_standard_position(true_bearing):
standard_position = 90 - true_bearing
if standard_position > 180:
standard_position -= 360
elif standard_position < -180:
standard_position += 360
return standard_position
def send_json(data):
robot.uart.write((json.dumps(data) + "\n").encode())
def send_poses(samples):
send_json({
"poses": samples[:,:2].tolist(),
})
class Simulation:
def __init__(self):
self.population_size = 100
self.population_size = 300
self.left_distance = 100
self.right_distance = 100
self.imu_mix = 0.3 * 0.5
@ -86,39 +102,36 @@ class Simulation:
await asyncio.sleep(0)
# weighted poses a numpy array of weights for each pose
weights = np.empty(self.poses.shape[0], dtype=np.float)
weights = np.zeros(self.poses.shape[0], dtype=np.float)
for index in range(self.poses.shape[0]):
# remove any that are outside the arena
if not arena.point_is_inside_arena(self.poses[index,0], self.poses[index,1]):
weights[index] = 0
print("pose outside arena")
continue
# difference between this distance and the distance sensed is the error
# weight is the inverse of the error
weights[index] = arena.get_distance_grid_at_point(distance_sensor_left[index,0], distance_sensor_left[index,1])
weights[index] += arena.get_distance_grid_at_point(distance_sensor_left[index,0], distance_sensor_left[index,1])
weights[index] += arena.get_distance_grid_at_point(distance_sensor_right[index,0], distance_sensor_right[index,1])
await asyncio.sleep(0)
#normalise the weights
weights = weights / np.sum(weights)
return weights
def resample(self, weights):
# based on the weights, resample the poses
# weights is a numpy array of weights
# resample is a numpy array of indices into the poses array
def resample(self, weights, sample_count):
"""Return sample_count number of samples from the
poses, based on the weights array.
Uses low variance resampling"""
samples = []
# use low variance resampling
start = random.uniform(0, 1 / self.population_size)
start = random.uniform(0, 1 / sample_count)
cumulative_weights = weights[0]
source_index = 0
for current_index in range(self.population_size):
sample_index = start + current_index / self.population_size
while sample_index > cumulative_weights:
for current_index in range(sample_count):
weight_index = start + current_index / sample_count
while weight_index > cumulative_weights:
source_index += 1
cumulative_weights += weights[source_index]
samples.append(source_index)
# set poses to the resampled poses
self.poses = np.array([self.poses[n] for n in samples])
return np.array([self.poses[n] for n in samples])
def convert_odometry_to_motion(self, left_encoder_delta, right_encoder_delta):
# convert odometry to motion
@ -139,6 +152,7 @@ class Simulation:
# calculate the ICC
radius = (robot.wheelbase_mm / 2) * (left_mm + right_mm) / (right_mm - left_mm)
## arc length/radius = angle
theta = (right_mm - left_mm) / robot.wheelbase_mm
# For a small enough motion, assume that the chord length = arc length
arc_length = theta * radius
@ -156,11 +170,6 @@ class Simulation:
rot1, trans, rot2 = self.convert_odometry_to_motion(
new_encoder_left - self.last_encoder_left,
new_encoder_right - self.last_encoder_right)
send_json({"motion": {
"rot1": rot1,
"trans": trans,
"rot2": rot2
}})
self.last_encoder_left = new_encoder_left
self.last_encoder_right = new_encoder_right
try:
@ -170,6 +179,8 @@ class Simulation:
if new_heading:
heading_change = self.last_heading - new_heading
self.last_heading = new_heading
# convert heading from true bearing to standard position
heading_change = convert_to_standard_position(heading_change)
# blend with the encoder heading changes
rot1 = rot1 * self.encoder_mix + heading_change * self.imu_mix
rot2 = rot2 * self.encoder_mix + heading_change * self.imu_mix
@ -181,8 +192,8 @@ class Simulation:
rot2_model = np.array([get_triangular_sample(rot2, self.rotation_standard_dev) for _ in range(self.poses.shape[0])])
self.poses[:,2] += rot1_model
rot1_radians = np.radians(self.poses[:,2])
self.poses[:,0] += trans_model * np.sin(rot1_radians)
self.poses[:,1] += trans_model * np.cos(rot1_radians)
self.poses[:,0] += trans_model * np.cos(rot1_radians)
self.poses[:,1] += trans_model * np.sin(rot1_radians)
self.poses[:,2] += rot2_model
self.poses[:,2] = np.vectorize(lambda n: float(n % 360))(self.poses[:,2])
self.poses = np.array(self.poses, dtype=np.int16)
@ -214,13 +225,13 @@ class Simulation:
try:
while True:
weights = await self.apply_sensor_model()
send_poses(self.resample(weights, 20))
self.poses = self.resample(weights, self.population_size)
await self.motion_model()
finally:
robot.stop()
def read_command():
data = robot.uart.readline()
try:
@ -253,12 +264,6 @@ async def updater(simulation):
}
}
)
for n in range(0, simulation.poses.shape[0], 10):
send_json({
"poses": simulation.poses[n:n+10].tolist(),
"offset": n,
})
await asyncio.sleep(0.01)
await asyncio.sleep(0.5)