Displaying a pose written up.
This commit is contained in:
parent
752c298f8d
commit
c14c83e286
@ -9,20 +9,19 @@ from robot_ble_connection import BleConnection
|
||||
class RobotDisplay:
|
||||
def __init__(self):
|
||||
self.ble_connection = BleConnection(self.handle_data)
|
||||
self.line = ""
|
||||
self.buffer = ""
|
||||
self.arena = {}
|
||||
self.display_closed = False
|
||||
self.closed = False
|
||||
self.fig, self.ax = plt.subplots()
|
||||
self.pose_coords = []
|
||||
self.pose_uv = []
|
||||
self.poses = None
|
||||
|
||||
def handle_close(self, _):
|
||||
self.display_closed = True
|
||||
self.closed = True
|
||||
|
||||
def handle_data(self, data):
|
||||
self.line += data.decode("utf-8")
|
||||
while "\n" in self.line:
|
||||
line, self.line = self.line.split("\n", 1)
|
||||
self.buffer += data.decode("utf-8")
|
||||
while "\n" in self.buffer:
|
||||
line, self.buffer = self.buffer.split("\n", 1)
|
||||
print(f"Received data: {line}")
|
||||
try:
|
||||
message = json.loads(line)
|
||||
@ -32,12 +31,7 @@ class RobotDisplay:
|
||||
if "arena" in message:
|
||||
self.arena = message
|
||||
if "poses" in message:
|
||||
# the robot poses are an array of [x, y, theta] arrays.
|
||||
# matplotlib quiver plots wants an [x] ,[y] and [angle] arrays
|
||||
poses = np.array(message["poses"]).T
|
||||
self.pose_coords = poses[:2]
|
||||
angle_rads = np.deg2rad(poses[2])
|
||||
self.pose_uv = np.array([np.cos(angle_rads), np.sin(angle_rads)])
|
||||
self.poses = np.array(message["poses"], dtype=np.int16)
|
||||
|
||||
def draw(self):
|
||||
self.ax.clear()
|
||||
@ -46,12 +40,8 @@ class RobotDisplay:
|
||||
self.ax.plot(
|
||||
[line[0][0], line[1][0]], [line[0][1], line[1][1]], color="black"
|
||||
)
|
||||
for line in self.arena["target_zone"]:
|
||||
self.ax.plot(
|
||||
[line[0][0], line[1][0]], [line[0][1], line[1][1]], color="red"
|
||||
)
|
||||
if len(self.pose_coords) > 0:
|
||||
self.ax.quiver(self.pose_coords[0], self.pose_coords[1], self.pose_uv[0], self.pose_uv[1], color="blue")
|
||||
if self.poses is not None:
|
||||
self.ax.scatter(self.poses[:,0], self.poses[:,1], color="blue")
|
||||
|
||||
async def main(self):
|
||||
plt.ion()
|
||||
@ -62,7 +52,7 @@ class RobotDisplay:
|
||||
await self.ble_connection.send_uart_data(request)
|
||||
self.fig.canvas.mpl_connect("close_event", self.handle_close)
|
||||
|
||||
while not self.display_closed:
|
||||
while not self.closed:
|
||||
self.draw()
|
||||
plt.draw()
|
||||
plt.pause(0.05)
|
||||
@ -1,5 +1,8 @@
|
||||
"""Represent the lines and target zone of the arena"""
|
||||
|
||||
"""Represent the lines of the arena"""
|
||||
try:
|
||||
from ulab import numpy as np
|
||||
except ImportError:
|
||||
import numpy as np
|
||||
|
||||
boundary_lines = [
|
||||
[(0,0), (0, 1500)],
|
||||
@ -10,13 +13,65 @@ boundary_lines = [
|
||||
[(1000, 0), (0, 0)],
|
||||
]
|
||||
|
||||
|
||||
target_zone = [
|
||||
[(1100, 900), (1100, 1100)],
|
||||
[(1100, 1100), (1250, 1100)],
|
||||
[(1250, 1100), (1250, 900)],
|
||||
[(1250, 900), (1100, 900)],
|
||||
]
|
||||
|
||||
width = 1500
|
||||
height = 1500
|
||||
|
||||
|
||||
grid_cell_size = 50
|
||||
overscan = 10 # 10 each way
|
||||
|
||||
|
||||
def get_distance_to_segment(x, y, segment):
|
||||
"""Return the distance from the point to the segment.
|
||||
Segment -> ((x1, y1), (x2, y2))
|
||||
All segments are horizontal or vertical.
|
||||
"""
|
||||
x1, y1 = segment[0]
|
||||
x2, y2 = segment[1]
|
||||
# if the segment is horizontal, the point will be closest to the y value of the segment
|
||||
if y1 == y2 and x >= min(x1, x2) and x <= max(x1, x2):
|
||||
return abs(y - y1)
|
||||
# if the segment is vertical, the point will be closest to the x value of the segment
|
||||
if x1 == x2 and y >= min(y1, y2) and y <= max(y1, y2):
|
||||
return abs(x - x1)
|
||||
# the point will be closest to one of the end points
|
||||
return np.sqrt(
|
||||
min(
|
||||
(x - x1) ** 2 + (y - y1) ** 2,
|
||||
(x - x2) ** 2 + (y - y2) ** 2
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def get_distance_likelihood(x, y):
|
||||
"""Return the distance from the point to the nearest segment as a decay function."""
|
||||
min_distance = None
|
||||
for segment in boundary_lines:
|
||||
distance = get_distance_to_segment(x, y, segment)
|
||||
if min_distance is None or distance < min_distance:
|
||||
min_distance = distance
|
||||
return 1.0 / (1 + min_distance/250) ** 2
|
||||
|
||||
|
||||
grid_cell_size = 50
|
||||
overscan = 10 # 10 each way
|
||||
|
||||
# beam endpoint model
|
||||
def make_distance_grid():
|
||||
"""Take the boundary lines. With and overscan of 10 cells, and grid cell size of 5cm (50mm),
|
||||
make a grid of the distance to the nearest boundary line.
|
||||
"""
|
||||
grid = np.zeros((
|
||||
width // grid_cell_size + 2 * overscan,
|
||||
height // grid_cell_size + 2 * overscan
|
||||
), dtype=np.float)
|
||||
for x in range(grid.shape[0]):
|
||||
column_x = x * grid_cell_size - (overscan * grid_cell_size)
|
||||
for y in range(grid.shape[1]):
|
||||
row_y = y * grid_cell_size - (overscan * grid_cell_size)
|
||||
grid[x, y] = get_distance_likelihood(
|
||||
column_x, row_y
|
||||
)
|
||||
return grid
|
||||
|
||||
distance_grid = make_distance_grid()
|
||||
|
||||
@ -6,37 +6,51 @@ from ulab import numpy as np
|
||||
import arena
|
||||
import robot
|
||||
|
||||
class Simulation:
|
||||
def __init__(self):
|
||||
population_size = 10
|
||||
self.poses = np.empty((population_size, 3), dtype=np.float)
|
||||
for n in range(population_size):
|
||||
self.poses[n] = random.uniform(0, arena.width), random.uniform(0, arena.height), random.uniform(0, 360)
|
||||
|
||||
def send_json(data):
|
||||
robot.uart.write((json.dumps(data)+"\n").encode())
|
||||
robot.uart.write((json.dumps(data) + "\n").encode())
|
||||
|
||||
def read_json():
|
||||
try:
|
||||
data = robot.uart.readline()
|
||||
decoded = data.decode()
|
||||
return json.loads(decoded)
|
||||
except (UnicodeError, ValueError):
|
||||
print("Invalid data")
|
||||
return None
|
||||
|
||||
|
||||
class Simulation:
|
||||
def __init__(self):
|
||||
self.population_size = 20
|
||||
self.poses = np.array(
|
||||
[(
|
||||
int(random.uniform(0, arena.width)),
|
||||
int(random.uniform(0, arena.height)),
|
||||
int(random.uniform(0, 360))) for _ in range(self.population_size)],
|
||||
dtype=np.int16,
|
||||
)
|
||||
|
||||
def send_poses(samples):
|
||||
send_json({
|
||||
"poses": samples[:,:2].tolist(),
|
||||
})
|
||||
|
||||
async def command_handler(simulation):
|
||||
while True:
|
||||
if robot.uart.in_waiting:
|
||||
print("Receiving data...")
|
||||
try:
|
||||
data = robot.uart.readline().decode()
|
||||
request = json.loads(data)
|
||||
except (UnicodeError, ValueError):
|
||||
print("Invalid data")
|
||||
continue
|
||||
# {"command": "arena"}
|
||||
if request["command"] == "arena":
|
||||
send_json({
|
||||
"arena": arena.boundary_lines,
|
||||
"target_zone": arena.target_zone,
|
||||
})
|
||||
else:
|
||||
send_json({
|
||||
"poses": simulation.poses.tolist(),
|
||||
})
|
||||
await asyncio.sleep(0.1)
|
||||
print("Starting handler")
|
||||
while True:
|
||||
if robot.uart.in_waiting:
|
||||
request = read_json()
|
||||
if not request:
|
||||
continue
|
||||
print("Received: ", request)
|
||||
if request["command"] == "arena":
|
||||
send_json({
|
||||
"arena": arena.boundary_lines,
|
||||
})
|
||||
send_poses(simulation.poses)
|
||||
|
||||
simulation= Simulation()
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
|
||||
simulation = Simulation()
|
||||
asyncio.run(command_handler(simulation))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user