Chapter 11 code changes - async and bluetooth

This commit is contained in:
Danny Staple 2022-08-09 21:49:35 +01:00
parent 6b4660eb23
commit ad3f30b559
10 changed files with 270 additions and 109 deletions

View File

@ -1,73 +0,0 @@
import time
import json
from adafruit_esp32spi import adafruit_esp32spi_wsgiserver
from adafruit_wsgi.wsgi_app import WSGIApp
import robot
import robot_wifi
class SpeedCountApp:
def __init__(self):
self.intended_speed = 0.9
self.last_time = time.monotonic()
self.wifi = None
self.server = None
def setup_wifi(self, app):
print("Setting up wifi.")
self.wifi, esp = robot_wifi.connect_to_wifi()
self.server = adafruit_esp32spi_wsgiserver.WSGIServer(80, application=app)
adafruit_esp32spi_wsgiserver.set_interface(esp)
print("Starting server")
self.server.start()
ip_int = ".".join(str(int(n)) for n in esp.ip_address)
print(f"IP Address is {ip_int}")
def index(self, request):
new_time = time.monotonic()
time_delta = new_time - self.last_time
self.last_time = new_time
left_speed = robot.left_encoder.get_speed(time_delta)
right_speed = robot.right_encoder.get_speed(time_delta)
return (
200,
[("Content-Type", "application/json")],
[
json.dumps(
{
"left_speed": left_speed,
"right_speed": right_speed,
"time": self.last_time,
}
)
],
)
def main_loop(self):
robot.set_left(self.intended_speed)
robot.set_right(self.intended_speed)
while True:
try:
self.server.update_poll()
except RuntimeError as e:
print(f"Server poll error: {type(e)}, {e}")
print(f"Resetting ESP...")
self.wifi.reset()
print("Reset complete.")
def start(self):
app = WSGIApp()
app.route("/")(self.index)
print("Starting")
try:
self.setup_wifi(app)
self.main_loop()
finally:
robot.stop()
SpeedCountApp().start()

View File

@ -1,25 +0,0 @@
import board
import busio
from digitalio import DigitalInOut
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise
def connect_to_wifi():
esp32_cs = DigitalInOut(board.GP10)
esp32_ready = DigitalInOut(board.GP9)
esp32_reset = DigitalInOut(board.GP8)
spi = busio.SPI(board.GP14, MOSI=board.GP11, MISO=board.GP12)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
esp.reset()
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets)
wifi.connect()
return wifi, esp

View File

@ -0,0 +1,47 @@
import asyncio
import board
import busio
import robot
uart = busio.UART(board.GP12, board.GP13, baudrate=9600)
class Settings:
speed = 0.7
time_interval = 0.2
async def motor_speed_loop():
left_last, right_last = robot.left_encoder.read(), robot.right_encoder.read()
while True:
await asyncio.sleep(Settings.time_interval)
left_new, right_new = robot.left_encoder.read(), robot.right_encoder.read()
left_speed = robot.ticks_to_mm(left_new - left_last) / Settings.time_interval
left_last = left_new
right_speed = robot.ticks_to_mm(right_new - right_last) / Settings.time_interval
right_last = right_new
uart.write(f"{left_speed:.3f},{right_speed:.3f},0\n".encode())
async def stop_motors_after(seconds):
await asyncio.sleep(seconds)
robot.stop()
async def command_handler():
while True:
if uart.in_waiting:
command = uart.readline().decode().strip()
if command.startswith("M"):
speed = float(command[1:])
elif command.startswith("T"):
Settings.time_interval = float(command[1:])
elif command == "O":
robot.stop()
elif command.startswith("O"):
robot.set_left(Settings.speed)
robot.set_right(Settings.speed)
asyncio.create_task(stop_motors_after(float(command[1:])))
elif command.startswith("?"):
uart.write(f"M{Settings.speed:.1f}\n".encode())
uart.write(f"T{Settings.time_interval:.1f}\n".encode())
await asyncio.sleep(3)
await asyncio.sleep(0)
asyncio.create_task(motor_speed_loop())
asyncio.run(command_handler())

View File

@ -0,0 +1,20 @@
class PIDController:
def __init__(self, kp, ki, kd, d_filter_gain=0.1):
self.kp = kp
self.ki = ki
self.kd = kd
self.d_filter_gain = d_filter_gain
self.integral = 0
self.error_prev = 0
self.derivative = 0
def calculate(self, error, dt):
self.integral += error * dt
# Add a low pass filter to the difference
difference = (error - self.error_prev) * self.d_filter_gain
self.error_prev += difference
self.derivative = difference / dt
return self.kp * error + self.ki * self.integral + self.kd * self.derivative

View File

@ -67,7 +67,6 @@ class QuadratureEncoder:
)
self.reversed = reversed
self._buffer = array.array("i", [0])
self.previous_reading = 0
def read(self):
while self.sm.in_waiting:
@ -76,9 +75,3 @@ class QuadratureEncoder:
return -self._buffer[0]
else:
return self._buffer[0]
def get_speed(self, delta_time):
new_read = self.read()
distance = new_read - self.previous_reading
self.previous_reading = new_read
return distance / delta_time

View File

@ -3,12 +3,20 @@ import pwmio
import pio_encoder
import busio
import adafruit_vl53l1x
import math
wheel_diameter_mm = 70
wheel_circumference_mm = math.pi * wheel_diameter_mm
ticks_per_revolution = 2800
ticks_to_mm_const = wheel_circumference_mm / ticks_per_revolution
motor_A1 = pwmio.PWMOut(board.GP17)
motor_A2 = pwmio.PWMOut(board.GP16)
motor_B1 = pwmio.PWMOut(board.GP18)
motor_B2 = pwmio.PWMOut(board.GP19)
def ticks_to_mm(ticks):
return ticks_to_mm_const * ticks
motor_A1 = pwmio.PWMOut(board.GP17, frequency=100)
motor_A2 = pwmio.PWMOut(board.GP16, frequency=100)
motor_B1 = pwmio.PWMOut(board.GP18, frequency=100)
motor_B2 = pwmio.PWMOut(board.GP19, frequency=100)
right_motor = motor_A1, motor_A2
left_motor = motor_B1, motor_B2

View File

@ -0,0 +1,34 @@
import time
import board
import busio
import robot
uart = busio.UART(board.GP12, board.GP13, baudrate=9600)
speed = 0.7
stop_time = 0
while True:
current_time = time.monotonic()
if current_time > stop_time:
robot.set_left(0)
robot.set_right(0)
left_value = robot.left_encoder.read()
right_value = robot.right_encoder.read()
uart.write(f"{left_value:.3f},{right_value:.3f}\n".encode() )
time.sleep(0.02)
if uart.in_waiting:
command = uart.readline().decode().strip()
if command.startswith("M"):
speed = float(command[1:])
elif command == "O":
stop_time = 0
robot.set_left(0)
robot.set_right(0)
elif command.startswith("O"):
stop_time = float(command[1:]) + current_time
robot.set_left(speed)
robot.set_right(speed)
elif command.startswith("?"):
uart.write(f"M{speed:.1f}\n".encode())
time.sleep(3)

View File

@ -0,0 +1,20 @@
class PIDController:
def __init__(self, kp, ki, kd, d_filter_gain=0.1):
self.kp = kp
self.ki = ki
self.kd = kd
self.d_filter_gain = d_filter_gain
self.integral = 0
self.error_prev = 0
self.derivative = 0
def calculate(self, error, dt):
self.integral += error * dt
# Add a low pass filter to the difference
difference = (error - self.error_prev) * self.d_filter_gain
self.error_prev += difference
self.derivative = difference / dt
return self.kp * error + self.ki * self.integral + self.kd * self.derivative

View File

@ -0,0 +1,77 @@
import rp2pio
import adafruit_pioasm
import array
program = """
; use the osr for count
; input pins c1 c2
set y, 0 ; clear y
mov osr, y ; and clear osr
read:
; x will be the old value
; y the new values
mov x, y ; store old Y in x
in null, 32 ; Clear ISR - using y
in pins, 2 ; read two pins into y
mov y, isr
jmp x!=y, different ; Jump if its different
jmp read ; otherwise loop back to read
different:
; x has old value, y has new.
; extract the upper bit of X.
in x, 31 ; get bit 31 - old p1 (remember which direction it came in)
in null, 31 ; keep only 1 bit
mov x, isr ; put this back in x
jmp !x, c1_old_zero
c1_old_not_zero:
jmp pin, count_up
jmp count_down
c1_old_zero:
jmp pin, count_down
; fall through
count_up:
; for a clockwise move - we'll add 1 by inverting
mov x, ~ osr ; store inverted OSR on x
jmp x--, fake ; use jump to take off 1
fake:
mov x, ~ x ; invert back
jmp send
count_down:
; for a clockwise move, just take one off
mov x, osr ; store osr in x
jmp x--, send ; dec and send
send:
; send x.
mov isr, x ; send it
push noblock ; put ISR into input FIFO
mov osr, x ; put X back in OSR
jmp read ; loop back
"""
assembled = adafruit_pioasm.assemble(program)
class QuadratureEncoder:
def __init__(self, first_pin, second_pin, reversed=False):
"""Encoder with 2 pins. Must use sequential pins on the board"""
self.sm = rp2pio.StateMachine(
assembled,
frequency=0,
first_in_pin=first_pin,
jmp_pin=second_pin,
in_pin_count=2,
)
self.reversed = reversed
self._buffer = array.array("i", [0])
def read(self):
while self.sm.in_waiting:
self.sm.readinto(self._buffer)
if self.reversed:
return -self._buffer[0]
else:
return self._buffer[0]

View File

@ -0,0 +1,60 @@
import board
import pwmio
import pio_encoder
import busio
import adafruit_vl53l1x
import math
wheel_diameter_mm = 70
wheel_circumference_mm = math.pi * wheel_diameter_mm
ticks_per_revolution = 2800
ticks_to_mm_const = wheel_circumference_mm / ticks_per_revolution
def ticks_to_mm(ticks):
return ticks_to_mm_const * ticks
motor_A1 = pwmio.PWMOut(board.GP17, frequency=100)
motor_A2 = pwmio.PWMOut(board.GP16, frequency=100)
motor_B1 = pwmio.PWMOut(board.GP18, frequency=100)
motor_B2 = pwmio.PWMOut(board.GP19, frequency=100)
right_motor = motor_A1, motor_A2
left_motor = motor_B1, motor_B2
right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21, reversed=True)
left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27)
i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1)
i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3)
left_distance = adafruit_vl53l1x.VL53L1X(i2c0)
right_distance = adafruit_vl53l1x.VL53L1X(i2c1)
def stop():
motor_A1.duty_cycle = 0
motor_A2.duty_cycle = 0
motor_B1.duty_cycle = 0
motor_B2.duty_cycle = 0
def set_speed(motor, speed):
# Swap motor pins if we reverse the speed
if speed < 0:
direction = motor[1], motor[0]
speed = -speed
else:
direction = motor
speed = min(speed, 1) # limit to 1.0
max_speed = 2 ** 16 - 1
direction[0].duty_cycle = int(max_speed * speed)
direction[1].duty_cycle = 0
def set_left(speed):
set_speed(left_motor, speed)
def set_right(speed):
set_speed(right_motor, speed)