#!/usr/bin/env python import asyncio import sys import serial_asyncio from led_cmds import (ColorRGBW, ColorHSV, EffectCircularConfig, EffectStaticConfig, EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig, EffectSwipeAndChange, EffectReverseSwipe) from host_driver import MusicMouseProtocol, RfidTokenRead, RotaryEncoderEvent, ButtonEvent, TouchButton, TouchButtonPress, TouchButtonRelease, mouse_leds_index_ranges from player import AudioPlayer from glob import glob from copy import deepcopy import os from hass_client import HomeAssistantClient import argparse from ruamel.yaml import YAML import warnings from pprint import pprint from typing import Optional from mqtt_json import start_mqtt yaml = YAML(typ='safe') OFF_COLOR = ColorRGBW(0, 0, 0, 0) def parse_color(color_str: str): if isinstance(color_str, ColorRGBW): return color_str elif color_str.startswith("#"): color_str = color_str.lstrip('#') t = tuple(int(color_str[i:i + 2], 16) / 255 for i in (0, 2, 4)) return ColorRGBW(*t, 0) elif color_str.startswith("w"): color_str = color_str.lstrip("w") return ColorRGBW(0, 0, 0, int(color_str, 16) / 255) def load_config(config_path): with open(os.path.join(config_path, "config.yml")) as cfg_file: cfg = yaml.load(cfg_file) for figure_name, figure_cfg in cfg["figures"].items(): figure_cfg["colors"] = [parse_color(c) for c in figure_cfg["colors"]] if 'media_files' not in figure_cfg: figure_cfg['media_files'] = sorted(glob(os.path.join(config_path, figure_name))) return cfg def hass_service(hass, domain, service, **kwargs): asyncio.create_task(hass.call_service(domain, service, kwargs)) class MusicMouseState: def __init__(self, protocol: MusicMouseProtocol): self.active_figure: Optional[ str] = None # None if no figure is placed on the reader, or the name of the figure self.last_partially_played_figure: Optional[ str] = None # figure whose playlist wasn't played completely and was removed self.current_mouse_led_effect = None self.current_led_ring_effect = None self.protocol: MusicMouseProtocol = protocol self.button_led_brightness = None def mouse_led_effect(self, effect_cfg): self.current_mouse_led_effect = effect_cfg self.protocol.mouse_led_effect(effect_cfg) def led_ring_effect(self, effect_cfg): self.current_led_ring_effect = effect_cfg self.protocol.led_ring_effect(effect_cfg) def button_leds(self, brightness): assert 0 <= brightness <= 1 self.protocol.button_background_led_prev(brightness) self.protocol.button_background_led_next(brightness) self.button_led_brightness = brightness def reset(self): self.mouse_led_effect(EffectStaticConfig(OFF_COLOR)) self.led_ring_effect(EffectStaticConfig(OFF_COLOR)) class Controller: def __init__(self, protocol, hass, cfg): self.cfg = cfg self.audio_player = AudioPlayer(cfg["general"]["alsa_device"]) self.audio_player.set_volume(50) self.mmstate = MusicMouseState(protocol) self.protocol = protocol self.hass = hass vol_min = self.cfg["general"].get("min_volume", None) vol_max = self.cfg["general"].get("max_volume", None) self.audio_player.set_volume_limits(vol_min, vol_max) protocol.register_message_callback(self.on_firmware_msg) self.audio_player.on_playlist_end_callback = self._on_playlist_end self.playlists = { fig: self.audio_player.create_playlist(fig_cfg['media_files']) for fig, fig_cfg in cfg['figures'].items() } self._rfid_to_figure_name = { bytes.fromhex(figure_cfg["id"]): figure_name for figure_name, figure_cfg in cfg["figures"].items() } self.protocol.shelve_led_effect(EffectStaticConfig(ColorRGBW(0, 0, 0.1, 0))) shelf_eff = EffectCircularConfig() shelf_eff.color = ColorRGBW(0, 0, 0.4, 0) shelf_eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0)) self.protocol.shelve_led_effect(shelf_eff) def _on_playlist_end(self): if not self.audio_player.is_playing(): self.mmstate.last_partially_played_figure = None self._run_off_animation() else: print("Playlist end was called, even if player remains playing?!") def handle_rfid_event(self, tagid): if tagid == bytes.fromhex("0000000000"): if self.audio_player.is_playing(): print("Got 000 rfid -> playing off animation") self._run_off_animation() self.audio_player.pause() self.mmstate.last_partially_played_figure = self.mmstate.active_figure else: self.mmstate.last_partially_played_figure = None self.mmstate.active_figure = None elif tagid in self._rfid_to_figure_name: newly_placed_figure = self._rfid_to_figure_name[tagid] primary_color, secondary_color, *rest = self.cfg["figures"][newly_placed_figure][ "colors"] self._start_animation(primary_color, secondary_color) self.mmstate.button_leds(self.cfg["general"].get("button_leds_brightness", 0.5)) if newly_placed_figure in self.cfg['figures']: if self.mmstate.last_partially_played_figure == newly_placed_figure: print("Continuing playlist") self.audio_player.play() else: print("Restarting playlist") self.audio_player.set_playlist( self.audio_player.create_playlist( self.cfg['figures'][newly_placed_figure]['media_files'])) self.audio_player.play_from_start() self.mmstate.active_figure = newly_placed_figure else: warnings.warn(f"Unknown figure/tag with id {tagid}") def on_firmware_msg(self, _, message): print("FW msg:", message) if isinstance(message, RfidTokenRead): self.handle_rfid_event(message.id) elif isinstance(message, RotaryEncoderEvent): volume_increment = self.cfg["general"].get("volume_increment", 2) * abs( message.increment) if message.direction == 2: self.audio_player.change_volume(volume_increment) elif message.direction == 1: self.audio_player.change_volume(-volume_increment) elif isinstance(message, ButtonEvent): btn = message.button if btn == "left" and message.event == "pressed" and self.audio_player.is_playing(): self.audio_player.previous() elif btn == "right" and message.event == "pressed" and self.audio_player.is_playing(): self.audio_player.next() elif message.button == "rotary" and message.event == "pressed": hass_service(self.hass, "light", "toggle", entity_id="light.kinderzimmer_fluter") elif isinstance(message, TouchButtonPress): figure = self.mmstate.active_figure if figure and self.audio_player.is_playing(): primary_color, secondary_color, bg, accent = self.cfg["figures"][figure]["colors"] self.protocol.mouse_led_effect( EffectStaticConfig(accent, *mouse_leds_index_ranges[message.touch_button])) colors = { TouchButton.RIGHT_FOOT: { 'rgb_color': [235, 255, 67] }, TouchButton.LEFT_FOOT: { 'color_temp': 469 }, TouchButton.RIGHT_EAR: { 'rgb_color': [101, 49, 255] }, TouchButton.LEFT_EAR: { 'rgb_color': [255, 74, 254] }, } hass_service(self.hass, "light", "turn_on", entity_id="light.kinderzimmer_fluter", **colors[message.touch_button]) elif isinstance(message, TouchButtonRelease): figure = self.mmstate.active_figure eff_change = EffectRandomTwoColorInterpolationConfig() eff_static = EffectStaticConfig(ColorRGBW(0, 0, 0, 0), *mouse_leds_index_ranges[message.touch_button]) if self.audio_player.is_playing(): primary_color, secondary_color, bg, accent = self.cfg["figures"][figure]["colors"] eff_static.color = primary_color self.protocol.mouse_led_effect(eff_static) if self.audio_player.is_playing(): primary_color, secondary_color, bg, accent = self.cfg["figures"][figure]["colors"] eff_change.color1 = primary_color eff_change.color2 = secondary_color eff_change.start_with_existing = True self.protocol.mouse_led_effect(eff_change) def _start_animation(self, primary_color, secondary_color): ring_eff = EffectSwipeAndChange() ring_eff.swipe.primary_color = primary_color ring_eff.swipe.secondary_color = secondary_color ring_eff.swipe.swipe_speed = 180 ring_eff.change.color1 = primary_color ring_eff.change.color2 = secondary_color self.mmstate.led_ring_effect(ring_eff) mouse_eff = deepcopy(ring_eff) mouse_eff.swipe.start_position = 6 / 45 * 360 mouse_eff.swipe.bell_curve_width_in_leds = 16 mouse_eff.swipe.swipe_speed = 180 self.mmstate.mouse_led_effect(mouse_eff) def _run_off_animation(self): print("Running off animation") ring_eff = EffectReverseSwipe() self.mmstate.led_ring_effect(ring_eff) mouse_eff = EffectReverseSwipe() mouse_eff.startPosition = 6 / 45 * 360 self.mmstate.mouse_led_effect(mouse_eff) self.mmstate.button_leds(0) def main(config_path): cfg = load_config(config_path) loop = asyncio.get_event_loop() hass = HomeAssistantClient(cfg["general"]["hass_url"], cfg["general"]["hass_token"], loop) coro = serial_asyncio.create_serial_connection(loop, MusicMouseProtocol, cfg["general"]["serial_port"], baudrate=115200) transport, protocol = loop.run_until_complete(coro) controller = Controller(protocol, hass, cfg) mqtt_cfg = cfg["general"]["mqtt"] loop.create_task(start_mqtt(protocol, mqtt_cfg["server"], mqtt_cfg["user"],mqtt_cfg["password"] )) loop.create_task(hass.connect()) return controller, loop if __name__ == "__main__": if len(sys.argv) == 2: controller, loop = main(config_path=sys.argv[1]) loop.run_forever() loop.close() else: print("Error: run with config file path as first argument")