diff --git a/.gitignore b/.gitignore index 54fa14b..fbc48d8 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ *.FCStd1 secrets.py libs +.idea/ diff --git a/ch-11/1-measuring_wheel_speeds/code.py b/ch-11/1-measuring_wheel_speeds/code.py index ae922cd..cc314e3 100644 --- a/ch-11/1-measuring_wheel_speeds/code.py +++ b/ch-11/1-measuring_wheel_speeds/code.py @@ -34,6 +34,7 @@ async def command_handler(): elif command == "O": robot.stop() elif command.startswith("O"): + await asyncio.sleep(5) robot.set_left(Settings.speed) robot.set_right(Settings.speed) asyncio.create_task(stop_motors_after(float(command[1:]))) diff --git a/ch-11/1.5-speed-control/code.py b/ch-11/1.5-speed-control/code.py new file mode 100644 index 0000000..2efb7e5 --- /dev/null +++ b/ch-11/1.5-speed-control/code.py @@ -0,0 +1,100 @@ +import asyncio +import board +import busio +import robot +import time + +import pid_controller +uart = busio.UART(board.GP12, board.GP13, baudrate=9600) + +class Settings: + # work in ticks only - 1 rpm + speed = 0.5 + time_interval = 0.2 + motors_enabled = False + dead_zone = 0.2 + +class SpeedController: + def __init__(self, encoder, motor_fn) -> None: + self.encoder = encoder + self.motor_fn = motor_fn + #P1.4,I2.8,D1 + #P0, I2.5, D1.1 + self.pid = pid_controller.PIDController(0.1, 1.5, 0) + self.reset() + + def reset(self): + self.last_ticks = self.encoder.read() + self.error = 0 + self.control_signal = 0 + self.pid.reset() + + def update(self, dt): + current_ticks = self.encoder.read() + actual_speed_in_rpm = (current_ticks - self.last_ticks) / (dt * robot.ticks_per_revolution) + self.last_ticks = current_ticks + # calculate the error + self.error = (Settings.speed * Settings.motors_enabled) - actual_speed_in_rpm + # calculate the control signal + self.control_signal = self.pid.calculate(self.error, dt) + self.motor_fn(self.control_signal) + + +left = SpeedController(robot.left_encoder, robot.set_left) + +async def speed_controller_loop(): + last_time = time.monotonic() + while True: + await asyncio.sleep(Settings.time_interval) + current_time = time.monotonic() + dt = current_time - last_time + left.update(dt) + last_time = current_time + print(f"{dt:.2f}") + uart.write(f"{Settings.speed * Settings.motors_enabled}, {left.error:.2f},{left.control_signal:.2f}\n".encode()) + + +async def stop_motors_after(seconds): + await asyncio.sleep(seconds) + Settings.motors_enabled = False + # robot.stop() + +async def command_handler(): + while True: + if uart.in_waiting: + command = uart.readline().decode().strip() + # PID settings + if command.startswith("P"): + left.pid.kp = float(command[1:]) + elif command.startswith("I"): + left.pid.ki = float(command[1:]) + left.pid.reset() + elif command.startswith("D"): + left.pid.kd = float(command[1:]) + # Speed settings + elif command.startswith("M"): + Settings.speed = float(command[1:]) + elif command.startswith("T"): + Settings.time_interval = float(command[1:]) + # Start/stop commands + elif command == "O": + Settings.motors_enabled = False + elif command.startswith("O"): + asyncio.create_task(stop_motors_after(float(command[1:]))) + Settings.motors_enabled = True + left.reset() + # Print settings + elif command.startswith("?"): + uart.write(f"M{Settings.speed:.1f}\n".encode()) + uart.write(f"T{Settings.time_interval:.1f}\n".encode()) + uart.write(f"P{left.pid.kp:.2f}:I{left.pid.ki:.2f}:D{left.pid.kd:.2f}\n".encode()) + await asyncio.sleep(3) + await asyncio.sleep(0) + +try: + asyncio.create_task(speed_controller_loop()) + asyncio.create_task(stop_motors_after(10)) + Settings.motors_enabled = True + asyncio.run(command_handler()) +finally: + robot.stop() diff --git a/ch-11/1.5-speed-control/pid_controller.py b/ch-11/1.5-speed-control/pid_controller.py new file mode 100644 index 0000000..a09358f --- /dev/null +++ b/ch-11/1.5-speed-control/pid_controller.py @@ -0,0 +1,27 @@ +class PIDController: + def __init__(self, kp, ki, kd, d_filter_gain=0.1, imax=None, imin=None): + self.kp = kp + self.ki = ki + self.kd = kd + self.d_filter_gain = d_filter_gain + self.imax = imax + self.imin = imin + self.reset() + + def reset(self): + self.integral = 0 + self.error_prev = 0 + self.derivative = 0 + + def calculate(self, error, dt): + self.integral += error * dt + if self.imax is not None and self.integral > self.imax: + self.integral = self.imax + if self.imin is not None and self.integral < self.imin: + self.integral = self.imin + # Add a low pass filter to the difference + difference = (error - self.error_prev) * self.d_filter_gain + self.error_prev += difference + self.derivative = difference / dt + + return self.kp * error + self.ki * self.integral + self.kd * self.derivative diff --git a/ch-11/1.5-speed-control/pio_encoder.py b/ch-11/1.5-speed-control/pio_encoder.py new file mode 100644 index 0000000..b831bf4 --- /dev/null +++ b/ch-11/1.5-speed-control/pio_encoder.py @@ -0,0 +1,84 @@ +import rp2pio +import adafruit_pioasm +import array +import asyncio + + +program = """ +; use the osr for count +; input pins c1 c2 + + set y, 0 ; clear y + mov osr, y ; and clear osr +read: + ; x will be the old value + ; y the new values + mov x, y ; store old Y in x + in null, 32 ; Clear ISR - using y + in pins, 2 ; read two pins into y + mov y, isr + jmp x!=y, different ; Jump if its different + jmp read ; otherwise loop back to read + +different: + ; x has old value, y has new. + ; extract the upper bit of X. + in x, 31 ; get bit 31 - old p1 (remember which direction it came in) + in null, 31 ; keep only 1 bit + mov x, isr ; put this back in x + jmp !x, c1_old_zero + +c1_old_not_zero: + jmp pin, count_up + jmp count_down + +c1_old_zero: + jmp pin, count_down + ; fall through +count_up: + ; for a clockwise move - we'll add 1 by inverting + mov x, ~ osr ; store inverted OSR on x + jmp x--, fake ; use jump to take off 1 +fake: + mov x, ~ x ; invert back + jmp send +count_down: + ; for a clockwise move, just take one off + mov x, osr ; store osr in x + jmp x--, send ; dec and send +send: + ; send x. + mov isr, x ; send it + push noblock ; put ISR into input FIFO + mov osr, x ; put X back in OSR + jmp read ; loop back +""" + +assembled = adafruit_pioasm.assemble(program) + + +class QuadratureEncoder: + def __init__(self, first_pin, second_pin, reversed=False): + """Encoder with 2 pins. Must use sequential pins on the board""" + self.sm = rp2pio.StateMachine( + assembled, + frequency=0, + first_in_pin=first_pin, + jmp_pin=second_pin, + in_pin_count=2, + ) + self.reversed = reversed + self._buffer = array.array("i", [0]) + asyncio.create_task(self.poll_loop()) + + async def poll_loop(self): + while True: + await asyncio.sleep(0) + while self.sm.in_waiting: + self.sm.readinto(self._buffer) + + def read(self): + if self.reversed: + return -self._buffer[0] + else: + return self._buffer[0] diff --git a/ch-11/1.5-speed-control/robot.py b/ch-11/1.5-speed-control/robot.py new file mode 100755 index 0000000..871a180 --- /dev/null +++ b/ch-11/1.5-speed-control/robot.py @@ -0,0 +1,68 @@ +import board +import pwmio +import pio_encoder +import busio +import adafruit_vl53l1x +import math + +wheel_diameter_mm = 70 +wheel_circumference_mm = math.pi * wheel_diameter_mm +gear_ratio = 298 +encoder_poles = 28 +ticks_per_revolution = encoder_poles * gear_ratio +ticks_to_mm_const = wheel_circumference_mm / ticks_per_revolution + +def ticks_to_mm(ticks): + return ticks_to_mm_const * ticks + +motor_A2 = pwmio.PWMOut(board.GP17, frequency=100) +motor_A1 = pwmio.PWMOut(board.GP16, frequency=100) +motor_B2 = pwmio.PWMOut(board.GP18, frequency=100) +motor_B1 = pwmio.PWMOut(board.GP19, frequency=100) + +right_motor = motor_A1, motor_A2 +left_motor = motor_B1, motor_B2 +motor_dead_zone = 0.2 + +right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21) +left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27, reversed=True) + +i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1) +i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3) + +left_distance = adafruit_vl53l1x.VL53L1X(i2c0) +right_distance = adafruit_vl53l1x.VL53L1X(i2c1) + + +def stop(): + motor_A1.duty_cycle = 0 + motor_A2.duty_cycle = 0 + motor_B1.duty_cycle = 0 + motor_B2.duty_cycle = 0 + + +def set_speed(motor, speed): + # stop completely if in the dead zone + if abs(speed) < motor_dead_zone: + motor[0].duty_cycle = 0 + motor[1].duty_cycle = 0 + return + # Swap motor pins if we reverse the speed + if speed < 0: + direction = motor[1], motor[0] + speed = -speed + else: + direction = motor + speed = min(speed, 1) # limit to 1.0 + max_speed = 2 ** 16 - 1 + + direction[0].duty_cycle = int(max_speed * speed) + direction[1].duty_cycle = 0 + + +def set_left(speed): + set_speed(left_motor, speed) + + +def set_right(speed): + set_speed(right_motor, speed) diff --git a/ch-11/2-speed-control-acceleration/code.py b/ch-11/2-speed-control-acceleration/code.py new file mode 100644 index 0000000..f9b4852 --- /dev/null +++ b/ch-11/2-speed-control-acceleration/code.py @@ -0,0 +1,103 @@ +import asyncio +import time +import robot +import pid_controller + +class Settings: + speed = 0.17 + distance = 1 + time_interval = 0.2 + motors_enabled = False + + +class SpeedController: + def __init__(self, encoder, motor_fn): + self.encoder = encoder + self.motor_fn = motor_fn + self.pid = pid_controller.PIDController(3, 0, 1) + self.reset() + + def reset(self): + self.last_ticks = self.encoder.read() + self.error = 0 + self.speed = 0 + self.actual_speed = 0 + self.pid.reset() + + def update(self, dt): + current_ticks = self.encoder.read() + speed_in_ticks = (current_ticks - self.last_ticks) / dt + self.last_ticks = current_ticks + self.actual_speed = robot.ticks_to_mm(speed_in_ticks) / 1000 + # calculate the error + self.error = (Settings.speed * Settings.motors_enabled) - self.actual_speed + # calculate the control signal + control_signal = self.pid.calculate(self.error, dt) + self.speed += control_signal + self.motor_fn(self.speed) + + +left = SpeedController(robot.left_encoder, robot.set_left) +right = SpeedController(robot.right_encoder, robot.set_right) + + +async def motor_speed_loop(): + last_time = time.monotonic() + while True: + await asyncio.sleep(Settings.time_interval) + current_time = time.monotonic() + dt = current_time - last_time + last_time = current_time + left.update(dt) + right.update(dt) + robot.uart.write(f"0, {left.actual_speed:.2f},{Settings.speed:.2f}\n".encode()) + + +async def stop_motors_after(seconds): + await asyncio.sleep(seconds) + Settings.motors_enabled = False + + +async def command_handler(): + while True: + if robot.uart.in_waiting: + command = robot.uart.readline().decode().strip() + # PID settings + if command.startswith("P"): + left.pid.kp = float(command[1:]) + right.pid.kp = float(command[1:]) + elif command.startswith("I"): + left.pid.ki = float(command[1:]) + left.pid.reset() + right.pid.ki = float(command[1:]) + right.pid.reset() + elif command.startswith("D"): + left.pid.kd = float(command[1:]) + right.pid.kd = float(command[1:]) + elif command.startswith("T"): + Settings.time_interval = float(command[1:]) + # Speed settings + elif command.startswith("M"): + Settings.speed = float(command[1:]) + # Start/stop commands + elif command == "O": + Settings.motors_enabled = False + elif command.startswith("O"): + await asyncio.sleep(5) + asyncio.create_task(stop_motors_after(float(command[1:]))) + Settings.motors_enabled = True + left.reset() + right.reset() + # Print settings + elif command.startswith("?"): + robot.uart.write(f"M{Settings.speed:.1f}\n".encode()) + robot.uart.write(f"P{left.pid.kp:.2f}:I{left.pid.ki:.2f}:D{left.pid.kd:.2f}\n".encode()) + robot.uart.write(f"T{Settings.time_interval:.1f}\n".encode()) + await asyncio.sleep(3) + await asyncio.sleep(0) + +try: + asyncio.create_task(motor_speed_loop()) + asyncio.run(command_handler()) +finally: + robot.stop() diff --git a/ch-11/2-speed-control-acceleration/pid_controller.py b/ch-11/2-speed-control-acceleration/pid_controller.py new file mode 100644 index 0000000..a09358f --- /dev/null +++ b/ch-11/2-speed-control-acceleration/pid_controller.py @@ -0,0 +1,27 @@ +class PIDController: + def __init__(self, kp, ki, kd, d_filter_gain=0.1, imax=None, imin=None): + self.kp = kp + self.ki = ki + self.kd = kd + self.d_filter_gain = d_filter_gain + self.imax = imax + self.imin = imin + self.reset() + + def reset(self): + self.integral = 0 + self.error_prev = 0 + self.derivative = 0 + + def calculate(self, error, dt): + self.integral += error * dt + if self.imax is not None and self.integral > self.imax: + self.integral = self.imax + if self.imin is not None and self.integral < self.imin: + self.integral = self.imin + # Add a low pass filter to the difference + difference = (error - self.error_prev) * self.d_filter_gain + self.error_prev += difference + self.derivative = difference / dt + + return self.kp * error + self.ki * self.integral + self.kd * self.derivative diff --git a/ch-11/2-speed-control-acceleration/pio_encoder.py b/ch-11/2-speed-control-acceleration/pio_encoder.py new file mode 100644 index 0000000..b831bf4 --- /dev/null +++ b/ch-11/2-speed-control-acceleration/pio_encoder.py @@ -0,0 +1,84 @@ +import rp2pio +import adafruit_pioasm +import array +import asyncio + + +program = """ +; use the osr for count +; input pins c1 c2 + + set y, 0 ; clear y + mov osr, y ; and clear osr +read: + ; x will be the old value + ; y the new values + mov x, y ; store old Y in x + in null, 32 ; Clear ISR - using y + in pins, 2 ; read two pins into y + mov y, isr + jmp x!=y, different ; Jump if its different + jmp read ; otherwise loop back to read + +different: + ; x has old value, y has new. + ; extract the upper bit of X. + in x, 31 ; get bit 31 - old p1 (remember which direction it came in) + in null, 31 ; keep only 1 bit + mov x, isr ; put this back in x + jmp !x, c1_old_zero + +c1_old_not_zero: + jmp pin, count_up + jmp count_down + +c1_old_zero: + jmp pin, count_down + ; fall through +count_up: + ; for a clockwise move - we'll add 1 by inverting + mov x, ~ osr ; store inverted OSR on x + jmp x--, fake ; use jump to take off 1 +fake: + mov x, ~ x ; invert back + jmp send +count_down: + ; for a clockwise move, just take one off + mov x, osr ; store osr in x + jmp x--, send ; dec and send +send: + ; send x. + mov isr, x ; send it + push noblock ; put ISR into input FIFO + mov osr, x ; put X back in OSR + jmp read ; loop back +""" + +assembled = adafruit_pioasm.assemble(program) + + +class QuadratureEncoder: + def __init__(self, first_pin, second_pin, reversed=False): + """Encoder with 2 pins. Must use sequential pins on the board""" + self.sm = rp2pio.StateMachine( + assembled, + frequency=0, + first_in_pin=first_pin, + jmp_pin=second_pin, + in_pin_count=2, + ) + self.reversed = reversed + self._buffer = array.array("i", [0]) + asyncio.create_task(self.poll_loop()) + + async def poll_loop(self): + while True: + await asyncio.sleep(0) + while self.sm.in_waiting: + self.sm.readinto(self._buffer) + + def read(self): + if self.reversed: + return -self._buffer[0] + else: + return self._buffer[0] diff --git a/ch-11/2-speed-control-acceleration/robot.py b/ch-11/2-speed-control-acceleration/robot.py new file mode 100755 index 0000000..bc83e0b --- /dev/null +++ b/ch-11/2-speed-control-acceleration/robot.py @@ -0,0 +1,65 @@ +import board +import pwmio +import pio_encoder +import busio +import adafruit_vl53l1x +import math +import busio + +uart = busio.UART(board.GP12, board.GP13, baudrate=9600) + +wheel_diameter_mm = 70 +wheel_circumference_mm = math.pi * wheel_diameter_mm +gear_ratio = 298 +encoder_poles = 28 +ticks_per_revolution = encoder_poles * gear_ratio +ticks_to_mm_const = wheel_circumference_mm / ticks_per_revolution + +def ticks_to_mm(ticks): + return ticks_to_mm_const * ticks + +motor_A2 = pwmio.PWMOut(board.GP17, frequency=100) +motor_A1 = pwmio.PWMOut(board.GP16, frequency=100) +motor_B2 = pwmio.PWMOut(board.GP18, frequency=100) +motor_B1 = pwmio.PWMOut(board.GP19, frequency=100) + +right_motor = motor_A1, motor_A2 +left_motor = motor_B1, motor_B2 + +right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21) +left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27, reversed=True) + +i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1) +i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3) + +left_distance = adafruit_vl53l1x.VL53L1X(i2c0) +right_distance = adafruit_vl53l1x.VL53L1X(i2c1) + + +def stop(): + motor_A1.duty_cycle = 0 + motor_A2.duty_cycle = 0 + motor_B1.duty_cycle = 0 + motor_B2.duty_cycle = 0 + + +def set_speed(motor, speed): + # Swap motor pins if we reverse the speed + if speed < 0: + direction = motor[1], motor[0] + speed = -speed + else: + direction = motor + speed = min(speed, 1) # limit to 1.0 + max_speed = 2 ** 16 - 1 + + direction[0].duty_cycle = int(max_speed * speed) + direction[1].duty_cycle = 0 + + +def set_left(speed): + set_speed(left_motor, speed) + + +def set_right(speed): + set_speed(right_motor, speed) diff --git a/ch-11/3-known-distance-acceleration/code.py b/ch-11/3-known-distance-acceleration/code.py new file mode 100644 index 0000000..311a67b --- /dev/null +++ b/ch-11/3-known-distance-acceleration/code.py @@ -0,0 +1,102 @@ +import asyncio +import time +import robot +import pid_controller + +class Settings: + speed = 0.17 + time_interval = 0.2 + motors_enabled = False + + +class SpeedController: + def __init__(self, encoder, motor_fn): + self.encoder = encoder + self.motor_fn = motor_fn + self.pid = pid_controller.PIDController(3, 0, 1) + self.reset() + + def reset(self): + self.last_ticks = self.encoder.read() + self.error = 0 + self.speed = 0 + self.actual_speed = 0 + self.pid.reset() + + def update(self, dt): + current_ticks = self.encoder.read() + speed_in_ticks = (current_ticks - self.last_ticks) / dt + self.last_ticks = current_ticks + self.actual_speed = robot.ticks_to_mm(speed_in_ticks) / 1000 + # calculate the error + self.error = (Settings.speed * Settings.motors_enabled) - self.actual_speed + # calculate the control signal + control_signal = self.pid.calculate(self.error, dt) + self.speed += control_signal + self.motor_fn(self.speed) + + +left = SpeedController(robot.left_encoder, robot.set_left) +right = SpeedController(robot.right_encoder, robot.set_right) + + +async def motor_speed_loop(): + last_time = time.monotonic() + while True: + await asyncio.sleep(Settings.time_interval) + current_time = time.monotonic() + dt = current_time - last_time + last_time = current_time + left.update(dt) + right.update(dt) + robot.uart.write(f"0, {left.actual_speed:.2f},{Settings.speed:.2f}\n".encode()) + + +async def stop_motors_after(seconds): + await asyncio.sleep(seconds) + Settings.motors_enabled = False + + +async def command_handler(): + while True: + if robot.uart.in_waiting: + command = robot.uart.readline().decode().strip() + # PID settings + if command.startswith("P"): + left.pid.kp = float(command[1:]) + right.pid.kp = float(command[1:]) + elif command.startswith("I"): + left.pid.ki = float(command[1:]) + left.pid.reset() + right.pid.ki = float(command[1:]) + right.pid.reset() + elif command.startswith("D"): + left.pid.kd = float(command[1:]) + right.pid.kd = float(command[1:]) + elif command.startswith("T"): + Settings.time_interval = float(command[1:]) + # Speed settings + elif command.startswith("M"): + Settings.speed = float(command[1:]) + # Start/stop commands + elif command == "O": + Settings.motors_enabled = False + elif command.startswith("O"): + await asyncio.sleep(5) + asyncio.create_task(stop_motors_after(float(command[1:]))) + Settings.motors_enabled = True + left.reset() + right.reset() + # Print settings + elif command.startswith("?"): + robot.uart.write(f"M{Settings.speed:.1f}\n".encode()) + robot.uart.write(f"P{left.pid.kp:.2f}:I{left.pid.ki:.2f}:D{left.pid.kd:.2f}\n".encode()) + robot.uart.write(f"T{Settings.time_interval:.1f}\n".encode()) + await asyncio.sleep(3) + await asyncio.sleep(0) + +try: + asyncio.create_task(motor_speed_loop()) + asyncio.run(command_handler()) +finally: + robot.stop() diff --git a/ch-11/3-known-distance-acceleration/pid_controller.py b/ch-11/3-known-distance-acceleration/pid_controller.py new file mode 100644 index 0000000..a09358f --- /dev/null +++ b/ch-11/3-known-distance-acceleration/pid_controller.py @@ -0,0 +1,27 @@ +class PIDController: + def __init__(self, kp, ki, kd, d_filter_gain=0.1, imax=None, imin=None): + self.kp = kp + self.ki = ki + self.kd = kd + self.d_filter_gain = d_filter_gain + self.imax = imax + self.imin = imin + self.reset() + + def reset(self): + self.integral = 0 + self.error_prev = 0 + self.derivative = 0 + + def calculate(self, error, dt): + self.integral += error * dt + if self.imax is not None and self.integral > self.imax: + self.integral = self.imax + if self.imin is not None and self.integral < self.imin: + self.integral = self.imin + # Add a low pass filter to the difference + difference = (error - self.error_prev) * self.d_filter_gain + self.error_prev += difference + self.derivative = difference / dt + + return self.kp * error + self.ki * self.integral + self.kd * self.derivative diff --git a/ch-11/3-known-distance-acceleration/pio_encoder.py b/ch-11/3-known-distance-acceleration/pio_encoder.py new file mode 100644 index 0000000..b831bf4 --- /dev/null +++ b/ch-11/3-known-distance-acceleration/pio_encoder.py @@ -0,0 +1,84 @@ +import rp2pio +import adafruit_pioasm +import array +import asyncio + + +program = """ +; use the osr for count +; input pins c1 c2 + + set y, 0 ; clear y + mov osr, y ; and clear osr +read: + ; x will be the old value + ; y the new values + mov x, y ; store old Y in x + in null, 32 ; Clear ISR - using y + in pins, 2 ; read two pins into y + mov y, isr + jmp x!=y, different ; Jump if its different + jmp read ; otherwise loop back to read + +different: + ; x has old value, y has new. + ; extract the upper bit of X. + in x, 31 ; get bit 31 - old p1 (remember which direction it came in) + in null, 31 ; keep only 1 bit + mov x, isr ; put this back in x + jmp !x, c1_old_zero + +c1_old_not_zero: + jmp pin, count_up + jmp count_down + +c1_old_zero: + jmp pin, count_down + ; fall through +count_up: + ; for a clockwise move - we'll add 1 by inverting + mov x, ~ osr ; store inverted OSR on x + jmp x--, fake ; use jump to take off 1 +fake: + mov x, ~ x ; invert back + jmp send +count_down: + ; for a clockwise move, just take one off + mov x, osr ; store osr in x + jmp x--, send ; dec and send +send: + ; send x. + mov isr, x ; send it + push noblock ; put ISR into input FIFO + mov osr, x ; put X back in OSR + jmp read ; loop back +""" + +assembled = adafruit_pioasm.assemble(program) + + +class QuadratureEncoder: + def __init__(self, first_pin, second_pin, reversed=False): + """Encoder with 2 pins. Must use sequential pins on the board""" + self.sm = rp2pio.StateMachine( + assembled, + frequency=0, + first_in_pin=first_pin, + jmp_pin=second_pin, + in_pin_count=2, + ) + self.reversed = reversed + self._buffer = array.array("i", [0]) + asyncio.create_task(self.poll_loop()) + + async def poll_loop(self): + while True: + await asyncio.sleep(0) + while self.sm.in_waiting: + self.sm.readinto(self._buffer) + + def read(self): + if self.reversed: + return -self._buffer[0] + else: + return self._buffer[0] diff --git a/ch-11/3-known-distance-acceleration/robot.py b/ch-11/3-known-distance-acceleration/robot.py new file mode 100755 index 0000000..bc83e0b --- /dev/null +++ b/ch-11/3-known-distance-acceleration/robot.py @@ -0,0 +1,65 @@ +import board +import pwmio +import pio_encoder +import busio +import adafruit_vl53l1x +import math +import busio + +uart = busio.UART(board.GP12, board.GP13, baudrate=9600) + +wheel_diameter_mm = 70 +wheel_circumference_mm = math.pi * wheel_diameter_mm +gear_ratio = 298 +encoder_poles = 28 +ticks_per_revolution = encoder_poles * gear_ratio +ticks_to_mm_const = wheel_circumference_mm / ticks_per_revolution + +def ticks_to_mm(ticks): + return ticks_to_mm_const * ticks + +motor_A2 = pwmio.PWMOut(board.GP17, frequency=100) +motor_A1 = pwmio.PWMOut(board.GP16, frequency=100) +motor_B2 = pwmio.PWMOut(board.GP18, frequency=100) +motor_B1 = pwmio.PWMOut(board.GP19, frequency=100) + +right_motor = motor_A1, motor_A2 +left_motor = motor_B1, motor_B2 + +right_encoder = pio_encoder.QuadratureEncoder(board.GP20, board.GP21) +left_encoder = pio_encoder.QuadratureEncoder(board.GP26, board.GP27, reversed=True) + +i2c0 = busio.I2C(sda=board.GP0, scl=board.GP1) +i2c1 = busio.I2C(sda=board.GP2, scl=board.GP3) + +left_distance = adafruit_vl53l1x.VL53L1X(i2c0) +right_distance = adafruit_vl53l1x.VL53L1X(i2c1) + + +def stop(): + motor_A1.duty_cycle = 0 + motor_A2.duty_cycle = 0 + motor_B1.duty_cycle = 0 + motor_B2.duty_cycle = 0 + + +def set_speed(motor, speed): + # Swap motor pins if we reverse the speed + if speed < 0: + direction = motor[1], motor[0] + speed = -speed + else: + direction = motor + speed = min(speed, 1) # limit to 1.0 + max_speed = 2 ** 16 - 1 + + direction[0].duty_cycle = int(max_speed * speed) + direction[1].duty_cycle = 0 + + +def set_left(speed): + set_speed(left_motor, speed) + + +def set_right(speed): + set_speed(right_motor, speed) diff --git a/ch-11/_test_encoder_output/lpf.py b/ch-11/_test_encoder_output/lpf.py new file mode 100644 index 0000000..fe46441 --- /dev/null +++ b/ch-11/_test_encoder_output/lpf.py @@ -0,0 +1,8 @@ +class LPF: + """Low pass filter.""" + def __init__(self, alpha): + self.alpha = alpha + self.last = 0 + def update(self, value): + self.last = self.alpha * value + (1 - self.alpha) * self.last + return self.last diff --git a/ch-11/pc-live-graph/live_graph.py b/ch-11/pc-live-graph/live_graph.py deleted file mode 100644 index aeb6ea4..0000000 --- a/ch-11/pc-live-graph/live_graph.py +++ /dev/null @@ -1,59 +0,0 @@ -""" Turn JSON data stream into graphs""" -import requests -import json - -import matplotlib.pyplot as plt -from matplotlib.animation import FuncAnimation - -url = "http://192.168.1.128" - - -class AnimatedGraph: - def __init__(self): - self.fields = {} - self.samples = 100 - self.reset() - print("getting source to iterate over") - self.source = self._graph_source() - print("init completed") - - def reset(self): - for field in self.fields: - self.fields[field] = [] - - def _graph_source(self): - while True: - try: - with requests.get(url, timeout=1, stream=True) as response: - print(f"status: {response.status_code}") - yield from response.iter_lines() - except requests.exceptions.RequestException: - pass - - def make_frame(self, frame): - print("loading next item") - item = json.loads(next(self.source)) - print("item loaded") - if 'time' in self.fields and item["time"] < self.fields['time'][-1]: - self.reset() - for field in item: - if field not in self.fields: - self.fields[field] = [] - self.fields[field].append(item[field]) - - if len(self.fields['time'] ) > self.samples: - for field in self.fields: - self.fields[field] = self.fields[field][-self.samples:] - - plt.cla() # clear axes. - # plot the items - for field in self.fields: - if field != "time": - plt.plot("time", field, data=self.fields) - - plt.legend(loc="upper right") - -# Create the animation. gcf - get current figure. random_stream - callback func. -animation = FuncAnimation(plt.gcf(), AnimatedGraph().make_frame, interval=200) -plt.tight_layout() -plt.show() diff --git a/ch-11/pc-live-graph/requirements.txt b/ch-11/pc-live-graph/requirements.txt deleted file mode 100644 index 66ed226..0000000 --- a/ch-11/pc-live-graph/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -matplotlib -requests