"""Asyncio protocol for exchanging framed binary messages with the firmware.

Outgoing and incoming frames are a fixed-size header (magic token, message
type, payload size) followed by the payload.  Incoming bytes that do not
start with the firmware-to-host magic token are treated as newline-terminated
log text and printed.
"""
import asyncio
from enum import Enum
import struct

from led_cmds import *

# Magic tokens that open every frame, one per direction.
MAGIC_TOKEN_HOST_TO_FW = 0x1d6379e3
MAGIC_TOKEN_FW_TO_HOST = 0x10c65631

# NOTE(review): the header format string was lost from the copy of this file
# that was reviewed.  Three fields (token, type, size) are unpacked from the
# header; they are assumed here to be little-endian uint32 values -- confirm
# against the firmware before relying on it.
_HEADER = struct.Struct("<III")
_FW_TO_HOST_MAGIC = struct.pack("<I", MAGIC_TOKEN_FW_TO_HOST)


class MessageFwToHost(Enum):
    """Message type ids used in frames received from the firmware."""

    RFID_TOKEN_READ = 0
    BUTTON_NORMAL_PRESS = 1
    ROTARY_ENCODER = 2


class MessageHostToFw(Enum):
    """Message type ids used in frames sent to the firmware."""

    LED_WHEEL_EFFECT_STATIC = 0
    LED_WHEEL_EFFECT_ALEXA_SWIPE = 1
    LED_WHEEL_EFFECT_CIRCULAR = 2
    LED_WHEEL_EFFECT_RANDOM_TWO_COLOR_INTERPOLATION = 3


# Maps an outgoing message class (from led_cmds) to its wire type id.
# Id 4 has no corresponding MessageHostToFw member in this file.
outgoing_msg_map = {
    EffectStaticConfig: 0,
    EffectAlexaSwipeConfig: 1,
    EffectCircularConfig: 2,
    EffectRandomTwoColorInterpolationConfig: 3,
    EffectSwipeAndChange: 4,
}


class RfidTokenRead:
    """Incoming message carrying the raw id bytes of an RFID token."""

    def __init__(self, id: bytes):
        self.id = id

    def __repr__(self):
        return "RFID Token (" + " ".join(f"{v:02x}" for v in self.id) + ")"


# Maps an incoming wire type id to the class that parses its payload.
incomingMsgMap = {0: RfidTokenRead}


class MusicMouseProtocol(asyncio.Protocol):
    """Frames outgoing messages and parses the incoming byte stream."""

    def __init__(self):
        # The original wrote a bare ``super()``, which never invoked the
        # base-class initializer.
        super().__init__()
        self._msg_callback = None
        self.transport = None
        self.in_buff = bytes()

    def register_message_callback(self, cb):
        """Register ``cb``; it is called as ``cb(protocol, parsed_message)``."""
        self._msg_callback = cb

    def connection_made(self, transport):
        """Store the transport and start with an empty receive buffer."""
        self.transport = transport
        self.in_buff = bytes()

    def send_message(self, message):
        """Serialize ``message`` and write it to the transport as one frame.

        ``message`` must provide ``as_bytes()``.
        """
        msg_content = message.as_bytes()
        # NOTE(review): everything after ``struct.pack(`` was lost from the
        # reviewed copy.  The header fields and the lookup of the type id via
        # ``outgoing_msg_map[type(message)]`` are a reconstruction -- confirm.
        header = _HEADER.pack(
            MAGIC_TOKEN_HOST_TO_FW,
            outgoing_msg_map[type(message)],
            len(msg_content),
        )
        self.transport.write(header + msg_content)

    def data_received(self, data):
        """Buffer ``data`` and handle every complete frame or log line in it."""
        # NOTE(review): the start of this method was lost from the reviewed
        # copy; the framing below is rebuilt from the surviving statements.
        self.in_buff += data
        while self._consume_next():
            pass

    def _consume_next(self):
        """Consume one frame or one log line; return False if none is complete."""
        buf = self.in_buff
        header_size = _HEADER.size

        if buf.startswith(_FW_TO_HOST_MAGIC):
            if len(buf) < header_size:
                return False  # header still incomplete
            _token, msg_type, msg_size = _HEADER.unpack_from(buf)
            frame_end = header_size + msg_size
            if len(buf) < frame_end:
                return False  # payload still incomplete
            # Drop the frame from the buffer before dispatching so that a
            # failing handler cannot make the same frame be parsed again.
            self.in_buff = buf[frame_end:]
            self._on_msg_receive(msg_type, buf[header_size:frame_end])
            return True

        newline = buf.find(b"\n")
        if newline < 0:
            return False
        # Undecodable bytes are replaced instead of raising, so one bad log
        # line cannot block the rest of the stream.
        print("LOG:", buf[:newline].decode(errors="replace"))
        self.in_buff = buf[newline + 1:]
        return True

    def _on_msg_receive(self, msg_type, msg_payload):
        """Parse ``msg_payload`` by ``msg_type`` and pass it to the callback."""
        msg_cls = incomingMsgMap.get(msg_type)
        if msg_cls is None:
            # Only the types listed in incomingMsgMap have a parser; report
            # the others rather than raising KeyError.
            print("LOG: unhandled message type", msg_type)
            return
        parsed_msg = msg_cls(msg_payload)
        if self._msg_callback is not None:
            self._msg_callback(self, parsed_msg)