2way host-firmware comm

This commit is contained in:
Martin Bauer
2021-11-27 19:04:08 +01:00
parent e1807360f8
commit 11db5763eb
4 changed files with 137 additions and 60 deletions

View File

@@ -0,0 +1,113 @@
import asyncio
import serial_asyncio
from enum import Enum
from dataclasses import dataclass
import struct
from led_cmds import *
MAGIC_TOKEN_HOST_TO_FW = 0x1d6379e3
MAGIC_TOKEN_FW_TO_HOST = 0x10c65631
class MessageFwToHost(Enum):
RFID_TOKEN_READ = 0
BUTTON_NORMAL_PRESS = 1
ROTARY_ENCODER = 2
class MessageHostToFw(Enum):
LED_WHEEL_EFFECT_STATIC = 0
LED_WHEEL_EFFECT_ALEXA_SWIPE = 1
LED_WHEEL_EFFECT_CIRCULAR = 2
LED_WHEEL_EFFECT_RANDOM_TWO_COLOR_INTERPOLATION = 3
outgoingMsgMap = {
EffectStaticConfig: 0,
EffectAlexaSwipeConfig: 1,
EffectCircularConfig: 2,
}
class RfidTokenRead:
def __init__(self, id: bytes):
self.id = id
def __repr__(self):
return "RFID Token (" + " ".join(f"{v:02x}" for v in self.id) + ")"
incomingMsgMap = {0: RfidTokenRead}
class MusicMouseProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
self.in_buff = bytes()
def send_message(self, message):
msg_content = message.as_bytes()
print("Sending message content", len(msg_content))
header = struct.pack("<IBH", MAGIC_TOKEN_HOST_TO_FW, outgoingMsgMap[type(message)],
len(msg_content))
print(repr(header + msg_content))
self.transport.write(header + msg_content)
def data_received(self, data):
self.in_buff += data
self._parse_message()
def connection_lost(self, exc):
print('port closed')
self.transport.loop.stop()
def pause_writing(self):
print('pause writing')
print(self.transport.get_write_buffer_size())
def resume_writing(self):
print(self.transport.get_write_buffer_size())
print('resume writing')
def _parse_message(self):
HEADER_SIZE = 4 + 1 + 2
if len(self.in_buff) == 0:
return
if len(self.in_buff) >= HEADER_SIZE:
token, msg_type, msg_size = struct.unpack("<IBH", self.in_buff[:HEADER_SIZE])
if token == MAGIC_TOKEN_FW_TO_HOST and len(self.in_buff) >= HEADER_SIZE + msg_size:
self._on_msg_receive(msg_type, self.in_buff[HEADER_SIZE:HEADER_SIZE + msg_size])
self.in_buff = self.in_buff[HEADER_SIZE + msg_size:]
else:
idx = self.in_buff.find("\n".encode())
if idx >= 0:
text_msg = self.in_buff[:idx]
print("LOG:", text_msg.decode())
self.in_buff = self.in_buff[idx + 1:]
def _on_msg_receive(self, msg_type, msg_payload):
parsed_msg = incomingMsgMap[msg_type](msg_payload)
print("MSG:", parsed_msg)
async def main(protocol: MusicMouseProtocol):
for i in range(10):
protocol.send_message(EffectStaticConfig(ColorRGBW(1, 0, 0, 0)))
await asyncio.sleep(2)
protocol.send_message(EffectStaticConfig(ColorRGBW(0, 1, 0, 0)))
await asyncio.sleep(2)
protocol.send_message(EffectStaticConfig(ColorRGBW(0, 1, 1, 0)))
await asyncio.sleep(2)
loop = asyncio.get_event_loop()
coro = serial_asyncio.create_serial_connection(loop,
MusicMouseProtocol,
'/dev/ttyUSB0',
baudrate=115200)
transport, protocol = loop.run_until_complete(coro)
#loop.create_task(main(protocol))
loop.run_forever()
loop.close()

View File

@@ -0,0 +1,88 @@
from dataclasses import dataclass
import struct
@dataclass
class ColorRGBW:
r: float
g: float
b: float
w: float
def as_bytes(self) -> bytes:
return struct.pack("<BBBB", self.r * 255, self.g * 255, self.b * 255, self.w * 255)
def is_valid(self):
vals = (self.r, self.g, self.b, self.w)
return all(0 <= v <= 1 for v in vals)
@dataclass
class ColorHSV:
h: float
s: float
v: float
def as_bytes(self) -> bytes:
return struct.pack("<fff", self.h, self.s, self.v)
def is_valid(self):
if not 0 <= self.h <= 360:
return False
if not 0 <= self.s <= 1:
return False
if not 0 <= self.v <= 2:
return False
return True
@dataclass
class EffectStaticConfig:
color: ColorRGBW
def as_bytes(self) -> bytes:
return self.color.as_bytes()
@dataclass
class EffectAlexaSwipeConfig:
primary_color_width: float # in degrees
transition_width: float # in degrees
swipe_speed: float # in degrees per second
bell_curve_width_in_leds: float
start_position: float # in degrees
forward: bool
primary_color: ColorRGBW
secondary_color: ColorRGBW
def as_bytes(self) -> bytes:
return struct.pack(
"<fffff?", self.primary_color_width, self.transition_width, self.swipe_speed,
self.bell_curve_width_in_leds, self.start_position,
self.forward) + self.primary_color.as_bytes() + self.secondary_color.as_bytes()
@dataclass
class EffectRandomTwoColorInterpolationConfig:
cycle_durations_ms: int
start_with_existing: bool
num_segments: int
hue1_random: bool
hue2_random: bool
color1: ColorHSV
color2: ColorHSV
def as_bytes(self) -> bytes:
return struct.pack("<i?i??", self.cycle_durations_ms, self.start_with_existing,
self.num_segments, self.hue1_random,
self.hue2_random) + self.color1.as_bytes(), +self.color2.as_bytes()
@dataclass
class EffectCircularConfig:
speed: float # in degrees per second
width: float # in degrees
color: ColorRGBW
def as_bytes(self) -> bytes:
return struct.pack("<ff", self.speed, self.width) + self.color.as_bytes()