Chapter 11 and 13 - wifi code (pre bluetooth)

This commit is contained in:
Danny Staple 2022-08-01 09:53:21 +01:00
parent 7eff69d680
commit 6b4660eb23
17 changed files with 623 additions and 56 deletions

View File

@ -0,0 +1,73 @@
import time
import json
from adafruit_esp32spi import adafruit_esp32spi_wsgiserver
from adafruit_wsgi.wsgi_app import WSGIApp
import robot
import robot_wifi
class SpeedCountApp:
def __init__(self):
self.intended_speed = 0.9
self.last_time = time.monotonic()
self.wifi = None
self.server = None
def setup_wifi(self, app):
print("Setting up wifi.")
self.wifi, esp = robot_wifi.connect_to_wifi()
self.server = adafruit_esp32spi_wsgiserver.WSGIServer(80, application=app)
adafruit_esp32spi_wsgiserver.set_interface(esp)
print("Starting server")
self.server.start()
ip_int = ".".join(str(int(n)) for n in esp.ip_address)
print(f"IP Address is {ip_int}")
def index(self, request):
new_time = time.monotonic()
time_delta = new_time - self.last_time
self.last_time = new_time
left_speed = robot.left_encoder.get_speed(time_delta)
right_speed = robot.right_encoder.get_speed(time_delta)
return (
200,
[("Content-Type", "application/json")],
[
json.dumps(
{
"left_speed": left_speed,
"right_speed": right_speed,
"time": self.last_time,
}
)
],
)
def main_loop(self):
robot.set_left(self.intended_speed)
robot.set_right(self.intended_speed)
while True:
try:
self.server.update_poll()
except RuntimeError as e:
print(f"Server poll error: {type(e)}, {e}")
print(f"Resetting ESP...")
self.wifi.reset()
print("Reset complete.")
def start(self):
app = WSGIApp()
app.route("/")(self.index)
print("Starting")
try:
self.setup_wifi(app)
self.main_loop()
finally:
robot.stop()
SpeedCountApp().start()

View File

@ -0,0 +1,84 @@
import rp2pio
import adafruit_pioasm
import array
program = """
; use the osr for count
; input pins c1 c2
set y, 0 ; clear y
mov osr, y ; and clear osr
read:
; x will be the old value
; y the new values
mov x, y ; store old Y in x
in null, 32 ; Clear ISR - using y
in pins, 2 ; read two pins into y
mov y, isr
jmp x!=y, different ; Jump if its different
jmp read ; otherwise loop back to read
different:
; x has old value, y has new.
; extract the upper bit of X.
in x, 31 ; get bit 31 - old p1 (remember which direction it came in)
in null, 31 ; keep only 1 bit
mov x, isr ; put this back in x
jmp !x, c1_old_zero
c1_old_not_zero:
jmp pin, count_up
jmp count_down
c1_old_zero:
jmp pin, count_down
; fall through
count_up:
; for a clockwise move - we'll add 1 by inverting
mov x, ~ osr ; store inverted OSR on x
jmp x--, fake ; use jump to take off 1
fake:
mov x, ~ x ; invert back
jmp send
count_down:
; for a clockwise move, just take one off
mov x, osr ; store osr in x
jmp x--, send ; dec and send
send:
; send x.
mov isr, x ; send it
push noblock ; put ISR into input FIFO
mov osr, x ; put X back in OSR
jmp read ; loop back
"""
assembled = adafruit_pioasm.assemble(program)
class QuadratureEncoder:
def __init__(self, first_pin, second_pin, reversed=False):
"""Encoder with 2 pins. Must use sequential pins on the board"""
self.sm = rp2pio.StateMachine(
assembled,
frequency=0,
first_in_pin=first_pin,
jmp_pin=second_pin,
in_pin_count=2,
)
self.reversed = reversed
self._buffer = array.array("i", [0])
self.previous_reading = 0
def read(self):
while self.sm.in_waiting:
self.sm.readinto(self._buffer)
if self.reversed:
return -self._buffer[0]
else:
return self._buffer[0]
def get_speed(self, delta_time):
new_read = self.read()
distance = new_read - self.previous_reading
self.previous_reading = new_read
return distance / delta_time

52
ch-11/1-encoder-speed/robot.py Executable file
View File

@ -0,0 +1,52 @@
import board
import pwmio
import pio_encoder
import busio
import adafruit_vl53l1x
motor_A1 = pwmio.PWMOut(board.GP17)
motor_A2 = pwmio.PWMOut(board.GP16)
motor_B1 = pwmio.PWMOut(board.GP18)
motor_B2 = pwmio.PWMOut(board.GP19)
right_motor = motor_A1, motor_A2
left_motor = motor_B1, motor_B2
right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21, reversed=True)
left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27)
i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1)
i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3)
left_distance = adafruit_vl53l1x.VL53L1X(i2c0)
right_distance = adafruit_vl53l1x.VL53L1X(i2c1)
def stop():
motor_A1.duty_cycle = 0
motor_A2.duty_cycle = 0
motor_B1.duty_cycle = 0
motor_B2.duty_cycle = 0
def set_speed(motor, speed):
# Swap motor pins if we reverse the speed
if speed < 0:
direction = motor[1], motor[0]
speed = -speed
else:
direction = motor
speed = min(speed, 1) # limit to 1.0
max_speed = 2 ** 16 - 1
direction[0].duty_cycle = int(max_speed * speed)
direction[1].duty_cycle = 0
def set_left(speed):
set_speed(left_motor, speed)
def set_right(speed):
set_speed(right_motor, speed)

View File

@ -0,0 +1,25 @@
import board
import busio
from digitalio import DigitalInOut
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise
def connect_to_wifi():
esp32_cs = DigitalInOut(board.GP10)
esp32_ready = DigitalInOut(board.GP9)
esp32_reset = DigitalInOut(board.GP8)
spi = busio.SPI(board.GP14, MOSI=board.GP11, MISO=board.GP12)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
esp.reset()
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets)
wifi.connect()
return wifi, esp

View File

@ -0,0 +1,39 @@
class PID:
def __init__(self, proportional_k, integral_k, differential_k, set_point):
self.proportional_k = proportional_k
self.integral_k = integral_k
self.differential_k = differential_k
self.set_point = set_point
self.error_sum = 0
self.last_value = 0
self.min_output = -1
self.max_output = 1
self.dead_zone = 0.3
def update(self, measurement, time_delta):
error_value = measurement - self.set_point
proportional = error_value * self.proportional_k
# calculate integral
self.error_sum += error_value * time_delta
# clamp it
self.error_sum = min(self.max_output, self.error_sum)
self.error_sum = max(self.min_output, self.error_sum)
integral = self.error_sum * self.integral_k
differentiated_error = (error_value - self.last_value) / time_delta
differential = differentiated_error * self.differential_k
self.last_value = error_value
output = proportional + integral + differential
# clamp output
if abs(output) < self.dead_zone:
output = 0
else:
output = min(self.max_output, output)
output = max(self.min_output, output)
return output

View File

@ -0,0 +1,84 @@
import rp2pio
import adafruit_pioasm
import array
program = """
; use the osr for count
; input pins c1 c2
set y, 0 ; clear y
mov osr, y ; and clear osr
read:
; x will be the old value
; y the new values
mov x, y ; store old Y in x
in null, 32 ; Clear ISR - using y
in pins, 2 ; read two pins into y
mov y, isr
jmp x!=y, different ; Jump if its different
jmp read ; otherwise loop back to read
different:
; x has old value, y has new.
; extract the upper bit of X.
in x, 31 ; get bit 31 - old p1 (remember which direction it came in)
in null, 31 ; keep only 1 bit
mov x, isr ; put this back in x
jmp !x, c1_old_zero
c1_old_not_zero:
jmp pin, count_up
jmp count_down
c1_old_zero:
jmp pin, count_down
; fall through
count_up:
; for a clockwise move - we'll add 1 by inverting
mov x, ~ osr ; store inverted OSR on x
jmp x--, fake ; use jump to take off 1
fake:
mov x, ~ x ; invert back
jmp send
count_down:
; for a clockwise move, just take one off
mov x, osr ; store osr in x
jmp x--, send ; dec and send
send:
; send x.
mov isr, x ; send it
push noblock ; put ISR into input FIFO
mov osr, x ; put X back in OSR
jmp read ; loop back
"""
assembled = adafruit_pioasm.assemble(program)
class QuadratureEncoder:
def __init__(self, first_pin, second_pin, reversed=False):
"""Encoder with 2 pins. Must use sequential pins on the board"""
self.sm = rp2pio.StateMachine(
assembled,
frequency=0,
first_in_pin=first_pin,
jmp_pin=second_pin,
in_pin_count=2,
)
self.reversed = reversed
self._buffer = array.array("i", [0])
self.previous_reading = 0
def read(self):
while self.sm.in_waiting:
self.sm.readinto(self._buffer)
if self.reversed:
return -self._buffer[0]
else:
return self._buffer[0]
def get_speed(self, delta_time):
new_read = self.read()
distance = new_read - self.previous_reading
self.previous_reading = new_read
return distance / delta_time

View File

@ -0,0 +1,52 @@
import board
import pwmio
import pio_encoder
import busio
import adafruit_vl53l1x
motor_A1 = pwmio.PWMOut(board.GP17)
motor_A2 = pwmio.PWMOut(board.GP16)
motor_B1 = pwmio.PWMOut(board.GP18)
motor_B2 = pwmio.PWMOut(board.GP19)
right_motor = motor_A1, motor_A2
left_motor = motor_B1, motor_B2
right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21, reversed=True)
left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27)
i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1)
i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3)
left_distance = adafruit_vl53l1x.VL53L1X(i2c0)
right_distance = adafruit_vl53l1x.VL53L1X(i2c1)
def stop():
motor_A1.duty_cycle = 0
motor_A2.duty_cycle = 0
motor_B1.duty_cycle = 0
motor_B2.duty_cycle = 0
def set_speed(motor, speed):
# Swap motor pins if we reverse the speed
if speed < 0:
direction = motor[1], motor[0]
speed = -speed
else:
direction = motor
speed = min(speed, 1) # limit to 1.0
max_speed = 2 ** 16 - 1
direction[0].duty_cycle = int(max_speed * speed)
direction[1].duty_cycle = 0
def set_left(speed):
set_speed(left_motor, speed)
def set_right(speed):
set_speed(right_motor, speed)

View File

@ -0,0 +1,25 @@
import board
import busio
from digitalio import DigitalInOut
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise
def connect_to_wifi():
esp32_cs = DigitalInOut(board.GP10)
esp32_ready = DigitalInOut(board.GP9)
esp32_reset = DigitalInOut(board.GP8)
spi = busio.SPI(board.GP14, MOSI=board.GP11, MISO=board.GP12)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
esp.reset()
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets)
wifi.connect()
return wifi, esp

View File

@ -0,0 +1,109 @@
import time
import json
from adafruit_esp32spi import adafruit_esp32spi_wsgiserver
from adafruit_wsgi.wsgi_app import WSGIApp
import robot
import robot_wifi
import pid
class SpeedControlApp:
def __init__(self):
self.wifi = None
self.server = None
self.intended_speed = 0.9
self.last_time = time.monotonic()
self.speed_to_encoder_factor = 1/5100
self.left_speed_pid = pid.PID(0, -0.7, 0, self.intended_speed)
self.right_speed_pid = pid.PID(0, -0.7, 0, self.intended_speed)
self.left_pid_output = 0
self.right_pid_output = 0
self.left_speed = 0
self.right_speed = 0
def setup_wifi(self, app):
print("Setting up wifi.")
self.wifi, esp = robot_wifi.connect_to_wifi()
self.server = adafruit_esp32spi_wsgiserver.WSGIServer(80, application=app)
adafruit_esp32spi_wsgiserver.set_interface(esp)
print("Starting server")
self.server.start()
ip_int = ".".join(str(int(n)) for n in esp.ip_address)
print(f"IP Address is {ip_int}")
def update(self):
new_time = time.monotonic()
time_delta = new_time - self.last_time
self.last_time = new_time
self.left_speed = robot.left_encoder.get_speed(time_delta) * self.speed_to_encoder_factor
self.right_speed = robot.right_encoder.get_speed(time_delta) * self.speed_to_encoder_factor
self.left_pid_output = self.left_speed_pid.update(self.left_speed, time_delta)
self.right_pid_output = self.right_speed_pid.update(self.right_speed, time_delta)
# print({
# "left_speed": self.left_speed,
# "left_pid": self.left_pid_output,
# "right_speed": self.right_speed,
# "right_pid": self.right_pid_output,
# "time": self.last_time,
# "error_sum": self.left_speed_pid.error_sum
# })
# robot.set_left(self.left_pid_output)
# robot.set_right(self.right_pid_output)
def movement_generator(self):
while True:
self.update()
data = json.dumps(
{
"left_speed": self.left_speed,
"left_pid": self.left_pid_output,
"right_speed": self.right_speed,
"right_pid": self.right_pid_output,
"time": self.last_time,
}
) + "/n"
print(data)
yield data
def index(self, request):
return (
200,
[("Content-Type", "application/json")],
self.movement_generator(),
)
def main_loop(self):
while True:
try:
self.update()
# time.sleep(0.1)
self.server.update_poll()
except RuntimeError as e:
print(f"Server poll error: {type(e)}, {e}")
print(f"Resetting ESP...")
self.wifi.reset()
print("Reset complete.")
def start(self):
app = WSGIApp()
app.route("/")(self.index)
print("Starting")
try:
self.setup_wifi(app)
self.main_loop()
finally:
robot.stop()
SpeedControlApp().start()

View File

@ -9,22 +9,28 @@ import robot_wifi
import pid
class FollowObject:
class AllSensorsApp:
def __init__(self):
self.max_speed = 0.7
self.follow_pid = pid.PID(0.1, 0.1, 0.015, 15)
self.wifi = None
self.server = None
self.last_time = time.monotonic()
self.intended_speed = 0.9
self.speed_to_encoder_factor = 1/5000
self.left_speed_pid = pid.PID(0, -0.7, 0, self.intended_speed)
self.right_speed_pid = pid.PID(0, -0.7, 0, self.intended_speed)
self.start_time = time.monotonic()
self.last_time = self.start_time
self.left_dist = 0
self.right_dist = 0
self.pid_output = 0
self.left_pid_output = 0
self.right_pid_output = 0
self.left_speed = 0
self.right_speed = 0
def setup_robot(self):
robot.left_distance.distance_mode = 1
# def setup_robot(self):
# robot.left_distance.distance_mode = 1
# robot.right_distance.distance_mode = 1
def setup_wifi(self, app):
print("Setting up wifi.")
@ -46,14 +52,14 @@ class FollowObject:
[
json.dumps(
{
"last_value": self.follow_pid.last_value,
"pid_output": self.pid_output,
"left_speed_pid": self.left_pid_output,
"right_speed_pid": self.right_pid_output,
"imu_z": imu_data[2],
"left_speed": self.left_speed,
"right_speed": self.right_speed,
"left_distance": self.left_dist,
"right_distance": self.right_dist,
"time": self.last_time,
"time": self.last_time - self.start_time,
}
)
],
@ -65,33 +71,32 @@ class FollowObject:
return 200, [("Content-Type", "text/html")], [fd.read()]
def movement_update(self):
# do we have data
if robot.left_distance.data_ready:
self.left_dist = robot.left_distance.distance
self.right_dist = robot.right_distance.distance
# calculate time delta
new_time = time.monotonic()
time_delta = new_time - self.last_time
self.last_time = new_time
# calculate time delta
new_time = time.monotonic()
time_delta = new_time - self.last_time
self.last_time = new_time
self.left_speed = robot.left_encoder.get_speed(time_delta) * self.speed_to_encoder_factor
self.right_speed = robot.right_encoder.get_speed(time_delta) * self.speed_to_encoder_factor
# get speeds from pid
self.pid_output = self.follow_pid.update(self.left_dist, time_delta)
speed = self.pid_output * self.max_speed
self.left_pid_output = self.left_speed_pid.update(self.left_speed, time_delta)
self.right_pid_output = self.right_speed_pid.update(self.right_speed, time_delta)
# robot.set_left(self.left_pid_output)
# robot.set_right(self.right_pid_output)
self.left_speed = robot.left_encoder.get_speed(time_delta)
self.right_speed = robot.right_encoder.get_speed(time_delta)
# # do we have data
# if robot.left_distance.data_ready:
# self.left_dist = robot.left_distance.distance
# self.right_dist = robot.right_distance.distance
# make movements
robot.set_left(speed)
robot.set_right(speed)
# reset and loop
robot.left_distance.clear_interrupt()
robot.right_distance.clear_interrupt()
# # reset and loop
# robot.left_distance.clear_interrupt()
# robot.right_distance.clear_interrupt()
def main_loop(self):
robot.left_distance.start_ranging()
# robot.left_distance.start_ranging()
# robot.right_distance.start_ranging()
while True:
try:
self.movement_update()
@ -109,13 +114,15 @@ class FollowObject:
app.route("/data")(self.data)
print("Starting")
try:
self.setup_robot()
# self.setup_robot()
self.setup_wifi(app)
self.main_loop()
finally:
robot.stop()
robot.left_distance.clear_interrupt()
robot.left_distance.stop_ranging()
# robot.left_distance.clear_interrupt()
# robot.left_distance.stop_ranging()
# robot.right_distance.clear_interrupt()
# robot.right_distance.stop_ranging()
FollowObject().start()
AllSensorsApp().start()

View File

@ -42,6 +42,8 @@
marks: [
Plot.line(current_dataset, {x: "time", y: "left_speed", stroke: "red"}),
Plot.line(current_dataset, {x: "time", y: "right_speed", stroke: "blue"}),
Plot.line(current_dataset, {x: "time", y: "left_speed_pid", stroke: "orange"}),
Plot.line(current_dataset, {x: "time", y: "right_speed_pid", stroke: "green"}),
]
});
var graphNode = document.getElementById("speed");

View File

@ -18,10 +18,10 @@ right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21, reversed=T
left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27)
i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1)
i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3)
# i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3)
right_distance = adafruit_vl53l1x.VL53L1X(i2c0)
left_distance = adafruit_vl53l1x.VL53L1X(i2c1)
# right_distance = adafruit_vl53l1x.VL53L1X(i2c0)
# left_distance = adafruit_vl53l1x.VL53L1X(i2c1)
imu = adafruit_bno055.BNO055_I2C(i2c0)

View File

@ -16,10 +16,16 @@ def connect_to_wifi():
esp32_ready = DigitalInOut(board.GP9)
esp32_reset = DigitalInOut(board.GP8)
status_led = DigitalInOut(board.LED)
status_led.switch_to_output()
spi = busio.SPI(board.GP14, MOSI=board.GP11, MISO=board.GP12)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
esp.reset()
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets)
wifi.connect()
status_led.value = 1
return wifi, esp

View File

@ -39,5 +39,4 @@
console.log(error.name === 'AbortError');
}
}
// }
</script>

View File

@ -19,21 +19,20 @@ class RandomWalkSensors:
self.last_time = self.start_time
# self.app = None
def setup_wifi(self, app):
# self.app = app
print("Setting up wifi.")
self.wifi, esp = robot_wifi.connect_to_wifi()
def setup_server(self, esp, app):
self.server = adafruit_esp32spi_wsgiserver.WSGIServer(80, application=app)
adafruit_esp32spi_wsgiserver.set_interface(esp)
print("Starting server")
self.server.start()
ip_int = ".".join(str(int(n)) for n in esp.ip_address)
print(f"IP Address is {ip_int}")
def reconnect(self):
print(f"{time.monotonic()} Resetting esp... ")
self.wifi.reset()
print(f"{time.monotonic()} Reconnecting wifi... ")
self.wifi.connect()
print(f"{time.monotonic()} starting server...")
self.server.start()
print(f"{time.monotonic()} started server...")
def data(self, request):
@ -74,17 +73,21 @@ class RandomWalkSensors:
self.server.update_poll()
except RuntimeError as e:
traceback.print_exception(BaseException, e, e.__traceback__)
self.wifi.reset()
self.reconnect()
print("Reset complete.")
if not "Failed to send" in str(e):
self.reconnect()
def start(self):
app = WSGIApp()
app.route("/")(self.index)
app.route("/data")(self.data)
print("Starting")
self.setup_wifi(app)
self.main_loop()
self.wifi, esp, spi = robot_wifi.connect_to_wifi()
try:
self.setup_server(esp, app)
self.main_loop()
finally:
spi.unlock()
spi.deinit()
RandomWalkSensors().start()

View File

@ -18,14 +18,21 @@ def connect_to_wifi():
status_led = DigitalInOut(board.LED)
status_led.switch_to_output()
print("Setting up wifi.")
spi = busio.SPI(board.GP14, MOSI=board.GP11, MISO=board.GP12)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
esp.reset()
print("SPI Configure")
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset,
baudrate=300*1000
)
esp.ready_timeout = 1
print("Firmware vers.", esp.firmware_version)
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets)
wifi.connect()
ip_int = ".".join(str(int(n)) for n in esp.ip_address)
print(f"IP Address is {ip_int}")
status_led.value = 1
return wifi, esp
return wifi, esp, spi

BIN
ch-13/arena.FCStd Normal file

Binary file not shown.