125 lines
3.4 KiB
Python
125 lines
3.4 KiB
Python
|
import asyncio
|
||
|
import serial_asyncio
|
||
|
from enum import Enum
|
||
|
from dataclasses import dataclass
|
||
|
import struct
|
||
|
|
||
|
# 32-bit marker that send_message() packs as the first field of every
# host -> firmware frame header.
MAGIC_TOKEN_HOST_TO_FW = 0x1D63_79E3
# Counterpart marker for the firmware -> host direction; nothing in this file
# reads it yet (presumably intended for parsing received data -- TODO confirm).
MAGIC_TOKEN_FW_TO_HOST = 0x10C6_5631
|
||
|
|
||
|
|
||
|
class MessageFwToHost(Enum):
    """Numeric identifiers for firmware -> host message kinds.

    NOTE(review): nothing in this file decodes incoming data with these
    values yet; the member meanings are taken from their names only.
    """

    RFID_TOKEN_READ = 0
    BUTTON_NORMAL_PRESS = 1
    ROTARY_ENCODER = 2
|
||
|
|
||
|
|
||
|
class MessageHostToFw(Enum):
    """Numeric identifiers for host -> firmware message kinds.

    Values 0-2 coincide with the type ids that ``messageTypeMap`` assigns to
    the effect config classes; value 3 has no config class in this file.
    """

    LED_WHEEL_EFFECT_STATIC = 0
    LED_WHEEL_EFFECT_ALEXA_SWIPE = 1
    LED_WHEEL_EFFECT_CIRCULAR = 2
    LED_WHEEL_EFFECT_RANDOM_TWO_COLOR_INTERPOLATION = 3
|
||
|
|
||
|
|
||
|
@dataclass
class ColorRGBW:
    """Four-channel colour (red, green, blue, white) with float intensities.

    ``as_bytes`` multiplies each channel by 255 and packs it as an unsigned
    byte, so a channel only serializes when its scaled value lands in 0..255
    (i.e. intensities of roughly 0.0 to 1.0).
    """

    r: float
    g: float
    b: float
    w: float

    def as_bytes(self) -> bytes:
        """Return the colour as four unsigned bytes in r, g, b, w order.

        Each channel is scaled by 255 and rounded to the nearest integer
        before packing.

        Raises:
            struct.error: if a scaled channel falls outside 0..255.
        """
        # struct's "B" code only accepts integers; without the explicit
        # rounding any genuinely fractional channel (e.g. 0.5) is rejected.
        scaled = [round(channel * 255) for channel in (self.r, self.g, self.b, self.w)]
        return struct.pack("BBBB", *scaled)
|
||
|
|
||
|
|
||
|
@dataclass
class EffectStaticConfig:
    """Configuration of the static LED effect: a single colour."""

    color: ColorRGBW

    def as_bytes(self) -> bytes:
        """Return the message payload, which is exactly the colour's bytes."""
        payload = self.color.as_bytes()
        return payload
|
||
|
|
||
|
|
||
|
@dataclass
class EffectAlexaSwipeConfig:
    """Configuration of the "Alexa swipe" LED effect, serializable to bytes."""

    primaryColorWidth: float  # in degrees
    transitionWidth: float  # in degrees
    swipeSpeed: float  # in degrees per second
    bellCurveWidthInLeds: float
    startPosition: float  # in degrees
    forward: bool
    primaryColor: ColorRGBW
    secondaryColor: ColorRGBW

    def as_bytes(self) -> bytes:
        """Return the message payload for this configuration.

        Layout: five 32-bit floats (primaryColorWidth, transitionWidth,
        swipeSpeed, bellCurveWidthInLeds, startPosition), one bool byte
        (forward), then the bytes of primaryColor followed by secondaryColor.
        """
        # "<" pins little-endian, unpadded packing so the payload matches the
        # "<IBH" frame header regardless of the host's native byte order.
        scalars = struct.pack(
            "<fffff?",
            self.primaryColorWidth,
            self.transitionWidth,
            self.swipeSpeed,
            self.bellCurveWidthInLeds,
            self.startPosition,
            self.forward,
        )
        return scalars + self.primaryColor.as_bytes() + self.secondaryColor.as_bytes()
|
||
|
|
||
|
|
||
|
@dataclass
class EffectCircularConfig:
    """Configuration of the circular LED effect, serializable to bytes."""

    speed: float  # in degrees per second
    width: float  # in degrees
    color: ColorRGBW

    def as_bytes(self) -> bytes:
        """Return the message payload for this configuration.

        Layout: two 32-bit floats (speed, width) followed by the colour's
        bytes.
        """
        # "<" pins little-endian packing so the payload matches the "<IBH"
        # frame header regardless of the host's native byte order.
        motion = struct.pack("<ff", self.speed, self.width)
        return motion + self.color.as_bytes()
|
||
|
|
||
|
|
||
|
# Maps each effect config class to the integer type id that send_message()
# writes into the frame header. The ids are the MessageHostToFw member values
# (0, 1 and 2).
messageTypeMap = {
    EffectStaticConfig: MessageHostToFw.LED_WHEEL_EFFECT_STATIC.value,
    EffectAlexaSwipeConfig: MessageHostToFw.LED_WHEEL_EFFECT_ALEXA_SWIPE.value,
    EffectCircularConfig: MessageHostToFw.LED_WHEEL_EFFECT_CIRCULAR.value,
}
|
||
|
|
||
|
|
||
|
class MusicMouseProtocol(asyncio.Protocol):
    """asyncio protocol that frames outgoing messages and prints link events."""

    def connection_made(self, transport):
        """Keep the transport so the other callbacks and send_message can use it."""
        self.transport = transport

    def send_message(self, message):
        """Frame ``message`` and write it to the transport.

        The frame is a little-endian header -- u32 magic token, u8 type id
        looked up in ``messageTypeMap`` by the message's class, u16 payload
        length -- followed by ``message.as_bytes()``.

        Raises:
            KeyError: if the message's class has no entry in ``messageTypeMap``.
        """
        payload = message.as_bytes()
        print("Sending message content", len(payload))
        type_id = messageTypeMap[type(message)]
        frame = struct.pack("<IBH", MAGIC_TOKEN_HOST_TO_FW, type_id, len(payload)) + payload

        print(repr(frame))
        self.transport.write(frame)

    def data_received(self, data):
        """Print the raw bytes received; no parsing is done here."""
        print(f"data received {data!r}")

    def connection_lost(self, exc):
        """Report the closed port and stop the loop the transport runs on."""
        print("port closed")
        event_loop = self.transport.loop
        event_loop.stop()

    def pause_writing(self):
        """Print a notice and the current write-buffer size."""
        print("pause writing")
        pending = self.transport.get_write_buffer_size()
        print(pending)

    def resume_writing(self):
        """Print the current write-buffer size, then a notice."""
        pending = self.transport.get_write_buffer_size()
        print(pending)
        print("resume writing")
|
||
|
|
||
|
|
||
|
async def main(protocol: MusicMouseProtocol):
    """Cycle the static effect through three colours, ten times over.

    Each colour is sent as an ``EffectStaticConfig`` and followed by a
    two-second pause before the next one.
    """
    # (r, g, b, w) channel tuples, in the order they are sent.
    palette = (
        (1, 0, 0, 0),
        (0, 1, 0, 0),
        (0, 1, 1, 0),
    )
    for _ in range(10):
        for channels in palette:
            protocol.send_message(EffectStaticConfig(ColorRGBW(*channels)))
            await asyncio.sleep(2)
|
||
|
|
||
|
|
||
|
# Script body: open the serial port, start the demo coroutine and run the
# event loop until something stops it (connection_lost() does so when the
# port closes).
#
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and raises on newer Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    coro = serial_asyncio.create_serial_connection(
        loop,
        MusicMouseProtocol,
        '/dev/ttyUSB0',
        baudrate=115200,
    )
    transport, protocol = loop.run_until_complete(coro)
    # Hold a reference to the task: the loop only keeps weak references, so
    # an unreferenced task may be garbage-collected before it finishes.
    main_task = loop.create_task(main(protocol))
    loop.run_forever()
finally:
    # Close the loop even when connecting or running raised.
    loop.close()
|