137 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			137 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
| import asyncio
 | |
| from enum import Enum
 | |
| import struct
 | |
| 
 | |
| from led_cmds import *
 | |
| 
 | |
| MAGIC_TOKEN_HOST_TO_FW = 0x1d6379e3
 | |
| MAGIC_TOKEN_FW_TO_HOST = 0x10c65631
 | |
| 
 | |
| 
 | |
| class MessageFwToHost(Enum):
 | |
|     RFID_TOKEN_READ = 0
 | |
|     ROTARY_ENCODER = 1
 | |
|     TOUCH_BUTTON_PRESS = 2
 | |
|     TOUCH_BUTTON_RELEASE = 3
 | |
| 
 | |
| 
 | |
| class MessageHostToFw(Enum):
 | |
|     LED_WHEEL_EFFECT_STATIC = 0
 | |
|     LED_WHEEL_EFFECT_ALEXA_SWIPE = 1
 | |
|     LED_WHEEL_EFFECT_CIRCULAR = 2
 | |
|     LED_WHEEL_EFFECT_RANDOM_TWO_COLOR_INTERPOLATION = 3
 | |
| 
 | |
| 
 | |
| class TouchButton(Enum):
 | |
|     LEFT_FOOT = 0
 | |
|     RIGHT_FOOT = 1
 | |
|     LEFT_EAR = 2
 | |
|     RIGHT_EAR = 3
 | |
| 
 | |
| 
 | |
| outgoing_msg_map = {
 | |
|     EffectStaticConfig: 0,
 | |
|     EffectAlexaSwipeConfig: 1,
 | |
|     EffectCircularConfig: 2,
 | |
|     EffectRandomTwoColorInterpolationConfig: 3,
 | |
|     EffectSwipeAndChange: 4,
 | |
| }
 | |
| 
 | |
| 
 | |
| class RfidTokenRead:
 | |
|     def __init__(self, id: bytes):
 | |
|         self.id = id
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return "RFID Token (" + " ".join(f"{v:02x}" for v in self.id) + ")"
 | |
| 
 | |
| 
 | |
| class RotaryEncoderEvent:
 | |
|     def __init__(self, msg_content: bytes):
 | |
|         self.position, self.direction = struct.unpack("<iB", msg_content)
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return f"Rotary event: pos {self.position}, dir {self.direction}"
 | |
| 
 | |
| 
 | |
| class TouchButtonPress:
 | |
|     def __init__(self, msg_content: bytes):
 | |
|         val = int(msg_content[0])
 | |
|         self.touch_button = TouchButton(val)
 | |
| 
 | |
|     def __repr__(self) -> str:
 | |
|         return "Pressed " + repr(self.touch_button)
 | |
| 
 | |
| 
 | |
| class TouchButtonRelease:
 | |
|     def __init__(self, msg_content: bytes):
 | |
|         val = int(msg_content[0])
 | |
|         self.touch_button = TouchButton(val)
 | |
| 
 | |
|     def __repr__(self) -> str:
 | |
|         return "Released " + repr(self.touch_button)
 | |
| 
 | |
| 
 | |
| incomingMsgMap = {
 | |
|     0: RfidTokenRead,
 | |
|     1: RotaryEncoderEvent,
 | |
|     2: TouchButtonPress,
 | |
|     3: TouchButtonRelease,
 | |
| }
 | |
| 
 | |
| 
 | |
| class MusicMouseProtocol(asyncio.Protocol):
 | |
|     def __init__(self):
 | |
|         super()
 | |
|         self._msg_callback = None
 | |
| 
 | |
|     def register_message_callback(self, cb):
 | |
|         self._msg_callback = cb
 | |
| 
 | |
|     def connection_made(self, transport):
 | |
|         self.transport = transport
 | |
|         self.in_buff = bytes()
 | |
| 
 | |
|     def send_message(self, message):
 | |
|         msg_content = message.as_bytes()
 | |
|         header = struct.pack("<IBH", MAGIC_TOKEN_HOST_TO_FW, outgoing_msg_map[type(message)],
 | |
|                              len(msg_content))
 | |
|         self.transport.write(header + msg_content)
 | |
| 
 | |
|     def data_received(self, data):
 | |
|         self.in_buff += data
 | |
|         self._parse_message()
 | |
| 
 | |
|     def connection_lost(self, exc):
 | |
|         print('port closed')
 | |
|         self.transport.loop.stop()
 | |
| 
 | |
|     def pause_writing(self):
 | |
|         print('pause writing')
 | |
|         print(self.transport.get_write_buffer_size())
 | |
| 
 | |
|     def resume_writing(self):
 | |
|         print(self.transport.get_write_buffer_size())
 | |
|         print('resume writing')
 | |
| 
 | |
|     def _parse_message(self):
 | |
|         HEADER_SIZE = 4 + 1 + 2
 | |
|         if len(self.in_buff) == 0:
 | |
|             return
 | |
|         if len(self.in_buff) >= HEADER_SIZE:
 | |
|             token, msg_type, msg_size = struct.unpack("<IBH", self.in_buff[:HEADER_SIZE])
 | |
|             if token == MAGIC_TOKEN_FW_TO_HOST and len(self.in_buff) >= HEADER_SIZE + msg_size:
 | |
|                 self._on_msg_receive(msg_type, self.in_buff[HEADER_SIZE:HEADER_SIZE + msg_size])
 | |
|                 self.in_buff = self.in_buff[HEADER_SIZE + msg_size:]
 | |
|             else:
 | |
|                 idx = self.in_buff.find("\n".encode())
 | |
|                 if idx >= 0:
 | |
|                     text_msg = self.in_buff[:idx]
 | |
|                     print("LOG:", text_msg.decode())
 | |
|                     self.in_buff = self.in_buff[idx + 1:]
 | |
| 
 | |
|     def _on_msg_receive(self, msg_type, msg_payload):
 | |
|         parsed_msg = incomingMsgMap[msg_type](msg_payload)
 | |
|         if self._msg_callback is not None:
 | |
|             self._msg_callback(self, parsed_msg)
 |