Firmware cleanup

This commit is contained in:
Martin Bauer
2021-11-27 21:18:54 +01:00
parent 11db5763eb
commit 4fbd7f0f1b
12 changed files with 242 additions and 142 deletions

View File

@@ -1,7 +1,5 @@
import asyncio
import serial_asyncio
from enum import Enum
from dataclasses import dataclass
import struct
from led_cmds import *
@@ -23,10 +21,12 @@ class MessageHostToFw(Enum):
LED_WHEEL_EFFECT_RANDOM_TWO_COLOR_INTERPOLATION = 3
outgoingMsgMap = {
outgoing_msg_map = {
EffectStaticConfig: 0,
EffectAlexaSwipeConfig: 1,
EffectCircularConfig: 2,
EffectRandomTwoColorInterpolationConfig: 3,
EffectSwipeAndChange: 4,
}
@@ -42,17 +42,21 @@ incomingMsgMap = {0: RfidTokenRead}
class MusicMouseProtocol(asyncio.Protocol):
def __init__(self):
super()
self._msg_callback = None
def register_message_callback(self, cb):
self._msg_callback = cb
def connection_made(self, transport):
self.transport = transport
self.in_buff = bytes()
def send_message(self, message):
msg_content = message.as_bytes()
print("Sending message content", len(msg_content))
header = struct.pack("<IBH", MAGIC_TOKEN_HOST_TO_FW, outgoingMsgMap[type(message)],
header = struct.pack("<IBH", MAGIC_TOKEN_HOST_TO_FW, outgoing_msg_map[type(message)],
len(msg_content))
print(repr(header + msg_content))
self.transport.write(header + msg_content)
def data_received(self, data):
@@ -89,25 +93,5 @@ class MusicMouseProtocol(asyncio.Protocol):
def _on_msg_receive(self, msg_type, msg_payload):
parsed_msg = incomingMsgMap[msg_type](msg_payload)
print("MSG:", parsed_msg)
async def main(protocol: MusicMouseProtocol):
for i in range(10):
protocol.send_message(EffectStaticConfig(ColorRGBW(1, 0, 0, 0)))
await asyncio.sleep(2)
protocol.send_message(EffectStaticConfig(ColorRGBW(0, 1, 0, 0)))
await asyncio.sleep(2)
protocol.send_message(EffectStaticConfig(ColorRGBW(0, 1, 1, 0)))
await asyncio.sleep(2)
loop = asyncio.get_event_loop()
coro = serial_asyncio.create_serial_connection(loop,
MusicMouseProtocol,
'/dev/ttyUSB0',
baudrate=115200)
transport, protocol = loop.run_until_complete(coro)
#loop.create_task(main(protocol))
loop.run_forever()
loop.close()
if self._msg_callback is not None:
self._msg_callback(self, parsed_msg)

View File

@@ -9,8 +9,13 @@ class ColorRGBW:
b: float
w: float
def repr(self):
return f"ColorRGBW({self.r}, {self.g}, {self.b}, {self.w}"
def as_bytes(self) -> bytes:
return struct.pack("<BBBB", self.r * 255, self.g * 255, self.b * 255, self.w * 255)
assert self.is_valid(), "Trying to send invalid " + repr(self)
return struct.pack("<BBBB", int(self.r * 255), int(self.g * 255), int(self.b * 255),
int(self.w * 255))
def is_valid(self):
vals = (self.r, self.g, self.b, self.w)
@@ -46,14 +51,14 @@ class EffectStaticConfig:
@dataclass
class EffectAlexaSwipeConfig:
primary_color_width: float # in degrees
transition_width: float # in degrees
swipe_speed: float # in degrees per second
bell_curve_width_in_leds: float
start_position: float # in degrees
forward: bool
primary_color: ColorRGBW
secondary_color: ColorRGBW
primary_color_width: float = 20 # in degrees
transition_width: float = 30 # in degrees
swipe_speed: float = 3 * 360 # in degrees per second
bell_curve_width_in_leds: float = 3
start_position: float = 180 # in degrees
forward: bool = True
primary_color: ColorRGBW = ColorRGBW(0, 0, 1, 0)
secondary_color: ColorRGBW = ColorRGBW(0, 200 / 255, 1, 0)
def as_bytes(self) -> bytes:
return struct.pack(
@@ -64,25 +69,34 @@ class EffectAlexaSwipeConfig:
@dataclass
class EffectRandomTwoColorInterpolationConfig:
cycle_durations_ms: int
start_with_existing: bool
num_segments: int
hue1_random: bool
hue2_random: bool
color1: ColorHSV
color2: ColorHSV
cycle_durations_ms: int = 6000
start_with_existing: bool = True
num_segments: int = 3
hue1_random: bool = False
hue2_random: bool = False
color1: ColorHSV = ColorHSV(240, 1, 1)
color2: ColorHSV = ColorHSV(192, 1, 1)
def as_bytes(self) -> bytes:
return struct.pack("<i?i??", self.cycle_durations_ms, self.start_with_existing,
self.num_segments, self.hue1_random,
self.hue2_random) + self.color1.as_bytes(), +self.color2.as_bytes()
self.hue2_random) + self.color1.as_bytes() + self.color2.as_bytes()
@dataclass
class EffectCircularConfig:
speed: float # in degrees per second
width: float # in degrees
color: ColorRGBW
speed: float = 360 # in degrees per second
width: float = 180 # in degrees
color: ColorRGBW = ColorRGBW(0, 0, 1, 0)
def as_bytes(self) -> bytes:
return struct.pack("<ff", self.speed, self.width) + self.color.as_bytes()
@dataclass
class EffectSwipeAndChange:
swipe: EffectAlexaSwipeConfig = EffectAlexaSwipeConfig()
change: EffectRandomTwoColorInterpolationConfig = EffectRandomTwoColorInterpolationConfig()
def as_bytes(self) -> bytes:
return self.swipe.as_bytes() + self.change.as_bytes()

View File

@@ -0,0 +1,47 @@
import asyncio
import serial_asyncio
from led_cmds import (ColorRGBW, ColorHSV, EffectStaticConfig,
EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig,
EffectSwipeAndChange)
from host_driver import MusicMouseProtocol, RfidTokenRead
rfid_token_map = {
bytes.fromhex("0000000000"): "None",
bytes.fromhex("88041174e9"): "Elephant",
bytes.fromhex("8804ce7230"): "Fox",
bytes.fromhex("88040d71f0"): "Owl",
}
def on_firmware_msg(protocol, message):
print("Got message", message)
if isinstance(message, RfidTokenRead) and message.id in rfid_token_map:
if rfid_token_map[message.id] == "Elephant":
print("Elephant")
eff = EffectStaticConfig(ColorRGBW(0, 0, 1, 0))
protocol.send_message(eff)
elif rfid_token_map[message.id] == "Fox":
print("Fox")
eff = EffectRandomTwoColorInterpolationConfig()
protocol.send_message(eff)
elif rfid_token_map[message.id] == "Owl":
print("Owl")
eff = EffectSwipeAndChange()
eff.swipe.primary_color = ColorRGBW(1, 1, 0, 0)
protocol.send_message(eff)
elif rfid_token_map[message.id] == "None":
eff = EffectAlexaSwipeConfig()
eff.forward = False
print("Nothing")
protocol.send_message(eff)
loop = asyncio.get_event_loop()
coro = serial_asyncio.create_serial_connection(loop,
MusicMouseProtocol,
'/dev/ttyUSB0',
baudrate=115200)
transport, protocol = loop.run_until_complete(coro)
protocol.register_message_callback(on_firmware_msg)
loop.run_forever()
loop.close()