import asyncio from enum import Enum import struct from led_cmds import * MAGIC_TOKEN_HOST_TO_FW = 0x1d6379e3 MAGIC_TOKEN_FW_TO_HOST = 0x10c65631 class MessageFwToHost(Enum): RFID_TOKEN_READ = 0 ROTARY_ENCODER = 1 TOUCH_BUTTON_PRESS = 2 TOUCH_BUTTON_RELEASE = 3 class MessageHostToFw(Enum): LED_WHEEL_EFFECT_STATIC = 0 LED_WHEEL_EFFECT_ALEXA_SWIPE = 1 LED_WHEEL_EFFECT_CIRCULAR = 2 LED_WHEEL_EFFECT_RANDOM_TWO_COLOR_INTERPOLATION = 3 class TouchButton(Enum): LEFT_FOOT = 0 RIGHT_FOOT = 1 LEFT_EAR = 2 RIGHT_EAR = 3 outgoing_msg_map = { EffectStaticConfig: 0, EffectAlexaSwipeConfig: 1, EffectCircularConfig: 2, EffectRandomTwoColorInterpolationConfig: 3, EffectSwipeAndChange: 4, } class RfidTokenRead: def __init__(self, id: bytes): self.id = id def __repr__(self): return "RFID Token (" + " ".join(f"{v:02x}" for v in self.id) + ")" class RotaryEncoderEvent: def __init__(self, msg_content: bytes): self.position, self.direction = struct.unpack(" str: return "Pressed " + repr(self.touch_button) class TouchButtonRelease: def __init__(self, msg_content: bytes): val = int(msg_content[0]) self.touch_button = TouchButton(val) def __repr__(self) -> str: return "Released " + repr(self.touch_button) incomingMsgMap = { 0: RfidTokenRead, 1: RotaryEncoderEvent, 2: TouchButtonPress, 3: TouchButtonRelease, } class MusicMouseProtocol(asyncio.Protocol): def __init__(self): super() self._msg_callback = None def register_message_callback(self, cb): self._msg_callback = cb def connection_made(self, transport): self.transport = transport self.in_buff = bytes() def send_message(self, message): msg_content = message.as_bytes() header = struct.pack("= HEADER_SIZE: token, msg_type, msg_size = struct.unpack("= HEADER_SIZE + msg_size: self._on_msg_receive(msg_type, self.in_buff[HEADER_SIZE:HEADER_SIZE + msg_size]) self.in_buff = self.in_buff[HEADER_SIZE + msg_size:] else: idx = self.in_buff.find("\n".encode()) if idx >= 0: text_msg = self.in_buff[:idx] print("LOG:", text_msg.decode()) self.in_buff = self.in_buff[idx + 1:] def _on_msg_receive(self, msg_type, msg_payload): parsed_msg = incomingMsgMap[msg_type](msg_payload) if self._msg_callback is not None: self._msg_callback(self, parsed_msg)