Wall following demo
This commit is contained in:
parent
4f7f3542bf
commit
e06501811b
5
.deploy/ch-10-3-follow-wall.sh
Executable file
5
.deploy/ch-10-3-follow-wall.sh
Executable file
@ -0,0 +1,5 @@
|
||||
.deploy/send-it.sh \
|
||||
ch-10/3-wall-follow/wall_follow.py \
|
||||
ch-10/3-wall-follow/robot.py \
|
||||
ch-10/3-wall-follow/pio_encoder.py \
|
||||
ch-10/3-wall-follow/pid.py
|
||||
@ -1,45 +1,54 @@
|
||||
import robot
|
||||
import time
|
||||
import robot
|
||||
import pid
|
||||
|
||||
robot.left_distance.distance_mode = 1
|
||||
class FollowObject:
|
||||
def __init__(self):
|
||||
self.max_speed = 0.9
|
||||
self.follow_pid = pid.PID(0.1, 0.1, 0.015, 15)
|
||||
self.last_time = time.monotonic_ns()
|
||||
self.left_dist = 0
|
||||
self.pid_output = 0
|
||||
|
||||
max_speed = 0.9
|
||||
set_point = 15
|
||||
def setup_robot(self):
|
||||
robot.left_distance.distance_mode = 1
|
||||
|
||||
robot.left_distance.start_ranging()
|
||||
|
||||
follow_pid = pid.PID(0.1, 0, 0)
|
||||
last_time = time.monotonic()
|
||||
print("Starting")
|
||||
try:
|
||||
while True:
|
||||
def movement_update(self):
|
||||
# do we have data
|
||||
if robot.left_distance.data_ready:
|
||||
left_dist = robot.left_distance.distance
|
||||
|
||||
# get error value
|
||||
error_value = left_dist - set_point
|
||||
self.left_dist = robot.left_distance.distance
|
||||
|
||||
# calculate time delta
|
||||
new_time = time.monotonic()
|
||||
time_delta = new_time - last_time
|
||||
last_time = new_time
|
||||
new_time = time.monotonic_ns()
|
||||
time_delta = new_time - self.last_time
|
||||
self.last_time = new_time
|
||||
|
||||
# get speeds from pid
|
||||
speed = min(max_speed, follow_pid.update(error_value, time_delta))
|
||||
speed = max(-max_speed, speed)
|
||||
self.pid_output = self.follow_pid.update(self.left_dist, time_delta)
|
||||
speed = min(self.max_speed, self.pid_output)
|
||||
speed = max(-self.max_speed, speed)
|
||||
|
||||
# make movements
|
||||
print(f"Dist: {left_dist}, Err: {error_value}, Speed: {speed}")
|
||||
robot.set_left(speed)
|
||||
robot.set_right(speed)
|
||||
|
||||
# reset and loop
|
||||
robot.left_distance.clear_interrupt()
|
||||
time.sleep(0.1)
|
||||
|
||||
finally:
|
||||
robot.stop()
|
||||
robot.left_distance.clear_interrupt()
|
||||
robot.left_distance.stop_ranging()
|
||||
def main_loop(self):
|
||||
robot.left_distance.start_ranging()
|
||||
self.last_time = time.monotonic()
|
||||
while True:
|
||||
self.movement_update()
|
||||
|
||||
def start(self):
|
||||
print("Starting")
|
||||
try:
|
||||
self.setup_robot()
|
||||
self.main_loop()
|
||||
finally:
|
||||
robot.stop()
|
||||
robot.left_distance.clear_interrupt()
|
||||
robot.left_distance.stop_ranging()
|
||||
|
||||
FollowObject().start()
|
||||
|
||||
@ -1,17 +1,21 @@
|
||||
class PID:
|
||||
def __init__(self, proportional_k, integral_k, differential_k) -> None:
|
||||
def __init__(self, proportional_k, integral_k, differential_k, set_point):
|
||||
self.proportional_k = proportional_k
|
||||
self.integral_k = integral_k
|
||||
self.differential_k = differential_k
|
||||
self.set_point = set_point
|
||||
|
||||
self.integral = 0
|
||||
self.error_sum = 0
|
||||
self.last_value = 0
|
||||
|
||||
def update(self, error_value, time_delta):
|
||||
def update(self, measurement, time_delta):
|
||||
error_value = measurement - self.set_point
|
||||
proportional = error_value * self.proportional_k
|
||||
|
||||
self.integral += error_value * time_delta
|
||||
integral = self.integral * self.integral_k
|
||||
# calculate integral
|
||||
self.error_sum += error_value * time_delta
|
||||
integral = self.error_sum * self.integral_k
|
||||
self.last_value = error_value
|
||||
|
||||
differentiated_error = (error_value - self.last_value) / time_delta
|
||||
differential = differentiated_error * self.differential_k
|
||||
|
||||
@ -4,15 +4,15 @@ import json
|
||||
from adafruit_esp32spi import adafruit_esp32spi_wsgiserver
|
||||
from adafruit_wsgi.wsgi_app import WSGIApp
|
||||
|
||||
import pid
|
||||
import robot
|
||||
import robot_wifi
|
||||
import pid
|
||||
|
||||
|
||||
class FollowApp:
|
||||
def __init__(self) -> None:
|
||||
class FollowObject:
|
||||
def __init__(self):
|
||||
self.max_speed = 0.9
|
||||
self.follow_pid = pid.PID(0.1, 0.1, 0.015)
|
||||
self.follow_pid = pid.PID(0.1, 0.1, 0.015, 15)
|
||||
self.wifi = None
|
||||
self.server = None
|
||||
|
||||
@ -38,10 +38,9 @@ class FollowApp:
|
||||
print(f"IP Address is {ip_int}")
|
||||
|
||||
def index(self, request):
|
||||
print("Handling request")
|
||||
return 200, [('Content-Type', 'application/json')], [json.dumps(
|
||||
{
|
||||
"last_value": self.pid.last_value,
|
||||
"last_value": self.follow_pid.last_value,
|
||||
"pid_output": self.pid_output,
|
||||
"time": self.last_time
|
||||
}
|
||||
@ -59,12 +58,9 @@ class FollowApp:
|
||||
|
||||
# get speeds from pid
|
||||
self.pid_output = self.follow_pid.update(self.left_dist, time_delta)
|
||||
speed = min(self.max_speed, self.pid_output)
|
||||
speed = max(-self.max_speed, speed)
|
||||
speed = self.pid_output * self.max_speed
|
||||
|
||||
# make movements
|
||||
if abs(speed) < 0.3:
|
||||
speed = 0
|
||||
robot.set_left(speed)
|
||||
robot.set_right(speed)
|
||||
|
||||
@ -98,4 +94,4 @@ class FollowApp:
|
||||
robot.left_distance.clear_interrupt()
|
||||
robot.left_distance.stop_ranging()
|
||||
|
||||
FollowApp().start()
|
||||
FollowObject().start()
|
||||
|
||||
@ -17,7 +17,11 @@ class SensorStream:
|
||||
self.times = []
|
||||
|
||||
def sensor_stream(self, frame):
|
||||
response = requests.get(url, timeout=1)
|
||||
try:
|
||||
response = requests.get(url, timeout=1)
|
||||
except (requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError):
|
||||
print("Waiting...")
|
||||
return
|
||||
print(f"Content: {response.content}")
|
||||
print(f"status: {response.status_code}")
|
||||
|
||||
|
||||
@ -1,3 +1,6 @@
|
||||
from black import err
|
||||
|
||||
|
||||
class PID:
|
||||
def __init__(self, proportional_k, integral_k, differential_k, set_point):
|
||||
self.proportional_k = proportional_k
|
||||
@ -10,6 +13,7 @@ class PID:
|
||||
self.min_output = -1
|
||||
self.max_output = 1
|
||||
|
||||
self.dead_zone = 0.3
|
||||
|
||||
def update(self, measurement, time_delta):
|
||||
error_value = measurement - self.set_point
|
||||
@ -25,10 +29,14 @@ class PID:
|
||||
|
||||
differentiated_error = (error_value - self.last_value) / time_delta
|
||||
differential = differentiated_error * self.differential_k
|
||||
self.last_value = error_value
|
||||
|
||||
output = proportional + integral + differential
|
||||
# clamp output
|
||||
output = min(self.max_output, output)
|
||||
output = max(self.min_output, output)
|
||||
if abs(output) < self.dead_zone:
|
||||
output = 0
|
||||
else:
|
||||
output = min(self.max_output, output)
|
||||
output = max(self.min_output, output)
|
||||
|
||||
return output
|
||||
|
||||
@ -1,6 +1,4 @@
|
||||
""" Turn JSON data stream into graphs"""
|
||||
from itertools import count
|
||||
|
||||
import requests
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
@ -11,39 +9,40 @@ url = 'http://192.168.1.128'
|
||||
|
||||
class SensorStream:
|
||||
def __init__(self) -> None:
|
||||
self.index = count()
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.error_values = []
|
||||
self.sensor_values = []
|
||||
self.pid_outputs = []
|
||||
self.x_values = []
|
||||
self.times = []
|
||||
|
||||
self.set_point = requests.get(f"{url}/set_point").json()
|
||||
def sensor_stream(self, frame):
|
||||
response = requests.get(url, timeout=1)
|
||||
print(f"Content: {response.content}")
|
||||
print(f"status: {response.status_code}")
|
||||
|
||||
def sensor_stream(self):
|
||||
item = requests.get(url).json()
|
||||
print(f"Received: {item.decode('utf-8')}")
|
||||
self.x_vals.append(next(self.index))
|
||||
self.error_values.append(item['error_value'])
|
||||
self.sensor_values.append(item['sensor_value'])
|
||||
item = response.json()
|
||||
|
||||
print(f"Received: {item}")
|
||||
if self.times and item['time'] < self.times[-1]:
|
||||
self.reset()
|
||||
self.times.append(item['time'])
|
||||
self.error_values.append(item['last_value'])
|
||||
self.pid_outputs.append(item['pid_output'])
|
||||
|
||||
if len(self.x_vals) > 100:
|
||||
self.x_vals = self.x_vals[-100:]
|
||||
|
||||
if len(self.times) > 100:
|
||||
self.times = self.times[-100:]
|
||||
self.error_values = self.error_values[-100:]
|
||||
self.sensor_values = self.sensor_values[-100:]
|
||||
self.pid_outputs = self.pid_outputs[-100:]
|
||||
|
||||
plt.cla() # clear axes.
|
||||
# plot the items
|
||||
plt.plot(self.x_vals, self.error_values, label="error")
|
||||
plt.plot(self.x_vals, self.sensor_values, label="sensor")
|
||||
plt.plot(self.x_vals, self.pid_outputs, label="pid")
|
||||
plt.plot(self.times, self.error_values, label="error")
|
||||
plt.plot(self.times, self.pid_outputs, label="pid")
|
||||
|
||||
plt.legend(loc='upper right')
|
||||
|
||||
def start(self):
|
||||
plt.style.use('fivethirtyeight')
|
||||
# Create the animation. GFC - get current figure. random_stream - callback func.
|
||||
self.ani = FuncAnimation(plt.gcf(), self.sensor_stream, interval=200)
|
||||
plt.tight_layout()
|
||||
|
||||
@ -1,19 +1,39 @@
|
||||
class PID:
|
||||
def __init__(self, proportional_k, integral_k, differential_k) -> None:
|
||||
def __init__(self, proportional_k, integral_k, differential_k, set_point):
|
||||
self.proportional_k = proportional_k
|
||||
self.integral_k = integral_k
|
||||
self.differential_k = differential_k
|
||||
self.set_point = set_point
|
||||
|
||||
self.integral = 0
|
||||
self.error_sum = 0
|
||||
self.last_value = 0
|
||||
self.min_output = -1
|
||||
self.max_output = 1
|
||||
|
||||
def update(self, error_value, time_delta):
|
||||
self.dead_zone = 0.3
|
||||
|
||||
def update(self, measurement, time_delta):
|
||||
error_value = measurement - self.set_point
|
||||
proportional = error_value * self.proportional_k
|
||||
|
||||
self.integral += error_value * time_delta
|
||||
integral = self.integral * self.integral_k
|
||||
# calculate integral
|
||||
self.error_sum += error_value * time_delta
|
||||
# clamp it
|
||||
self.error_sum = min(self.max_output, self.error_sum)
|
||||
self.error_sum = max(self.min_output, self.error_sum)
|
||||
|
||||
integral = self.error_sum * self.integral_k
|
||||
|
||||
differentiated_error = (error_value - self.last_value) / time_delta
|
||||
differential = differentiated_error * self.differential_k
|
||||
self.last_value = error_value
|
||||
|
||||
return proportional + integral + differential
|
||||
output = proportional + integral + differential
|
||||
# clamp output
|
||||
if abs(output) < self.dead_zone:
|
||||
output = 0
|
||||
else:
|
||||
output = min(self.max_output, output)
|
||||
output = max(self.min_output, output)
|
||||
|
||||
return output
|
||||
|
||||
@ -10,28 +10,24 @@ import robot
|
||||
import robot_wifi
|
||||
|
||||
|
||||
app = WSGIApp()
|
||||
|
||||
class FollowWallApp:
|
||||
def __init__(self) -> None:
|
||||
self. self.speed = 0.6
|
||||
= 0.6
|
||||
self.speed = 0.6
|
||||
self.max_deflection = 0.4
|
||||
self.follow_pid = pid.PID(0.1, 0.5, 0, 15)
|
||||
self.follow_pid.dead_zone = 0.6
|
||||
|
||||
self.set_point = 15
|
||||
self.follow_pid = pid.PID(0.1, 0, 0)
|
||||
self.wifi = None
|
||||
self.server = None
|
||||
|
||||
self.last_time = 0
|
||||
self.last_time = time.monotonic_ns()
|
||||
self.left_dist = 0
|
||||
self.error_value = 0
|
||||
self.pid_output = 0
|
||||
|
||||
def setup_robot():
|
||||
def setup_robot(self):
|
||||
robot.left_distance.distance_mode = 1
|
||||
|
||||
def setup_wifi(self):
|
||||
def setup_wifi(self, app):
|
||||
print("Setting up wifi.")
|
||||
self.wifi, esp = robot_wifi.connect_to_wifi()
|
||||
self.server = adafruit_esp32spi_wsgiserver.WSGIServer(
|
||||
@ -45,43 +41,32 @@ class FollowWallApp:
|
||||
ip_int = ".".join(str(int(n)) for n in esp.ip_address)
|
||||
print(f"IP Address is {ip_int}")
|
||||
|
||||
@app.route("/")
|
||||
def index(self, request):
|
||||
return 200, [('Content-Type', 'application/json')], [json.dumps(
|
||||
{
|
||||
"error_value": self.error_value,
|
||||
"left_dist": self.left_dist,
|
||||
"last_value": self.follow_pid.last_value,
|
||||
"pid_output": self.pid_output,
|
||||
"last_time": self.last_time
|
||||
"time": self.last_time
|
||||
}
|
||||
)]
|
||||
|
||||
@app.route("/set_point")
|
||||
def set_point(self, request):
|
||||
return 200, [('Content-Type', 'application/json')], [json.dumps(self.set_point)]
|
||||
|
||||
def movement_update(self):
|
||||
# do we have data
|
||||
if robot.left_distance.data_ready:
|
||||
self.left_dist = robot.left_distance.distance
|
||||
|
||||
# get error value
|
||||
self.error_value = self.left_dist - self.set_point
|
||||
|
||||
# calculate time delta
|
||||
new_time = time.monotonic()
|
||||
new_time = time.monotonic_ns()
|
||||
time_delta = new_time - self.last_time
|
||||
self.last_time = new_time
|
||||
|
||||
# get turn from pid
|
||||
self.pid_output = self.follow_pid.update(self.error_value, time_delta)
|
||||
deflection = min(self.max_deflection, self.pid_output)
|
||||
deflection = max(-self.max_deflection, deflection)
|
||||
self.pid_output = self.follow_pid.update(self.left_dist, time_delta)
|
||||
deflection = self.pid_output * self.max_deflection
|
||||
|
||||
# make movements
|
||||
print(f"Dist: {self.left_dist}, Err: {self.error_value}, Deflection: {deflection}")
|
||||
robot.set_left(self.speed + deflection)
|
||||
robot.set_right(self.speed - deflection)
|
||||
robot.set_left(self.speed - deflection)
|
||||
robot.set_right(self.speed + deflection)
|
||||
|
||||
# reset and loop
|
||||
robot.left_distance.clear_interrupt()
|
||||
@ -93,7 +78,6 @@ class FollowWallApp:
|
||||
try:
|
||||
self.movement_update()
|
||||
self.server.update_poll()
|
||||
time.sleep(0.1)
|
||||
except RuntimeError as e:
|
||||
print(f"Server poll error: {type(e)}, {e}")
|
||||
robot.stop()
|
||||
@ -102,10 +86,12 @@ class FollowWallApp:
|
||||
print("Reset complete.")
|
||||
|
||||
def start(self):
|
||||
app = WSGIApp()
|
||||
app.route("/")(self.index)
|
||||
print("Starting")
|
||||
try:
|
||||
self.setup_robot()
|
||||
self.setup_wifi()
|
||||
self.setup_wifi(app)
|
||||
self.main_loop()
|
||||
finally:
|
||||
robot.stop()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user