Current ch-11 state
This commit is contained in:
parent
b0db885480
commit
d7ba881234
1
.gitignore
vendored
1
.gitignore
vendored
@ -5,3 +5,4 @@
|
||||
*.FCStd1
|
||||
secrets.py
|
||||
libs
|
||||
.idea/
|
||||
|
||||
@ -34,6 +34,7 @@ async def command_handler():
|
||||
elif command == "O":
|
||||
robot.stop()
|
||||
elif command.startswith("O"):
|
||||
await asyncio.sleep(5)
|
||||
robot.set_left(Settings.speed)
|
||||
robot.set_right(Settings.speed)
|
||||
asyncio.create_task(stop_motors_after(float(command[1:])))
|
||||
|
||||
100
ch-11/1.5-speed-control/code.py
Normal file
100
ch-11/1.5-speed-control/code.py
Normal file
@ -0,0 +1,100 @@
|
||||
import asyncio
|
||||
import board
|
||||
import busio
|
||||
import robot
|
||||
import time
|
||||
|
||||
import pid_controller
|
||||
uart = busio.UART(board.GP12, board.GP13, baudrate=9600)
|
||||
|
||||
class Settings:
|
||||
# work in ticks only - 1 rpm
|
||||
speed = 0.5
|
||||
time_interval = 0.2
|
||||
motors_enabled = False
|
||||
dead_zone = 0.2
|
||||
|
||||
class SpeedController:
|
||||
def __init__(self, encoder, motor_fn) -> None:
|
||||
self.encoder = encoder
|
||||
self.motor_fn = motor_fn
|
||||
#P1.4,I2.8,D1
|
||||
#P0, I2.5, D1.1
|
||||
self.pid = pid_controller.PIDController(0.1, 1.5, 0)
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.last_ticks = self.encoder.read()
|
||||
self.error = 0
|
||||
self.control_signal = 0
|
||||
self.pid.reset()
|
||||
|
||||
def update(self, dt):
|
||||
current_ticks = self.encoder.read()
|
||||
actual_speed_in_rpm = (current_ticks - self.last_ticks) / (dt * robot.ticks_per_revolution)
|
||||
self.last_ticks = current_ticks
|
||||
# calculate the error
|
||||
self.error = (Settings.speed * Settings.motors_enabled) - actual_speed_in_rpm
|
||||
# calculate the control signal
|
||||
self.control_signal = self.pid.calculate(self.error, dt)
|
||||
self.motor_fn(self.control_signal)
|
||||
|
||||
|
||||
left = SpeedController(robot.left_encoder, robot.set_left)
|
||||
|
||||
async def speed_controller_loop():
|
||||
last_time = time.monotonic()
|
||||
while True:
|
||||
await asyncio.sleep(Settings.time_interval)
|
||||
current_time = time.monotonic()
|
||||
dt = current_time - last_time
|
||||
left.update(dt)
|
||||
last_time = current_time
|
||||
print(f"{dt:.2f}")
|
||||
uart.write(f"{Settings.speed * Settings.motors_enabled}, {left.error:.2f},{left.control_signal:.2f}\n".encode())
|
||||
|
||||
|
||||
async def stop_motors_after(seconds):
|
||||
await asyncio.sleep(seconds)
|
||||
Settings.motors_enabled = False
|
||||
# robot.stop()
|
||||
|
||||
async def command_handler():
|
||||
while True:
|
||||
if uart.in_waiting:
|
||||
command = uart.readline().decode().strip()
|
||||
# PID settings
|
||||
if command.startswith("P"):
|
||||
left.pid.kp = float(command[1:])
|
||||
elif command.startswith("I"):
|
||||
left.pid.ki = float(command[1:])
|
||||
left.pid.reset()
|
||||
elif command.startswith("D"):
|
||||
left.pid.kd = float(command[1:])
|
||||
# Speed settings
|
||||
elif command.startswith("M"):
|
||||
Settings.speed = float(command[1:])
|
||||
elif command.startswith("T"):
|
||||
Settings.time_interval = float(command[1:])
|
||||
# Start/stop commands
|
||||
elif command == "O":
|
||||
Settings.motors_enabled = False
|
||||
elif command.startswith("O"):
|
||||
asyncio.create_task(stop_motors_after(float(command[1:])))
|
||||
Settings.motors_enabled = True
|
||||
left.reset()
|
||||
# Print settings
|
||||
elif command.startswith("?"):
|
||||
uart.write(f"M{Settings.speed:.1f}\n".encode())
|
||||
uart.write(f"T{Settings.time_interval:.1f}\n".encode())
|
||||
uart.write(f"P{left.pid.kp:.2f}:I{left.pid.ki:.2f}:D{left.pid.kd:.2f}\n".encode())
|
||||
await asyncio.sleep(3)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
try:
|
||||
asyncio.create_task(speed_controller_loop())
|
||||
asyncio.create_task(stop_motors_after(10))
|
||||
Settings.motors_enabled = True
|
||||
asyncio.run(command_handler())
|
||||
finally:
|
||||
robot.stop()
|
||||
27
ch-11/1.5-speed-control/pid_controller.py
Normal file
27
ch-11/1.5-speed-control/pid_controller.py
Normal file
@ -0,0 +1,27 @@
|
||||
class PIDController:
|
||||
def __init__(self, kp, ki, kd, d_filter_gain=0.1, imax=None, imin=None):
|
||||
self.kp = kp
|
||||
self.ki = ki
|
||||
self.kd = kd
|
||||
self.d_filter_gain = d_filter_gain
|
||||
self.imax = imax
|
||||
self.imin = imin
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.integral = 0
|
||||
self.error_prev = 0
|
||||
self.derivative = 0
|
||||
|
||||
def calculate(self, error, dt):
|
||||
self.integral += error * dt
|
||||
if self.imax is not None and self.integral > self.imax:
|
||||
self.integral = self.imax
|
||||
if self.imin is not None and self.integral < self.imin:
|
||||
self.integral = self.imin
|
||||
# Add a low pass filter to the difference
|
||||
difference = (error - self.error_prev) * self.d_filter_gain
|
||||
self.error_prev += difference
|
||||
self.derivative = difference / dt
|
||||
|
||||
return self.kp * error + self.ki * self.integral + self.kd * self.derivative
|
||||
84
ch-11/1.5-speed-control/pio_encoder.py
Normal file
84
ch-11/1.5-speed-control/pio_encoder.py
Normal file
@ -0,0 +1,84 @@
|
||||
import rp2pio
|
||||
import adafruit_pioasm
|
||||
import array
|
||||
import asyncio
|
||||
|
||||
|
||||
program = """
|
||||
; use the osr for count
|
||||
; input pins c1 c2
|
||||
|
||||
set y, 0 ; clear y
|
||||
mov osr, y ; and clear osr
|
||||
read:
|
||||
; x will be the old value
|
||||
; y the new values
|
||||
mov x, y ; store old Y in x
|
||||
in null, 32 ; Clear ISR - using y
|
||||
in pins, 2 ; read two pins into y
|
||||
mov y, isr
|
||||
jmp x!=y, different ; Jump if its different
|
||||
jmp read ; otherwise loop back to read
|
||||
|
||||
different:
|
||||
; x has old value, y has new.
|
||||
; extract the upper bit of X.
|
||||
in x, 31 ; get bit 31 - old p1 (remember which direction it came in)
|
||||
in null, 31 ; keep only 1 bit
|
||||
mov x, isr ; put this back in x
|
||||
jmp !x, c1_old_zero
|
||||
|
||||
c1_old_not_zero:
|
||||
jmp pin, count_up
|
||||
jmp count_down
|
||||
|
||||
c1_old_zero:
|
||||
jmp pin, count_down
|
||||
; fall through
|
||||
count_up:
|
||||
; for a clockwise move - we'll add 1 by inverting
|
||||
mov x, ~ osr ; store inverted OSR on x
|
||||
jmp x--, fake ; use jump to take off 1
|
||||
fake:
|
||||
mov x, ~ x ; invert back
|
||||
jmp send
|
||||
count_down:
|
||||
; for a clockwise move, just take one off
|
||||
mov x, osr ; store osr in x
|
||||
jmp x--, send ; dec and send
|
||||
send:
|
||||
; send x.
|
||||
mov isr, x ; send it
|
||||
push noblock ; put ISR into input FIFO
|
||||
mov osr, x ; put X back in OSR
|
||||
jmp read ; loop back
|
||||
"""
|
||||
|
||||
assembled = adafruit_pioasm.assemble(program)
|
||||
|
||||
|
||||
class QuadratureEncoder:
|
||||
def __init__(self, first_pin, second_pin, reversed=False):
|
||||
"""Encoder with 2 pins. Must use sequential pins on the board"""
|
||||
self.sm = rp2pio.StateMachine(
|
||||
assembled,
|
||||
frequency=0,
|
||||
first_in_pin=first_pin,
|
||||
jmp_pin=second_pin,
|
||||
in_pin_count=2,
|
||||
)
|
||||
self.reversed = reversed
|
||||
self._buffer = array.array("i", [0])
|
||||
asyncio.create_task(self.poll_loop())
|
||||
|
||||
async def poll_loop(self):
|
||||
while True:
|
||||
await asyncio.sleep(0)
|
||||
while self.sm.in_waiting:
|
||||
self.sm.readinto(self._buffer)
|
||||
|
||||
def read(self):
|
||||
if self.reversed:
|
||||
return -self._buffer[0]
|
||||
else:
|
||||
return self._buffer[0]
|
||||
68
ch-11/1.5-speed-control/robot.py
Executable file
68
ch-11/1.5-speed-control/robot.py
Executable file
@ -0,0 +1,68 @@
|
||||
import board
|
||||
import pwmio
|
||||
import pio_encoder
|
||||
import busio
|
||||
import adafruit_vl53l1x
|
||||
import math
|
||||
|
||||
wheel_diameter_mm = 70
|
||||
wheel_circumference_mm = math.pi * wheel_diameter_mm
|
||||
gear_ratio = 298
|
||||
encoder_poles = 28
|
||||
ticks_per_revolution = encoder_poles * gear_ratio
|
||||
ticks_to_mm_const = wheel_circumference_mm / ticks_per_revolution
|
||||
|
||||
def ticks_to_mm(ticks):
|
||||
return ticks_to_mm_const * ticks
|
||||
|
||||
motor_A2 = pwmio.PWMOut(board.GP17, frequency=100)
|
||||
motor_A1 = pwmio.PWMOut(board.GP16, frequency=100)
|
||||
motor_B2 = pwmio.PWMOut(board.GP18, frequency=100)
|
||||
motor_B1 = pwmio.PWMOut(board.GP19, frequency=100)
|
||||
|
||||
right_motor = motor_A1, motor_A2
|
||||
left_motor = motor_B1, motor_B2
|
||||
motor_dead_zone = 0.2
|
||||
|
||||
right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21)
|
||||
left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27, reversed=True)
|
||||
|
||||
i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1)
|
||||
i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3)
|
||||
|
||||
left_distance = adafruit_vl53l1x.VL53L1X(i2c0)
|
||||
right_distance = adafruit_vl53l1x.VL53L1X(i2c1)
|
||||
|
||||
|
||||
def stop():
|
||||
motor_A1.duty_cycle = 0
|
||||
motor_A2.duty_cycle = 0
|
||||
motor_B1.duty_cycle = 0
|
||||
motor_B2.duty_cycle = 0
|
||||
|
||||
|
||||
def set_speed(motor, speed):
|
||||
# stop completely if in the dead zone
|
||||
if abs(speed) < motor_dead_zone:
|
||||
motor[0].duty_cycle = 0
|
||||
motor[1].duty_cycle = 0
|
||||
return
|
||||
# Swap motor pins if we reverse the speed
|
||||
if speed < 0:
|
||||
direction = motor[1], motor[0]
|
||||
speed = -speed
|
||||
else:
|
||||
direction = motor
|
||||
speed = min(speed, 1) # limit to 1.0
|
||||
max_speed = 2 ** 16 - 1
|
||||
|
||||
direction[0].duty_cycle = int(max_speed * speed)
|
||||
direction[1].duty_cycle = 0
|
||||
|
||||
|
||||
def set_left(speed):
|
||||
set_speed(left_motor, speed)
|
||||
|
||||
|
||||
def set_right(speed):
|
||||
set_speed(right_motor, speed)
|
||||
103
ch-11/2-speed-control-acceleration/code.py
Normal file
103
ch-11/2-speed-control-acceleration/code.py
Normal file
@ -0,0 +1,103 @@
|
||||
import asyncio
|
||||
import time
|
||||
import robot
|
||||
import pid_controller
|
||||
|
||||
class Settings:
|
||||
speed = 0.17
|
||||
distance = 1
|
||||
time_interval = 0.2
|
||||
motors_enabled = False
|
||||
|
||||
|
||||
class SpeedController:
|
||||
def __init__(self, encoder, motor_fn):
|
||||
self.encoder = encoder
|
||||
self.motor_fn = motor_fn
|
||||
self.pid = pid_controller.PIDController(3, 0, 1)
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.last_ticks = self.encoder.read()
|
||||
self.error = 0
|
||||
self.speed = 0
|
||||
self.actual_speed = 0
|
||||
self.pid.reset()
|
||||
|
||||
def update(self, dt):
|
||||
current_ticks = self.encoder.read()
|
||||
speed_in_ticks = (current_ticks - self.last_ticks) / dt
|
||||
self.last_ticks = current_ticks
|
||||
self.actual_speed = robot.ticks_to_mm(speed_in_ticks) / 1000
|
||||
# calculate the error
|
||||
self.error = (Settings.speed * Settings.motors_enabled) - self.actual_speed
|
||||
# calculate the control signal
|
||||
control_signal = self.pid.calculate(self.error, dt)
|
||||
self.speed += control_signal
|
||||
self.motor_fn(self.speed)
|
||||
|
||||
|
||||
left = SpeedController(robot.left_encoder, robot.set_left)
|
||||
right = SpeedController(robot.right_encoder, robot.set_right)
|
||||
|
||||
|
||||
async def motor_speed_loop():
|
||||
last_time = time.monotonic()
|
||||
while True:
|
||||
await asyncio.sleep(Settings.time_interval)
|
||||
current_time = time.monotonic()
|
||||
dt = current_time - last_time
|
||||
last_time = current_time
|
||||
left.update(dt)
|
||||
right.update(dt)
|
||||
robot.uart.write(f"0, {left.actual_speed:.2f},{Settings.speed:.2f}\n".encode())
|
||||
|
||||
|
||||
async def stop_motors_after(seconds):
|
||||
await asyncio.sleep(seconds)
|
||||
Settings.motors_enabled = False
|
||||
|
||||
|
||||
async def command_handler():
|
||||
while True:
|
||||
if robot.uart.in_waiting:
|
||||
command = robot.uart.readline().decode().strip()
|
||||
# PID settings
|
||||
if command.startswith("P"):
|
||||
left.pid.kp = float(command[1:])
|
||||
right.pid.kp = float(command[1:])
|
||||
elif command.startswith("I"):
|
||||
left.pid.ki = float(command[1:])
|
||||
left.pid.reset()
|
||||
right.pid.ki = float(command[1:])
|
||||
right.pid.reset()
|
||||
elif command.startswith("D"):
|
||||
left.pid.kd = float(command[1:])
|
||||
right.pid.kd = float(command[1:])
|
||||
elif command.startswith("T"):
|
||||
Settings.time_interval = float(command[1:])
|
||||
# Speed settings
|
||||
elif command.startswith("M"):
|
||||
Settings.speed = float(command[1:])
|
||||
# Start/stop commands
|
||||
elif command == "O":
|
||||
Settings.motors_enabled = False
|
||||
elif command.startswith("O"):
|
||||
await asyncio.sleep(5)
|
||||
asyncio.create_task(stop_motors_after(float(command[1:])))
|
||||
Settings.motors_enabled = True
|
||||
left.reset()
|
||||
right.reset()
|
||||
# Print settings
|
||||
elif command.startswith("?"):
|
||||
robot.uart.write(f"M{Settings.speed:.1f}\n".encode())
|
||||
robot.uart.write(f"P{left.pid.kp:.2f}:I{left.pid.ki:.2f}:D{left.pid.kd:.2f}\n".encode())
|
||||
robot.uart.write(f"T{Settings.time_interval:.1f}\n".encode())
|
||||
await asyncio.sleep(3)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
try:
|
||||
asyncio.create_task(motor_speed_loop())
|
||||
asyncio.run(command_handler())
|
||||
finally:
|
||||
robot.stop()
|
||||
27
ch-11/2-speed-control-acceleration/pid_controller.py
Normal file
27
ch-11/2-speed-control-acceleration/pid_controller.py
Normal file
@ -0,0 +1,27 @@
|
||||
class PIDController:
|
||||
def __init__(self, kp, ki, kd, d_filter_gain=0.1, imax=None, imin=None):
|
||||
self.kp = kp
|
||||
self.ki = ki
|
||||
self.kd = kd
|
||||
self.d_filter_gain = d_filter_gain
|
||||
self.imax = imax
|
||||
self.imin = imin
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.integral = 0
|
||||
self.error_prev = 0
|
||||
self.derivative = 0
|
||||
|
||||
def calculate(self, error, dt):
|
||||
self.integral += error * dt
|
||||
if self.imax is not None and self.integral > self.imax:
|
||||
self.integral = self.imax
|
||||
if self.imin is not None and self.integral < self.imin:
|
||||
self.integral = self.imin
|
||||
# Add a low pass filter to the difference
|
||||
difference = (error - self.error_prev) * self.d_filter_gain
|
||||
self.error_prev += difference
|
||||
self.derivative = difference / dt
|
||||
|
||||
return self.kp * error + self.ki * self.integral + self.kd * self.derivative
|
||||
84
ch-11/2-speed-control-acceleration/pio_encoder.py
Normal file
84
ch-11/2-speed-control-acceleration/pio_encoder.py
Normal file
@ -0,0 +1,84 @@
|
||||
import rp2pio
|
||||
import adafruit_pioasm
|
||||
import array
|
||||
import asyncio
|
||||
|
||||
|
||||
program = """
|
||||
; use the osr for count
|
||||
; input pins c1 c2
|
||||
|
||||
set y, 0 ; clear y
|
||||
mov osr, y ; and clear osr
|
||||
read:
|
||||
; x will be the old value
|
||||
; y the new values
|
||||
mov x, y ; store old Y in x
|
||||
in null, 32 ; Clear ISR - using y
|
||||
in pins, 2 ; read two pins into y
|
||||
mov y, isr
|
||||
jmp x!=y, different ; Jump if its different
|
||||
jmp read ; otherwise loop back to read
|
||||
|
||||
different:
|
||||
; x has old value, y has new.
|
||||
; extract the upper bit of X.
|
||||
in x, 31 ; get bit 31 - old p1 (remember which direction it came in)
|
||||
in null, 31 ; keep only 1 bit
|
||||
mov x, isr ; put this back in x
|
||||
jmp !x, c1_old_zero
|
||||
|
||||
c1_old_not_zero:
|
||||
jmp pin, count_up
|
||||
jmp count_down
|
||||
|
||||
c1_old_zero:
|
||||
jmp pin, count_down
|
||||
; fall through
|
||||
count_up:
|
||||
; for a clockwise move - we'll add 1 by inverting
|
||||
mov x, ~ osr ; store inverted OSR on x
|
||||
jmp x--, fake ; use jump to take off 1
|
||||
fake:
|
||||
mov x, ~ x ; invert back
|
||||
jmp send
|
||||
count_down:
|
||||
; for a clockwise move, just take one off
|
||||
mov x, osr ; store osr in x
|
||||
jmp x--, send ; dec and send
|
||||
send:
|
||||
; send x.
|
||||
mov isr, x ; send it
|
||||
push noblock ; put ISR into input FIFO
|
||||
mov osr, x ; put X back in OSR
|
||||
jmp read ; loop back
|
||||
"""
|
||||
|
||||
assembled = adafruit_pioasm.assemble(program)
|
||||
|
||||
|
||||
class QuadratureEncoder:
|
||||
def __init__(self, first_pin, second_pin, reversed=False):
|
||||
"""Encoder with 2 pins. Must use sequential pins on the board"""
|
||||
self.sm = rp2pio.StateMachine(
|
||||
assembled,
|
||||
frequency=0,
|
||||
first_in_pin=first_pin,
|
||||
jmp_pin=second_pin,
|
||||
in_pin_count=2,
|
||||
)
|
||||
self.reversed = reversed
|
||||
self._buffer = array.array("i", [0])
|
||||
asyncio.create_task(self.poll_loop())
|
||||
|
||||
async def poll_loop(self):
|
||||
while True:
|
||||
await asyncio.sleep(0)
|
||||
while self.sm.in_waiting:
|
||||
self.sm.readinto(self._buffer)
|
||||
|
||||
def read(self):
|
||||
if self.reversed:
|
||||
return -self._buffer[0]
|
||||
else:
|
||||
return self._buffer[0]
|
||||
65
ch-11/2-speed-control-acceleration/robot.py
Executable file
65
ch-11/2-speed-control-acceleration/robot.py
Executable file
@ -0,0 +1,65 @@
|
||||
import board
|
||||
import pwmio
|
||||
import pio_encoder
|
||||
import busio
|
||||
import adafruit_vl53l1x
|
||||
import math
|
||||
import busio
|
||||
|
||||
uart = busio.UART(board.GP12, board.GP13, baudrate=9600)
|
||||
|
||||
wheel_diameter_mm = 70
|
||||
wheel_circumference_mm = math.pi * wheel_diameter_mm
|
||||
gear_ratio = 298
|
||||
encoder_poles = 28
|
||||
ticks_per_revolution = encoder_poles * gear_ratio
|
||||
ticks_to_mm_const = wheel_circumference_mm / ticks_per_revolution
|
||||
|
||||
def ticks_to_mm(ticks):
|
||||
return ticks_to_mm_const * ticks
|
||||
|
||||
motor_A2 = pwmio.PWMOut(board.GP17, frequency=100)
|
||||
motor_A1 = pwmio.PWMOut(board.GP16, frequency=100)
|
||||
motor_B2 = pwmio.PWMOut(board.GP18, frequency=100)
|
||||
motor_B1 = pwmio.PWMOut(board.GP19, frequency=100)
|
||||
|
||||
right_motor = motor_A1, motor_A2
|
||||
left_motor = motor_B1, motor_B2
|
||||
|
||||
right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21)
|
||||
left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27, reversed=True)
|
||||
|
||||
i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1)
|
||||
i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3)
|
||||
|
||||
left_distance = adafruit_vl53l1x.VL53L1X(i2c0)
|
||||
right_distance = adafruit_vl53l1x.VL53L1X(i2c1)
|
||||
|
||||
|
||||
def stop():
|
||||
motor_A1.duty_cycle = 0
|
||||
motor_A2.duty_cycle = 0
|
||||
motor_B1.duty_cycle = 0
|
||||
motor_B2.duty_cycle = 0
|
||||
|
||||
|
||||
def set_speed(motor, speed):
|
||||
# Swap motor pins if we reverse the speed
|
||||
if speed < 0:
|
||||
direction = motor[1], motor[0]
|
||||
speed = -speed
|
||||
else:
|
||||
direction = motor
|
||||
speed = min(speed, 1) # limit to 1.0
|
||||
max_speed = 2 ** 16 - 1
|
||||
|
||||
direction[0].duty_cycle = int(max_speed * speed)
|
||||
direction[1].duty_cycle = 0
|
||||
|
||||
|
||||
def set_left(speed):
|
||||
set_speed(left_motor, speed)
|
||||
|
||||
|
||||
def set_right(speed):
|
||||
set_speed(right_motor, speed)
|
||||
102
ch-11/3-known-distance-acceleration/code.py
Normal file
102
ch-11/3-known-distance-acceleration/code.py
Normal file
@ -0,0 +1,102 @@
|
||||
import asyncio
|
||||
import time
|
||||
import robot
|
||||
import pid_controller
|
||||
|
||||
class Settings:
|
||||
speed = 0.17
|
||||
time_interval = 0.2
|
||||
motors_enabled = False
|
||||
|
||||
|
||||
class SpeedController:
|
||||
def __init__(self, encoder, motor_fn):
|
||||
self.encoder = encoder
|
||||
self.motor_fn = motor_fn
|
||||
self.pid = pid_controller.PIDController(3, 0, 1)
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.last_ticks = self.encoder.read()
|
||||
self.error = 0
|
||||
self.speed = 0
|
||||
self.actual_speed = 0
|
||||
self.pid.reset()
|
||||
|
||||
def update(self, dt):
|
||||
current_ticks = self.encoder.read()
|
||||
speed_in_ticks = (current_ticks - self.last_ticks) / dt
|
||||
self.last_ticks = current_ticks
|
||||
self.actual_speed = robot.ticks_to_mm(speed_in_ticks) / 1000
|
||||
# calculate the error
|
||||
self.error = (Settings.speed * Settings.motors_enabled) - self.actual_speed
|
||||
# calculate the control signal
|
||||
control_signal = self.pid.calculate(self.error, dt)
|
||||
self.speed += control_signal
|
||||
self.motor_fn(self.speed)
|
||||
|
||||
|
||||
left = SpeedController(robot.left_encoder, robot.set_left)
|
||||
right = SpeedController(robot.right_encoder, robot.set_right)
|
||||
|
||||
|
||||
async def motor_speed_loop():
|
||||
last_time = time.monotonic()
|
||||
while True:
|
||||
await asyncio.sleep(Settings.time_interval)
|
||||
current_time = time.monotonic()
|
||||
dt = current_time - last_time
|
||||
last_time = current_time
|
||||
left.update(dt)
|
||||
right.update(dt)
|
||||
robot.uart.write(f"0, {left.actual_speed:.2f},{Settings.speed:.2f}\n".encode())
|
||||
|
||||
|
||||
async def stop_motors_after(seconds):
|
||||
await asyncio.sleep(seconds)
|
||||
Settings.motors_enabled = False
|
||||
|
||||
|
||||
async def command_handler():
|
||||
while True:
|
||||
if robot.uart.in_waiting:
|
||||
command = robot.uart.readline().decode().strip()
|
||||
# PID settings
|
||||
if command.startswith("P"):
|
||||
left.pid.kp = float(command[1:])
|
||||
right.pid.kp = float(command[1:])
|
||||
elif command.startswith("I"):
|
||||
left.pid.ki = float(command[1:])
|
||||
left.pid.reset()
|
||||
right.pid.ki = float(command[1:])
|
||||
right.pid.reset()
|
||||
elif command.startswith("D"):
|
||||
left.pid.kd = float(command[1:])
|
||||
right.pid.kd = float(command[1:])
|
||||
elif command.startswith("T"):
|
||||
Settings.time_interval = float(command[1:])
|
||||
# Speed settings
|
||||
elif command.startswith("M"):
|
||||
Settings.speed = float(command[1:])
|
||||
# Start/stop commands
|
||||
elif command == "O":
|
||||
Settings.motors_enabled = False
|
||||
elif command.startswith("O"):
|
||||
await asyncio.sleep(5)
|
||||
asyncio.create_task(stop_motors_after(float(command[1:])))
|
||||
Settings.motors_enabled = True
|
||||
left.reset()
|
||||
right.reset()
|
||||
# Print settings
|
||||
elif command.startswith("?"):
|
||||
robot.uart.write(f"M{Settings.speed:.1f}\n".encode())
|
||||
robot.uart.write(f"P{left.pid.kp:.2f}:I{left.pid.ki:.2f}:D{left.pid.kd:.2f}\n".encode())
|
||||
robot.uart.write(f"T{Settings.time_interval:.1f}\n".encode())
|
||||
await asyncio.sleep(3)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
try:
|
||||
asyncio.create_task(motor_speed_loop())
|
||||
asyncio.run(command_handler())
|
||||
finally:
|
||||
robot.stop()
|
||||
27
ch-11/3-known-distance-acceleration/pid_controller.py
Normal file
27
ch-11/3-known-distance-acceleration/pid_controller.py
Normal file
@ -0,0 +1,27 @@
|
||||
class PIDController:
|
||||
def __init__(self, kp, ki, kd, d_filter_gain=0.1, imax=None, imin=None):
|
||||
self.kp = kp
|
||||
self.ki = ki
|
||||
self.kd = kd
|
||||
self.d_filter_gain = d_filter_gain
|
||||
self.imax = imax
|
||||
self.imin = imin
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.integral = 0
|
||||
self.error_prev = 0
|
||||
self.derivative = 0
|
||||
|
||||
def calculate(self, error, dt):
|
||||
self.integral += error * dt
|
||||
if self.imax is not None and self.integral > self.imax:
|
||||
self.integral = self.imax
|
||||
if self.imin is not None and self.integral < self.imin:
|
||||
self.integral = self.imin
|
||||
# Add a low pass filter to the difference
|
||||
difference = (error - self.error_prev) * self.d_filter_gain
|
||||
self.error_prev += difference
|
||||
self.derivative = difference / dt
|
||||
|
||||
return self.kp * error + self.ki * self.integral + self.kd * self.derivative
|
||||
84
ch-11/3-known-distance-acceleration/pio_encoder.py
Normal file
84
ch-11/3-known-distance-acceleration/pio_encoder.py
Normal file
@ -0,0 +1,84 @@
|
||||
import rp2pio
|
||||
import adafruit_pioasm
|
||||
import array
|
||||
import asyncio
|
||||
|
||||
|
||||
program = """
|
||||
; use the osr for count
|
||||
; input pins c1 c2
|
||||
|
||||
set y, 0 ; clear y
|
||||
mov osr, y ; and clear osr
|
||||
read:
|
||||
; x will be the old value
|
||||
; y the new values
|
||||
mov x, y ; store old Y in x
|
||||
in null, 32 ; Clear ISR - using y
|
||||
in pins, 2 ; read two pins into y
|
||||
mov y, isr
|
||||
jmp x!=y, different ; Jump if its different
|
||||
jmp read ; otherwise loop back to read
|
||||
|
||||
different:
|
||||
; x has old value, y has new.
|
||||
; extract the upper bit of X.
|
||||
in x, 31 ; get bit 31 - old p1 (remember which direction it came in)
|
||||
in null, 31 ; keep only 1 bit
|
||||
mov x, isr ; put this back in x
|
||||
jmp !x, c1_old_zero
|
||||
|
||||
c1_old_not_zero:
|
||||
jmp pin, count_up
|
||||
jmp count_down
|
||||
|
||||
c1_old_zero:
|
||||
jmp pin, count_down
|
||||
; fall through
|
||||
count_up:
|
||||
; for a clockwise move - we'll add 1 by inverting
|
||||
mov x, ~ osr ; store inverted OSR on x
|
||||
jmp x--, fake ; use jump to take off 1
|
||||
fake:
|
||||
mov x, ~ x ; invert back
|
||||
jmp send
|
||||
count_down:
|
||||
; for a clockwise move, just take one off
|
||||
mov x, osr ; store osr in x
|
||||
jmp x--, send ; dec and send
|
||||
send:
|
||||
; send x.
|
||||
mov isr, x ; send it
|
||||
push noblock ; put ISR into input FIFO
|
||||
mov osr, x ; put X back in OSR
|
||||
jmp read ; loop back
|
||||
"""
|
||||
|
||||
assembled = adafruit_pioasm.assemble(program)
|
||||
|
||||
|
||||
class QuadratureEncoder:
|
||||
def __init__(self, first_pin, second_pin, reversed=False):
|
||||
"""Encoder with 2 pins. Must use sequential pins on the board"""
|
||||
self.sm = rp2pio.StateMachine(
|
||||
assembled,
|
||||
frequency=0,
|
||||
first_in_pin=first_pin,
|
||||
jmp_pin=second_pin,
|
||||
in_pin_count=2,
|
||||
)
|
||||
self.reversed = reversed
|
||||
self._buffer = array.array("i", [0])
|
||||
asyncio.create_task(self.poll_loop())
|
||||
|
||||
async def poll_loop(self):
|
||||
while True:
|
||||
await asyncio.sleep(0)
|
||||
while self.sm.in_waiting:
|
||||
self.sm.readinto(self._buffer)
|
||||
|
||||
def read(self):
|
||||
if self.reversed:
|
||||
return -self._buffer[0]
|
||||
else:
|
||||
return self._buffer[0]
|
||||
65
ch-11/3-known-distance-acceleration/robot.py
Executable file
65
ch-11/3-known-distance-acceleration/robot.py
Executable file
@ -0,0 +1,65 @@
|
||||
import board
|
||||
import pwmio
|
||||
import pio_encoder
|
||||
import busio
|
||||
import adafruit_vl53l1x
|
||||
import math
|
||||
import busio
|
||||
|
||||
uart = busio.UART(board.GP12, board.GP13, baudrate=9600)
|
||||
|
||||
wheel_diameter_mm = 70
|
||||
wheel_circumference_mm = math.pi * wheel_diameter_mm
|
||||
gear_ratio = 298
|
||||
encoder_poles = 28
|
||||
ticks_per_revolution = encoder_poles * gear_ratio
|
||||
ticks_to_mm_const = wheel_circumference_mm / ticks_per_revolution
|
||||
|
||||
def ticks_to_mm(ticks):
|
||||
return ticks_to_mm_const * ticks
|
||||
|
||||
motor_A2 = pwmio.PWMOut(board.GP17, frequency=100)
|
||||
motor_A1 = pwmio.PWMOut(board.GP16, frequency=100)
|
||||
motor_B2 = pwmio.PWMOut(board.GP18, frequency=100)
|
||||
motor_B1 = pwmio.PWMOut(board.GP19, frequency=100)
|
||||
|
||||
right_motor = motor_A1, motor_A2
|
||||
left_motor = motor_B1, motor_B2
|
||||
|
||||
right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21)
|
||||
left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27, reversed=True)
|
||||
|
||||
i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1)
|
||||
i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3)
|
||||
|
||||
left_distance = adafruit_vl53l1x.VL53L1X(i2c0)
|
||||
right_distance = adafruit_vl53l1x.VL53L1X(i2c1)
|
||||
|
||||
|
||||
def stop():
|
||||
motor_A1.duty_cycle = 0
|
||||
motor_A2.duty_cycle = 0
|
||||
motor_B1.duty_cycle = 0
|
||||
motor_B2.duty_cycle = 0
|
||||
|
||||
|
||||
def set_speed(motor, speed):
|
||||
# Swap motor pins if we reverse the speed
|
||||
if speed < 0:
|
||||
direction = motor[1], motor[0]
|
||||
speed = -speed
|
||||
else:
|
||||
direction = motor
|
||||
speed = min(speed, 1) # limit to 1.0
|
||||
max_speed = 2 ** 16 - 1
|
||||
|
||||
direction[0].duty_cycle = int(max_speed * speed)
|
||||
direction[1].duty_cycle = 0
|
||||
|
||||
|
||||
def set_left(speed):
|
||||
set_speed(left_motor, speed)
|
||||
|
||||
|
||||
def set_right(speed):
|
||||
set_speed(right_motor, speed)
|
||||
8
ch-11/_test_encoder_output/lpf.py
Normal file
8
ch-11/_test_encoder_output/lpf.py
Normal file
@ -0,0 +1,8 @@
|
||||
class LPF:
|
||||
"""Low pass filter."""
|
||||
def __init__(self, alpha):
|
||||
self.alpha = alpha
|
||||
self.last = 0
|
||||
def update(self, value):
|
||||
self.last = self.alpha * value + (1 - self.alpha) * self.last
|
||||
return self.last
|
||||
@ -1,59 +0,0 @@
|
||||
""" Turn JSON data stream into graphs"""
|
||||
import requests
|
||||
import json
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
from matplotlib.animation import FuncAnimation
|
||||
|
||||
url = "http://192.168.1.128"
|
||||
|
||||
|
||||
class AnimatedGraph:
|
||||
def __init__(self):
|
||||
self.fields = {}
|
||||
self.samples = 100
|
||||
self.reset()
|
||||
print("getting source to iterate over")
|
||||
self.source = self._graph_source()
|
||||
print("init completed")
|
||||
|
||||
def reset(self):
|
||||
for field in self.fields:
|
||||
self.fields[field] = []
|
||||
|
||||
def _graph_source(self):
|
||||
while True:
|
||||
try:
|
||||
with requests.get(url, timeout=1, stream=True) as response:
|
||||
print(f"status: {response.status_code}")
|
||||
yield from response.iter_lines()
|
||||
except requests.exceptions.RequestException:
|
||||
pass
|
||||
|
||||
def make_frame(self, frame):
|
||||
print("loading next item")
|
||||
item = json.loads(next(self.source))
|
||||
print("item loaded")
|
||||
if 'time' in self.fields and item["time"] < self.fields['time'][-1]:
|
||||
self.reset()
|
||||
for field in item:
|
||||
if field not in self.fields:
|
||||
self.fields[field] = []
|
||||
self.fields[field].append(item[field])
|
||||
|
||||
if len(self.fields['time'] ) > self.samples:
|
||||
for field in self.fields:
|
||||
self.fields[field] = self.fields[field][-self.samples:]
|
||||
|
||||
plt.cla() # clear axes.
|
||||
# plot the items
|
||||
for field in self.fields:
|
||||
if field != "time":
|
||||
plt.plot("time", field, data=self.fields)
|
||||
|
||||
plt.legend(loc="upper right")
|
||||
|
||||
# Create the animation. gcf - get current figure. random_stream - callback func.
|
||||
animation = FuncAnimation(plt.gcf(), AnimatedGraph().make_frame, interval=200)
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
@ -1,2 +0,0 @@
|
||||
matplotlib
|
||||
requests
|
||||
Loading…
x
Reference in New Issue
Block a user