#!/usr/bin/env python import asyncio import sys import serial_asyncio from led_cmds import (ColorRGBW, ColorHSV, EffectStaticConfig, EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig, EffectSwipeAndChange, EffectReverseSwipe) from host_driver import MusicMouseProtocol, RfidTokenRead, RotaryEncoderEvent, ButtonEvent, TouchButton, TouchButtonPress, TouchButtonRelease, mouse_leds_index_ranges from player import AudioPlayer from glob import glob from copy import deepcopy import os from hass_client import HomeAssistantClient import argparse from ruamel.yaml import YAML import warnings from pprint import pprint yaml = YAML(typ='safe') OFF_COLOR = ColorRGBW(0, 0, 0, 0) def parse_color(color_str: str): if isinstance(color_str, ColorRGBW): return color_str elif color_str.startswith("#"): color_str = color_str.lstrip('#') t = tuple(int(color_str[i:i + 2], 16) / 255 for i in (0, 2, 4)) return ColorRGBW(*t, 0) elif color_str.startswith("w"): color_str = color_str.lstrip("w") return ColorRGBW(0, 0, 0, int(color_str, 16) / 255) def load_config(config_path): with open(os.path.join(config_path, "config.yml")) as cfg_file: cfg = yaml.load(cfg_file) for figure_name, figure_cfg in cfg["figures"].items(): figure_cfg["colors"] = [parse_color(c) for c in figure_cfg["colors"]] if 'media_files' not in figure_cfg: figure_cfg['media_files'] = sorted(glob(os.path.join(config_path, figure_name))) return cfg def hass_service(hass, domain, service, **kwargs): asyncio.create_task(hass.call_service(domain, service, kwargs)) class MusicMouseState: def __init__(self, protocol: MusicMouseProtocol): self.current_figure: str = None self.last_figure: str = None self.current_mouse_led_effect = None self.current_led_ring_effect = None self.protocol: MusicMouseProtocol = protocol self.button_led_brightness = None def mouse_led_effect(self, effect_cfg): self.current_mouse_led_effect = effect_cfg self.protocol.mouse_led_effect(effect_cfg) def led_ring_effect(self, effect_cfg): self.current_led_ring_effect = effect_cfg self.protocol.led_ring_effect(effect_cfg) def button_leds(self, brightness): assert 0 <= brightness <= 1 self.protocol.button_background_led_prev(brightness) self.protocol.button_background_led_next(brightness) self.button_led_brightness = brightness def reset(self): self.mouse_led_effect(EffectStaticConfig(OFF_COLOR)) self.led_ring_effect(EffectStaticConfig(OFF_COLOR)) def figure_placed(self, figure_state): self.last_figure = self.current_figure self.current_figure = figure_state def figure_removed(self): self.last_figure = self.current_figure class Controller: def __init__(self, protocol, hass, cfg): self.cfg = cfg self.audio_player = AudioPlayer(cfg["general"]["alsa_device"]) self.audio_player.set_volume(50) self.mmstate = MusicMouseState(protocol) self.protocol = protocol self.hass = hass vol_min = self.cfg["general"].get("min_volume", None) vol_max = self.cfg["general"].get("max_volume", None) self.audio_player.set_volume_limits(vol_min, vol_max) protocol.register_message_callback(self.on_firmware_msg) self.audio_player.on_playlist_end_callback = self._run_off_animation self.playlists = { fig: self.audio_player.create_playlist(fig_cfg['media_files']) for fig, fig_cfg in cfg['figures'].items() } self._rfid_to_figure_name = { bytes.fromhex(figure_cfg["id"]): figure_name for figure_name, figure_cfg in cfg["figures"].items() } def handle_rfid_event(self, tagid): if tagid == bytes.fromhex("0000000000"): if self.audio_player.is_playing(): self._run_off_animation() self.audio_player.pause() self.mmstate.figure_removed() elif tagid in self._rfid_to_figure_name: figure = self._rfid_to_figure_name[tagid] primary_color, secondary_color, *rest = self.cfg["figures"][figure]["colors"] self._start_animation(primary_color, secondary_color) self.mmstate.button_leds(self.cfg["general"].get("button_leds_brightness", 0.5)) if figure in self.playlists: self.audio_player.set_playlist(self.playlists[figure]) if self.mmstate.last_figure == figure: self.audio_player.play() else: self.audio_player.play_from_start() self.mmstate.figure_placed(figure) else: warnings.warn(f"Unknown figure/tag with id {tagid}") def on_firmware_msg(self, _, message): print("FW msg:", message) if isinstance(message, RfidTokenRead): self.handle_rfid_event(message.id) elif isinstance(message, RotaryEncoderEvent): volume_increment = self.cfg["general"].get("volume_increment", 2) * abs( message.increment) if message.direction == 2: self.audio_player.change_volume(volume_increment) elif message.direction == 1: self.audio_player.change_volume(-volume_increment) elif isinstance(message, ButtonEvent): if message.button == "left" and message.event == "pressed": self.audio_player.previous() elif message.button == "right" and message.event == "pressed": self.audio_player.next() elif message.button == "rotary" and message.event == "pressed": hass_service(self.hass, "light", "toggle", entity_id="light.kinderzimmer_fluter") elif isinstance(message, TouchButtonPress): figure = self.mmstate.current_figure if figure and self.audio_player.is_playing(): primary_color, secondary_color, bg, accent = self.cfg["figures"][figure]["colors"] self.protocol.mouse_led_effect( EffectStaticConfig(accent, *mouse_leds_index_ranges[message.touch_button])) colors = { TouchButton.RIGHT_FOOT: { 'rgb_color': [235, 255, 67] }, TouchButton.LEFT_FOOT: { 'color_temp': 469 }, TouchButton.RIGHT_EAR: { 'rgb_color': [101, 49, 255] }, TouchButton.LEFT_EAR: { 'rgb_color': [255, 74, 254] }, } hass_service(self.hass, "light", "turn_on", entity_id="light.kinderzimmer_fluter", **colors[message.touch_button]) #if message.touch_button == TouchButton.RIGHT_FOOT: # asyncio.create_task( # self.hass.call_service("switch", "toggle", {"entity_id": "switch.tasmota06"})) # asyncio.create_task( # self.hass.call_service("light", "turn_on", {"entity_id": "light.arbeitszimmer_fluter"})) #elif message.touch_button == TouchButton.LEFT_FOOT: # asyncio.create_task( # self.hass.call_service("light", "turn_off", {"entity_id": "light.arbeitszimmer_fluter"})) elif isinstance(message, TouchButtonRelease): figure = self.mmstate.current_figure eff_change = EffectRandomTwoColorInterpolationConfig() eff_static = EffectStaticConfig(ColorRGBW(0, 0, 0, 0), *mouse_leds_index_ranges[message.touch_button]) if self.audio_player.is_playing(): primary_color, secondary_color, bg, accent = self.cfg["figures"][figure]["colors"] eff_static.color = primary_color self.protocol.mouse_led_effect(eff_static) if self.audio_player.is_playing(): primary_color, secondary_color, bg, accent = self.cfg["figures"][figure]["colors"] eff_change.color1 = primary_color eff_change.color2 = secondary_color eff_change.start_with_existing = True self.protocol.mouse_led_effect(eff_change) def _start_animation(self, primary_color, secondary_color): ring_eff = EffectSwipeAndChange() ring_eff.swipe.primary_color = primary_color ring_eff.swipe.secondary_color = secondary_color ring_eff.swipe.swipe_speed = 180 ring_eff.change.color1 = primary_color ring_eff.change.color2 = secondary_color self.mmstate.led_ring_effect(ring_eff) mouse_eff = deepcopy(ring_eff) mouse_eff.swipe.start_position = 6 / 45 * 360 mouse_eff.swipe.bell_curve_width_in_leds = 16 mouse_eff.swipe.swipe_speed = 180 self.mmstate.mouse_led_effect(mouse_eff) def _run_off_animation(self): ring_eff = EffectReverseSwipe() self.mmstate.led_ring_effect(ring_eff) mouse_eff = EffectReverseSwipe() mouse_eff.startPosition = 6 / 45 * 360 self.mmstate.mouse_led_effect(mouse_eff) self.mmstate.button_leds(0) def main(config_path): cfg = load_config(config_path) loop = asyncio.get_event_loop() hass = HomeAssistantClient(cfg["general"]["hass_url"], cfg["general"]["hass_token"], loop) coro = serial_asyncio.create_serial_connection(loop, MusicMouseProtocol, cfg["general"]["serial_port"], baudrate=115200) transport, protocol = loop.run_until_complete(coro) controller = Controller(protocol, hass, cfg) loop.create_task(hass.connect()) return controller, loop if __name__ == "__main__": if len(sys.argv) == 2: controller, loop = main(config_path=sys.argv[1]) loop.run_forever() loop.close() else: print("Error: run with config file path as first argument")