Updated speed control

This commit is contained in:
Danny Staple 2022-08-22 23:39:25 +01:00
parent c986d25bf5
commit 1b71aecddc
7 changed files with 160 additions and 188 deletions

View File

@ -0,0 +1,102 @@
import asyncio
import board
import busio
import robot
import time
import pid_controller
uart = busio.UART(board.GP12, board.GP13, baudrate=9600)
class Settings:
speed = 0.15
time_interval = 0.2
motors_enabled = False
class SpeedController:
def __init__(self, encoder, motor_fn) -> None:
self.encoder = encoder
self.motor_fn = motor_fn
self.pid = pid_controller.PIDController(1, 6, 0)
self.reset()
def reset(self):
self.last_ticks = self.encoder.read()
self.error = 0
self.control_signal = 0
self.pid.reset()
def update(self, dt):
current_ticks = self.encoder.read()
speed_in_ticks = (current_ticks - self.last_ticks) / dt
self.last_ticks = current_ticks
speed_in_m_per_s = robot.ticks_to_mm(speed_in_ticks) / 1000
# calculate the error
self.error = (Settings.speed * Settings.motors_enabled) - speed_in_m_per_s
# calculate the control signal
self.control_signal = self.pid.calculate(self.error, dt)
self.motor_fn(self.control_signal)
left = SpeedController(robot.left_encoder, robot.set_left)
right = SpeedController(robot.right_encoder, robot.set_right)
async def motor_speed_loop():
last_time = time.monotonic()
while True:
await asyncio.sleep(Settings.time_interval)
current_time = time.monotonic()
dt = current_time - last_time
left.update(dt)
right.update(dt)
last_time = current_time
uart.write(f"0, {left.error:.2f},{right.error:.2f}\n".encode())
async def stop_motors_after(seconds):
await asyncio.sleep(seconds)
Settings.motors_enabled = False
# robot.stop()
async def command_handler():
while True:
if uart.in_waiting:
command = uart.readline().decode().strip()
# PID settings
if command.startswith("P"):
left.pid.kp = float(command[1:])
right.pid.kp = float(command[1:])
elif command.startswith("I"):
left.pid.ki = float(command[1:])
left.pid.reset()
right.pid.ki = float(command[1:])
right.pid.reset()
elif command.startswith("D"):
left.pid.kd = float(command[1:])
right.pid.kd = float(command[1:])
# Speed settings
elif command.startswith("M"):
Settings.speed = float(command[1:])
elif command.startswith("T"):
Settings.time_interval = float(command[1:])
# Start/stop commands
elif command == "O":
Settings.motors_enabled = False
elif command.startswith("O"):
asyncio.create_task(stop_motors_after(float(command[1:])))
Settings.motors_enabled = True
left.reset()
right.reset()
# Print settings
elif command.startswith("?"):
uart.write(f"M{Settings.speed:.1f}\n".encode())
uart.write(f"T{Settings.time_interval:.1f}\n".encode())
uart.write(f"P{left.pid.kp:.2f}:I{left.pid.ki:.2f}:D{left.pid.kd:.2f}\n".encode())
await asyncio.sleep(3)
await asyncio.sleep(0)
try:
asyncio.create_task(motor_speed_loop())
asyncio.run(command_handler())
finally:
robot.stop()

View File

@ -0,0 +1,27 @@
class PIDController:
def __init__(self, kp, ki, kd, d_filter_gain=0.1, imax=None, imin=None):
self.kp = kp
self.ki = ki
self.kd = kd
self.d_filter_gain = d_filter_gain
self.imax = imax
self.imin = imin
self.reset()
def reset(self):
self.integral = 0
self.error_prev = 0
self.derivative = 0
def calculate(self, error, dt):
self.integral += error * dt
if self.imax is not None and self.integral > self.imax:
self.integral = self.imax
if self.imin is not None and self.integral < self.imin:
self.integral = self.imin
# Add a low pass filter to the difference
difference = (error - self.error_prev) * self.d_filter_gain
self.error_prev += difference
self.derivative = difference / dt
return self.kp * error + self.ki * self.integral + self.kd * self.derivative

View File

@ -1,6 +1,8 @@
import rp2pio
import adafruit_pioasm
import array
import asyncio
program = """
; use the osr for count
@ -67,18 +69,16 @@ class QuadratureEncoder:
)
self.reversed = reversed
self._buffer = array.array("i", [0])
self.previous_reading = 0
asyncio.create_task(self.poll_loop())
async def poll_loop(self):
while True:
await asyncio.sleep(0)
while self.sm.in_waiting:
self.sm.readinto(self._buffer)
def read(self):
while self.sm.in_waiting:
self.sm.readinto(self._buffer)
if self.reversed:
return -self._buffer[0]
else:
return self._buffer[0]
def get_speed(self, delta_time):
new_read = self.read()
distance = new_read - self.previous_reading
self.previous_reading = new_read
return distance / delta_time

View File

@ -3,18 +3,29 @@ import pwmio
import pio_encoder
import busio
import adafruit_vl53l1x
import math
wheel_diameter_mm = 70
wheel_circumference_mm = math.pi * wheel_diameter_mm
gear_ratio = 298
encoder_poles = 28
ticks_per_revolution = encoder_poles * gear_ratio
ticks_to_mm_const = wheel_circumference_mm / ticks_per_revolution
motor_A1 = pwmio.PWMOut(board.GP17)
motor_A2 = pwmio.PWMOut(board.GP16)
motor_B1 = pwmio.PWMOut(board.GP18)
motor_B2 = pwmio.PWMOut(board.GP19)
def ticks_to_mm(ticks):
return ticks_to_mm_const * ticks
motor_A2 = pwmio.PWMOut(board.GP17, frequency=100)
motor_A1 = pwmio.PWMOut(board.GP16, frequency=100)
motor_B2 = pwmio.PWMOut(board.GP18, frequency=100)
motor_B1 = pwmio.PWMOut(board.GP19, frequency=100)
right_motor = motor_A1, motor_A2
left_motor = motor_B1, motor_B2
motor_dead_zone = 0.2
right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21, reversed=True)
left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27)
right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21)
left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27, reversed=True)
i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1)
i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3)
@ -31,6 +42,11 @@ def stop():
def set_speed(motor, speed):
# stop completely if in the dead zone
if abs(speed) < motor_dead_zone:
motor[0].duty_cycle = 0
motor[1].duty_cycle = 0
return
# Swap motor pins if we reverse the speed
if speed < 0:
direction = motor[1], motor[0]

View File

@ -1,39 +0,0 @@
class PID:
def __init__(self, proportional_k, integral_k, differential_k, set_point):
self.proportional_k = proportional_k
self.integral_k = integral_k
self.differential_k = differential_k
self.set_point = set_point
self.error_sum = 0
self.last_value = 0
self.min_output = -1
self.max_output = 1
self.dead_zone = 0.3
def update(self, measurement, time_delta):
error_value = measurement - self.set_point
proportional = error_value * self.proportional_k
# calculate integral
self.error_sum += error_value * time_delta
# clamp it
self.error_sum = min(self.max_output, self.error_sum)
self.error_sum = max(self.min_output, self.error_sum)
integral = self.error_sum * self.integral_k
differentiated_error = (error_value - self.last_value) / time_delta
differential = differentiated_error * self.differential_k
self.last_value = error_value
output = proportional + integral + differential
# clamp output
if abs(output) < self.dead_zone:
output = 0
else:
output = min(self.max_output, output)
output = max(self.min_output, output)
return output

View File

@ -1,25 +0,0 @@
import board
import busio
from digitalio import DigitalInOut
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise
def connect_to_wifi():
esp32_cs = DigitalInOut(board.GP10)
esp32_ready = DigitalInOut(board.GP9)
esp32_reset = DigitalInOut(board.GP8)
spi = busio.SPI(board.GP14, MOSI=board.GP11, MISO=board.GP12)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
esp.reset()
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets)
wifi.connect()
return wifi, esp

View File

@ -1,109 +0,0 @@
import time
import json
from adafruit_esp32spi import adafruit_esp32spi_wsgiserver
from adafruit_wsgi.wsgi_app import WSGIApp
import robot
import robot_wifi
import pid
class SpeedControlApp:
def __init__(self):
self.wifi = None
self.server = None
self.intended_speed = 0.9
self.last_time = time.monotonic()
self.speed_to_encoder_factor = 1/5100
self.left_speed_pid = pid.PID(0, -0.7, 0, self.intended_speed)
self.right_speed_pid = pid.PID(0, -0.7, 0, self.intended_speed)
self.left_pid_output = 0
self.right_pid_output = 0
self.left_speed = 0
self.right_speed = 0
def setup_wifi(self, app):
print("Setting up wifi.")
self.wifi, esp = robot_wifi.connect_to_wifi()
self.server = adafruit_esp32spi_wsgiserver.WSGIServer(80, application=app)
adafruit_esp32spi_wsgiserver.set_interface(esp)
print("Starting server")
self.server.start()
ip_int = ".".join(str(int(n)) for n in esp.ip_address)
print(f"IP Address is {ip_int}")
def update(self):
new_time = time.monotonic()
time_delta = new_time - self.last_time
self.last_time = new_time
self.left_speed = robot.left_encoder.get_speed(time_delta) * self.speed_to_encoder_factor
self.right_speed = robot.right_encoder.get_speed(time_delta) * self.speed_to_encoder_factor
self.left_pid_output = self.left_speed_pid.update(self.left_speed, time_delta)
self.right_pid_output = self.right_speed_pid.update(self.right_speed, time_delta)
# print({
# "left_speed": self.left_speed,
# "left_pid": self.left_pid_output,
# "right_speed": self.right_speed,
# "right_pid": self.right_pid_output,
# "time": self.last_time,
# "error_sum": self.left_speed_pid.error_sum
# })
# robot.set_left(self.left_pid_output)
# robot.set_right(self.right_pid_output)
def movement_generator(self):
while True:
self.update()
data = json.dumps(
{
"left_speed": self.left_speed,
"left_pid": self.left_pid_output,
"right_speed": self.right_speed,
"right_pid": self.right_pid_output,
"time": self.last_time,
}
) + "/n"
print(data)
yield data
def index(self, request):
return (
200,
[("Content-Type", "application/json")],
self.movement_generator(),
)
def main_loop(self):
while True:
try:
self.update()
# time.sleep(0.1)
self.server.update_poll()
except RuntimeError as e:
print(f"Server poll error: {type(e)}, {e}")
print(f"Resetting ESP...")
self.wifi.reset()
print("Reset complete.")
def start(self):
app = WSGIApp()
app.route("/")(self.index)
print("Starting")
try:
self.setup_wifi(app)
self.main_loop()
finally:
robot.stop()
SpeedControlApp().start()