import asyncio from enum import Enum import struct from led_cmds import * MAGIC_TOKEN_HOST_TO_FW = 0x1d6379e3 MAGIC_TOKEN_FW_TO_HOST = 0x10c65631 class MessageFwToHost(Enum): RFID_TOKEN_READ = 0 ROTARY_ENCODER = 1 TOUCH_BUTTON_PRESS = 2 TOUCH_BUTTON_RELEASE = 3 class MessageHostToFw(Enum): LED_WHEEL_EFFECT_STATIC = 0 LED_WHEEL_EFFECT_ALEXA_SWIPE = 1 LED_WHEEL_EFFECT_CIRCULAR = 2 LED_WHEEL_EFFECT_RANDOM_TWO_COLOR_INTERPOLATION = 3 class TouchButton(Enum): LEFT_FOOT = 0 RIGHT_FOOT = 1 LEFT_EAR = 2 RIGHT_EAR = 3 led_ring_effect_to_message_id = { EffectStaticConfig: 0, EffectAlexaSwipeConfig: 1, EffectCircularConfig: 2, EffectRandomTwoColorInterpolationConfig: 3, EffectSwipeAndChange: 4, } mouse_led_effect_to_message_id = { EffectStaticConfig: 5, EffectCircularConfig: 6, EffectRandomTwoColorInterpolationConfig: 7 } class RfidTokenRead: def __init__(self, id: bytes): self.id = id def __repr__(self): return "RFID Token (" + " ".join(f"{v:02x}" for v in self.id) + ")" class RotaryEncoderEvent: def __init__(self, msg_content: bytes): self.position, self.direction = struct.unpack(" str: return "Pressed " + repr(self.touch_button) class TouchButtonRelease: def __init__(self, msg_content: bytes): val = int(msg_content[0]) self.touch_button = TouchButton(val) def __repr__(self) -> str: return "Released " + repr(self.touch_button) class ButtonEvent: button_name = {1: 'left', 2: 'right', 3: 'rotary'} event_name = { 0: 'pressed', 1: 'released', 2: 'clicked', 3: 'double_clicked', 4: 'long_pressed', 5: 'repeat_pressed', 6: 'long_released' } def __init__(self, msg_content: bytes): button_nr, event_nr = struct.unpack(" str: return f"Button {self.button} {self.event}" incomingMsgMap = { 0: RfidTokenRead, 1: RotaryEncoderEvent, 2: TouchButtonPress, 3: TouchButtonRelease, 4: ButtonEvent, } class MusicMouseProtocol(asyncio.Protocol): def __init__(self): super() self._msg_callback = None def register_message_callback(self, cb): self._msg_callback = cb def connection_made(self, transport): self.transport = transport self.in_buff = bytes() def led_ring_effect(self, effect_cfg): msg_content = effect_cfg.as_bytes() header = struct.pack("= HEADER_SIZE: token, msg_type, msg_size = struct.unpack("= HEADER_SIZE + msg_size: self._on_msg_receive(msg_type, self.in_buff[HEADER_SIZE:HEADER_SIZE + msg_size]) self.in_buff = self.in_buff[HEADER_SIZE + msg_size:] else: idx = self.in_buff.find("\n".encode()) if idx >= 0: text_msg = self.in_buff[:idx] print("LOG:", text_msg.decode()) self.in_buff = self.in_buff[idx + 1:] def _on_msg_receive(self, msg_type, msg_payload): parsed_msg = incomingMsgMap[msg_type](msg_payload) if self._msg_callback is not None: self._msg_callback(self, parsed_msg)