musicmouse/espmusicmouse/host_driver/main.py

213 lines
8.3 KiB
Python
Executable File

#!/usr/bin/env python
import asyncio
import sys
import serial_asyncio
from led_cmds import (ColorRGBW, ColorHSV, EffectStaticConfig,
EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig,
EffectSwipeAndChange, EffectReverseSwipe)
from host_driver import MusicMouseProtocol, RfidTokenRead, RotaryEncoderEvent, ButtonEvent, TouchButton, TouchButtonPress, TouchButtonRelease
from player import AudioPlayer
from glob import glob
from copy import deepcopy
import os
from hass_client import HomeAssistantClient
import argparse
from ruamel.yaml import YAML
import warnings
from pprint import pprint
yaml = YAML(typ='safe')
OFF_COLOR = ColorRGBW(0, 0, 0, 0)
def parse_color(color_str: str):
if isinstance(color_str, ColorRGBW):
return color_str
elif color_str.startswith("#"):
color_str = color_str.lstrip('#')
t = tuple(int(color_str[i:i + 2], 16) / 255 for i in (0, 2, 4))
return ColorRGBW(*t, 0)
elif color_str.startswith("w"):
color_str = color_str.lstrip("w")
return ColorRGBW(0, 0, 0, int(color_str, 16) / 255)
def load_config(config_path):
with open(os.path.join(config_path, "config.yml")) as cfg_file:
cfg = yaml.load(cfg_file)
for figure_name, figure_cfg in cfg["figures"].items():
figure_cfg["colors"] = [parse_color(c) for c in figure_cfg["colors"]]
if 'media_files' not in figure_cfg:
figure_cfg['media_files'] = sorted(glob(os.path.join(config_path, figure_name)))
return cfg
class MusicMouseState:
def __init__(self, protocol: MusicMouseProtocol):
self.current_figure: str = None
self.last_figure: str = None
self.current_mouse_led_effect = None
self.current_led_ring_effect = None
self.protocol: MusicMouseProtocol = protocol
self.button_led_brightness = None
def mouse_led_effect(self, effect_cfg):
self.current_mouse_led_effect = effect_cfg
self.protocol.mouse_led_effect(effect_cfg)
def led_ring_effect(self, effect_cfg):
self.current_led_ring_effect = effect_cfg
self.protocol.led_ring_effect(effect_cfg)
def button_leds(self, brightness):
assert 0 <= brightness <= 1
self.protocol.button_background_led_prev(brightness)
self.protocol.button_background_led_next(brightness)
self.button_led_brightness = brightness
def reset(self):
self.mouse_led_effect(EffectStaticConfig(OFF_COLOR))
self.led_ring_effect(EffectStaticConfig(OFF_COLOR))
def figure_placed(self, figure_state):
self.last_figure = self.current_figure
self.current_figure = figure_state
def figure_removed(self):
self.last_figure = self.current_figure
class Controller:
def __init__(self, protocol, cfg):
self.cfg = cfg
self.audio_player = AudioPlayer(cfg["general"]["alsa_device"])
self.mmstate = MusicMouseState(protocol)
protocol.register_message_callback(self.on_firmware_msg)
self.audio_player.on_playlist_end_callback = self._run_off_animation
self.playlists = {
fig: self.audio_player.create_playlist(fig_cfg['media_files'])
for fig, fig_cfg in cfg['figures'].items()
}
self._rfid_to_figure_name = {
bytes.fromhex(figure_cfg["id"]): figure_name
for figure_name, figure_cfg in cfg["figures"].items()
}
def handle_rfid_event(self, tagid):
if tagid == bytes.fromhex("0000000000"):
if self.audio_player.is_playing():
self._run_off_animation()
self.audio_player.pause()
self.mmstate.figure_removed()
elif tagid in self._rfid_to_figure_name:
figure = self._rfid_to_figure_name[tagid]
primary_color, secondary_color, *rest = self.cfg["figures"]["colors"]
self._start_animation(primary_color, secondary_color)
self.mmstate.button_leds(self.cfg["general"].get("button_leds_brightness", 0.5))
if figure in self.playlists:
self.audio_player.set_playlist(self.playlists[figure])
if self.mmstate.last_figure == figure:
self.audio_player.play()
else:
self.audio_player.play_from_start()
self.mmstate.figure_placed(figure)
else:
warnings.warn(f"Unknown figure/tag with id {tagid}")
def on_firmware_msg(self, _, message):
print("FW msg:", message)
if isinstance(message, RfidTokenRead):
self.handle_rfid_event(message.id)
elif isinstance(message, RotaryEncoderEvent):
volume_increment = self.cfg["general"].get("volume_increment", 2)
if message.direction == 2:
self.audio_player.change_volume(volume_increment)
elif message.direction == 1:
self.audio_player.change_volume(-volume_increment)
elif isinstance(message, ButtonEvent):
if message.button == "left" and message.event == "pressed":
self.audio_player.previous()
elif message.button == "right" and message.event == "pressed":
self.audio_player.next()
elif False and isinstance(message, TouchButtonPress):
if current_figure:
ccfg = color_cfg[current_figure]
protocol.mouse_led_effect(
EffectStaticConfig(ccfg.accent, *mouse_leds[message.touch_button]))
#if message.touch_button == TouchButton.RIGHT_FOOT:
# asyncio.create_task(
# hass.call_service("switch", "toggle", {"entity_id": "switch.tasmota06"}))
# asyncio.create_task(
# hass.call_service("light", "turn_on", {"entity_id": "light.arbeitszimmer_fluter"}))
#elif message.touch_button == TouchButton.LEFT_FOOT:
# asyncio.create_task(
# hass.call_service("light", "turn_off", {"entity_id": "light.arbeitszimmer_fluter"}))
elif False and isinstance(message, TouchButtonRelease):
if current_figure:
ccfg = color_cfg[current_figure]
eff_change = EffectRandomTwoColorInterpolationConfig()
eff_static = EffectStaticConfig(ColorRGBW(0, 0, 0, 0),
*mouse_leds[message.touch_button])
if audio_player.is_playing():
eff_static.color = ccfg.primary
protocol.mouse_led_effect(eff_static)
if audio_player.is_playing():
eff_change.color1 = ccfg.primary
eff_change.color2 = ccfg.secondary
eff_change.start_with_existing = True
protocol.mouse_led_effect(eff_change)
def _start_animation(self, primary_color, secondary_color):
ring_eff = EffectSwipeAndChange()
ring_eff.swipe.primary_color = primary_color
ring_eff.swipe.secondary_color = secondary_color
ring_eff.swipe.swipe_speed = 180
ring_eff.change.color1 = primary_color
ring_eff.change.color2 = secondary_color
self.mmstate.led_ring_effect(ring_eff)
mouse_eff = deepcopy(ring_eff)
mouse_eff.swipe.start_position = 6 / 45 * 360
mouse_eff.swipe.bell_curve_width_in_leds = 16
mouse_eff.swipe.swipe_speed = 180
self.mmstate.mouse_led_effect(mouse_eff)
def _run_off_animation(self):
ring_eff = EffectReverseSwipe()
self.mmstate.led_ring_effect(ring_eff)
mouse_eff = EffectReverseSwipe()
mouse_eff.startPosition = 6 / 45 * 360
self.mmstate.mouse_led_effect(mouse_eff)
self.mmstate.button_leds(0)
def main(config_path):
cfg = load_config(config_path)
loop = asyncio.get_event_loop()
coro = serial_asyncio.create_serial_connection(loop,
MusicMouseProtocol,
cfg["general"]["serial_port"],
baudrate=115200)
transport, protocol = loop.run_until_complete(coro)
controller = Controller(protocol, cfg)
return controller, loop
if __name__ == "__main__":
if len(sys.argv) == 2:
controller, loop = main(config_path=sys.argv[1])
loop.run_forever()
loop.close()
else:
print("Error: run with config file path as first argument")