musicmouse/espmusicmouse/host_driver/host_driver.py

174 lines
4.8 KiB
Python
Raw Normal View History

2021-11-25 22:20:04 +01:00
import asyncio
from enum import Enum
import struct
2021-11-27 19:04:08 +01:00
from led_cmds import *
2021-11-25 22:20:04 +01:00
# Magic tokens that open every binary frame header, one per direction.
MAGIC_TOKEN_HOST_TO_FW = 0x1D6379E3  # packed into headers sent to the firmware
MAGIC_TOKEN_FW_TO_HOST = 0x10C65631  # expected at the start of received frames
class MessageFwToHost(Enum):
    """Message type ids carried by frames received from the firmware.

    The values correspond to the keys of ``incomingMsgMap``.
    """

    RFID_TOKEN_READ = 0
    ROTARY_ENCODER = 1
    TOUCH_BUTTON_PRESS = 2
    TOUCH_BUTTON_RELEASE = 3
    # Id 4 is dispatched to ButtonEvent by incomingMsgMap but had no name
    # here; added so the enum covers every id the parser accepts.
    BUTTON_EVENT = 4
2021-11-25 22:20:04 +01:00
class MessageHostToFw(Enum):
    """Message type ids for LED wheel effect commands sent to the firmware."""

    # NOTE(review): only ids 0-3 are named here, while the effect -> id
    # tables in this module also use ids 4-10 — confirm whether this enum
    # should be extended to match.
    LED_WHEEL_EFFECT_STATIC = 0
    LED_WHEEL_EFFECT_ALEXA_SWIPE = 1
    LED_WHEEL_EFFECT_CIRCULAR = 2
    LED_WHEEL_EFFECT_RANDOM_TWO_COLOR_INTERPOLATION = 3
2021-12-15 22:47:28 +01:00
class TouchButton(Enum):
    """Touch buttons, keyed by the id byte used in touch press/release messages."""

    LEFT_FOOT = 0
    RIGHT_FOOT = 1
    LEFT_EAR = 2
    RIGHT_EAR = 3
2021-12-16 22:34:40 +01:00
# Message type id written into the frame header for each LED ring effect
# config class; the lookup uses the exact type of the config object sent.
led_ring_effect_to_message_id = {
    EffectStaticConfig: 0,
    EffectAlexaSwipeConfig: 1,
    EffectCircularConfig: 2,
    EffectRandomTwoColorInterpolationConfig: 3,
    EffectSwipeAndChange: 4,
    EffectReverseSwipe: 5,
}
2021-11-25 22:20:04 +01:00
2021-12-16 22:34:40 +01:00
# Message type id written into the frame header for each mouse LED effect
# config class. Unlike the LED ring table, EffectAlexaSwipeConfig has no
# entry here.
mouse_led_effect_to_message_id = {
    EffectStaticConfig: 6,
    EffectCircularConfig: 7,
    EffectRandomTwoColorInterpolationConfig: 8,
    EffectSwipeAndChange: 9,
    EffectReverseSwipe: 10,
}
2021-11-25 22:20:04 +01:00
2021-11-27 19:04:08 +01:00
class RfidTokenRead:
    """RFID token message; keeps the message payload as the token id."""

    def __init__(self, id: bytes):
        # The payload is stored as-is; no validation or copying is done.
        self.id = id

    def __repr__(self):
        # Render every byte as two lowercase hex digits, space separated.
        hex_bytes = " ".join(format(byte, "02x") for byte in self.id)
        return f"RFID Token ({hex_bytes})"
2021-11-25 22:20:04 +01:00
2021-12-15 22:47:28 +01:00
class RotaryEncoderEvent:
    """Rotary encoder message decoded from a little-endian payload."""

    def __init__(self, msg_content: bytes):
        # "<iB": signed 32-bit position, then an unsigned 8-bit direction.
        fields = struct.unpack("<iB", msg_content)
        self.position = fields[0]
        self.direction = fields[1]

    def __repr__(self):
        return f"Rotary event: pos {self.position}, dir {self.direction}"
class TouchButtonPress:
    """Touch button press; the first payload byte selects the TouchButton."""

    def __init__(self, msg_content: bytes):
        button_id = int(msg_content[0])
        self.touch_button = TouchButton(button_id)

    def __repr__(self) -> str:
        return f"Pressed {self.touch_button!r}"
class TouchButtonRelease:
    """Touch button release; the first payload byte selects the TouchButton."""

    def __init__(self, msg_content: bytes):
        button_id = int(msg_content[0])
        self.touch_button = TouchButton(button_id)

    def __repr__(self) -> str:
        return f"Released {self.touch_button!r}"
2021-12-16 22:34:40 +01:00
class ButtonEvent:
    """Button message decoded from a two-byte (button, event) payload."""

    # Wire code -> readable name for the button that produced the event.
    button_name = {1: 'left', 2: 'right', 3: 'rotary'}
    # Wire code -> readable name for the event kind; codes run 0..6 in the
    # order listed.
    event_name = dict(enumerate((
        'pressed',
        'released',
        'clicked',
        'double_clicked',
        'long_pressed',
        'repeat_pressed',
        'long_released',
    )))

    def __init__(self, msg_content: bytes):
        # "<BB": two unsigned bytes. Unknown codes raise KeyError below.
        codes = struct.unpack("<BB", msg_content)
        self.button = self.button_name[codes[0]]
        self.event = self.event_name[codes[1]]

    def __repr__(self) -> str:
        return f"Button {self.button} {self.event}"
2021-12-15 22:47:28 +01:00
# Maps the message type byte of an incoming frame to the class that decodes
# it; the selected class is called with the raw payload bytes.
incomingMsgMap = {
    0: RfidTokenRead,
    1: RotaryEncoderEvent,
    2: TouchButtonPress,
    3: TouchButtonRelease,
    4: ButtonEvent,
}
2021-11-25 22:20:04 +01:00
class MusicMouseProtocol(asyncio.Protocol):
    """asyncio protocol exchanging framed messages with the firmware.

    Frames in both directions start with a 7-byte little-endian header
    (``<IBH``: magic token, message type, payload size) followed by the
    payload. Incoming bytes that do not start with the firmware magic token
    are treated as newline-terminated text and printed as log lines.
    """

    def __init__(self):
        # A bare ``super()`` only builds a proxy object; the base initialiser
        # has to be called explicitly.
        super().__init__()
        self._msg_callback = None
        # Defined up front so the attributes exist before connection_made().
        self.transport = None
        self.in_buff = bytes()

    def register_message_callback(self, cb):
        """Register ``cb(protocol, message)``, called per decoded message."""
        self._msg_callback = cb

    def connection_made(self, transport):
        """Remember the transport and start with an empty receive buffer."""
        self.transport = transport
        self.in_buff = bytes()

    def _send_effect(self, message_ids, effect_cfg):
        """Frame ``effect_cfg.as_bytes()`` and write it to the transport.

        ``message_ids`` maps config classes to message type ids; a config
        whose exact type is not in the table raises ``KeyError``.
        """
        payload = effect_cfg.as_bytes()
        header = struct.pack("<IBH", MAGIC_TOKEN_HOST_TO_FW,
                             message_ids[type(effect_cfg)], len(payload))
        self.transport.write(header + payload)

    def led_ring_effect(self, effect_cfg):
        """Send an LED ring effect configuration to the firmware."""
        self._send_effect(led_ring_effect_to_message_id, effect_cfg)

    def mouse_led_effect(self, effect_cfg):
        """Send a mouse LED effect configuration to the firmware."""
        self._send_effect(mouse_led_effect_to_message_id, effect_cfg)

    def data_received(self, data):
        """Buffer incoming bytes and process everything that is complete."""
        self.in_buff += data
        self._parse_message()

    def connection_lost(self, exc):
        """Report the closed port and stop the transport's event loop."""
        print('port closed')
        # Relies on the transport exposing its event loop as ``loop``.
        # NOTE(review): presumably a pyserial-asyncio transport — confirm.
        self.transport.loop.stop()

    def pause_writing(self):
        print('pause writing')
        print(self.transport.get_write_buffer_size())

    def resume_writing(self):
        print(self.transport.get_write_buffer_size())
        print('resume writing')

    def _parse_message(self):
        """Consume every complete frame or log line at the buffer start.

        Loops so that several items delivered in one chunk are all handled.
        Returns as soon as the data at the start of the buffer is incomplete,
        leaving it in place for the next call.
        """
        HEADER_SIZE = 4 + 1 + 2
        magic = struct.pack("<I", MAGIC_TOKEN_FW_TO_HOST)
        while self.in_buff:
            if self.in_buff.startswith(magic):
                if len(self.in_buff) < HEADER_SIZE:
                    return  # header still incomplete
                _, msg_type, msg_size = struct.unpack(
                    "<IBH", self.in_buff[:HEADER_SIZE])
                frame_end = HEADER_SIZE + msg_size
                if len(self.in_buff) < frame_end:
                    # Wait for the rest of the payload. Falling through to
                    # the text branch here could swallow part of a binary
                    # frame that happens to contain a newline byte.
                    return
                payload = self.in_buff[HEADER_SIZE:frame_end]
                # Drop the frame before dispatching so that an exception in
                # a decoder or callback cannot leave it stuck in the buffer.
                self.in_buff = self.in_buff[frame_end:]
                self._on_msg_receive(msg_type, payload)
            elif magic.startswith(self.in_buff):
                # Fewer than four bytes that may still become the magic token.
                return
            else:
                idx = self.in_buff.find(b"\n")
                if idx < 0:
                    return  # log line not terminated yet
                text_msg = self.in_buff[:idx]
                self.in_buff = self.in_buff[idx + 1:]
                # errors="replace": stray non-UTF-8 bytes must not abort
                # data_received().
                print("LOG:", text_msg.decode(errors="replace"))

    def _on_msg_receive(self, msg_type, msg_payload):
        """Decode one payload and pass the result to the callback, if any.

        Message types without an entry in ``incomingMsgMap`` are reported
        and skipped instead of raising ``KeyError``.
        """
        msg_cls = incomingMsgMap.get(msg_type)
        if msg_cls is None:
            print(f"Unknown message type {msg_type}, "
                  f"dropping {len(msg_payload)} byte payload")
            return
        parsed_msg = msg_cls(msg_payload)
        if self._msg_callback is not None:
            self._msg_callback(self, parsed_msg)