272 lines
11 KiB
Python
Executable File
272 lines
11 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import asyncio
|
|
import sys
|
|
import serial_asyncio
|
|
from led_cmds import (ColorRGBW, ColorHSV, EffectCircularConfig, EffectStaticConfig,
|
|
EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig,
|
|
EffectSwipeAndChange, EffectReverseSwipe)
|
|
from host_driver import MusicMouseProtocol, RfidTokenRead, RotaryEncoderEvent, ButtonEvent, TouchButton, TouchButtonPress, TouchButtonRelease, mouse_leds_index_ranges
|
|
from player import AudioPlayer
|
|
from glob import glob
|
|
from copy import deepcopy
|
|
import os
|
|
from hass_client import HomeAssistantClient
|
|
import argparse
|
|
from ruamel.yaml import YAML
|
|
import warnings
|
|
from pprint import pprint
|
|
from typing import Optional
|
|
from mqtt_json import start_mqtt
|
|
import aiohttp
|
|
|
|
# Shared ruamel.yaml parser in "safe" mode; load_config() uses it to read config.yml.
yaml = YAML(typ='safe')

# RGBW colour with all four channels at zero; MusicMouseState.reset() applies it
# as a static effect to the mouse LEDs and the LED ring.
OFF_COLOR = ColorRGBW(0, 0, 0, 0)
|
|
|
|
|
|
def parse_color(color_str: str):
    """Convert a colour specification from the config into a ``ColorRGBW``.

    Accepted inputs:

    * a ``ColorRGBW`` instance, returned unchanged;
    * ``"#RRGGBB"`` - three two-digit hex channels, each scaled to 0..1 by
      dividing by 255, with the white channel set to 0;
    * ``"w<hex>"`` - a hex value for the white channel only (scaled by 1/255),
      with red, green and blue set to 0.

    Raises:
        TypeError: if the value is neither a string nor a ``ColorRGBW``.
        ValueError: if the string has no recognised prefix or its hex digits
            cannot be parsed.
    """
    if not isinstance(color_str, str):
        # Colours that were already parsed pass straight through.
        if isinstance(color_str, ColorRGBW):
            return color_str
        raise TypeError(
            f"Color must be a string or ColorRGBW, got {type(color_str).__name__}")

    if color_str.startswith("#"):
        hex_digits = color_str.lstrip('#')
        red, green, blue = (int(hex_digits[i:i + 2], 16) / 255 for i in (0, 2, 4))
        return ColorRGBW(red, green, blue, 0)

    if color_str.startswith("w"):
        white = int(color_str.lstrip("w"), 16) / 255
        return ColorRGBW(0, 0, 0, white)

    # Previously this case fell through and returned None, which only surfaced
    # later when the bogus colour was handed to an LED effect.
    raise ValueError(f"Unrecognized color specification: {color_str!r}")
|
|
|
|
|
|
def load_config(config_path):
    """Load ``config.yml`` from the directory *config_path* and prepare figures.

    For every entry under the ``figures`` key:

    * each item of ``colors`` is run through ``parse_color``;
    * if ``media_files`` is absent, it is filled with the sorted result of
      globbing ``<config_path>/<figure name>``.

    Returns the parsed (and amended) configuration mapping.
    """
    cfg_filename = os.path.join(config_path, "config.yml")
    with open(cfg_filename) as stream:
        config = yaml.load(stream)

    for name, entry in config["figures"].items():
        entry["colors"] = list(map(parse_color, entry["colors"]))
        if 'media_files' in entry:
            continue
        # NOTE(review): the pattern contains no wildcard, so glob() yields at
        # most the path itself - confirm whether the directory's contents
        # (e.g. "<name>/*") were intended.
        entry['media_files'] = sorted(glob(os.path.join(config_path, name)))

    return config
|
|
|
|
|
|
# Strong references to service-call tasks that are still running. The event
# loop itself only keeps weak references to tasks, so a task nobody else holds
# could be garbage-collected before it finishes.
_pending_hass_tasks = set()


def hass_service(hass, domain, service, **kwargs):
    """Fire a Home Assistant service call in the background.

    Schedules ``hass.call_service(domain, service, kwargs)`` as an asyncio task
    and returns immediately without waiting for the result. Because it uses
    ``asyncio.create_task``, it must be called while an event loop is running.

    Args:
        hass: client object providing an awaitable ``call_service`` method.
        domain: service domain, passed through unchanged.
        service: service name, passed through unchanged.
        **kwargs: collected into a dict and passed as the service data.
    """
    task = asyncio.create_task(hass.call_service(domain, service, kwargs))
    _pending_hass_tasks.add(task)
    # Drop the reference once the call has finished (successfully or not).
    task.add_done_callback(_pending_hass_tasks.discard)
|
|
|
|
|
|
class MusicMouseState:
    """Remembers what was last requested from the device, plus figure bookkeeping.

    Each setter records the requested value on the instance and forwards the
    request to the protocol object handed to the constructor.
    """

    def __init__(self, protocol: MusicMouseProtocol):
        """Start with no figure, no recorded effects and no button brightness."""
        # Every LED request is forwarded through this object.
        self.protocol: MusicMouseProtocol = protocol

        # None if no figure is placed on the reader, or the name of the figure
        self.active_figure: Optional[str] = None
        # figure whose playlist wasn't played completely and was removed
        self.last_partially_played_figure: Optional[str] = None

        self.current_mouse_led_effect = None
        self.current_led_ring_effect = None
        self.button_led_brightness = None

    def mouse_led_effect(self, effect_cfg):
        """Record *effect_cfg* as the mouse LED effect and send it to the device."""
        self.current_mouse_led_effect = effect_cfg
        self.protocol.mouse_led_effect(effect_cfg)

    def led_ring_effect(self, effect_cfg):
        """Record *effect_cfg* as the ring effect; send it to the ring and the shelf."""
        self.current_led_ring_effect = effect_cfg
        link = self.protocol
        link.led_ring_effect(effect_cfg)
        # The shelf LEDs get the same effect as the ring.
        link.shelve_led_effect(effect_cfg)

    def button_leds(self, brightness):
        """Set both button background LEDs (prev and next) to *brightness* in [0, 1]."""
        assert 0 <= brightness <= 1
        link = self.protocol
        link.button_background_led_prev(brightness)
        link.button_background_led_next(brightness)
        # Only remembered after both requests went out.
        self.button_led_brightness = brightness

    def reset(self):
        """Apply a static OFF_COLOR effect to the mouse LEDs, then to the LED ring."""
        for apply_effect in (self.mouse_led_effect, self.led_ring_effect):
            apply_effect(EffectStaticConfig(OFF_COLOR))
|
|
|
|
|
|
class Controller:
    """Reacts to firmware messages by driving audio playback, LEDs and Home Assistant.

    The constructor registers :meth:`on_firmware_msg` with the protocol object,
    so every firmware message is dispatched from there to the matching handler.
    """

    def __init__(self, protocol, hass, cfg):
        """Set up the audio player, state tracking and lookup tables.

        Args:
            protocol: device protocol object; used to send LED effects and to
                register the firmware message callback.
            hass: Home Assistant client handed to ``hass_service``.
            cfg: configuration mapping with the ``general`` and ``figures``
                sections (each figure needs ``id``, ``colors`` and
                ``media_files``).
        """
        self.cfg = cfg
        self.audio_player = AudioPlayer(cfg["general"]["alsa_device"])
        self.audio_player.set_volume(50)
        self.mmstate = MusicMouseState(protocol)
        self.protocol = protocol
        self.hass = hass

        vol_min = self.cfg["general"].get("min_volume", None)
        vol_max = self.cfg["general"].get("max_volume", None)
        self.audio_player.set_volume_limits(vol_min, vol_max)
        protocol.register_message_callback(self.on_firmware_msg)

        self.audio_player.on_playlist_end_callback = self._on_playlist_end
        self.playlists = {
            fig: self.audio_player.create_playlist(fig_cfg['media_files'])
            for fig, fig_cfg in cfg['figures'].items()
        }
        # Figure ids are hex strings in the config; tag ids are compared as bytes.
        self._rfid_to_figure_name = {
            bytes.fromhex(figure_cfg["id"]): figure_name
            for figure_name, figure_cfg in cfg["figures"].items()
        }

        # The shelf gets a dim blue static effect, immediately followed by an
        # all-zero static effect.
        self.protocol.shelve_led_effect(EffectStaticConfig(ColorRGBW(0, 0, 0.1, 0)))
        shelf_eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0))
        self.protocol.shelve_led_effect(shelf_eff)

    def _on_playlist_end(self):
        """Audio player callback: run the off animation once playback has stopped."""
        if not self.audio_player.is_playing():
            # The playlist ran to its end, so there is nothing to resume later.
            self.mmstate.last_partially_played_figure = None
            self._run_off_animation()
        else:
            print("Playlist end was called, even if player remains playing?!")

    def handle_rfid_event(self, tagid):
        """Handle an RFID reading.

        * The all-zero id is treated as "no figure": if audio is playing, the
          off animation runs, playback is paused and the figure is remembered
          as partially played; in any case the active figure is cleared.
        * A configured id starts the LED animation with the figure's first two
          colours, lights the button LEDs, and either resumes the paused
          playlist (same figure as the partially played one) or starts the
          figure's playlist from the beginning.
        * Any other id only produces a warning.

        Args:
            tagid: tag id as ``bytes``.
        """
        if tagid == bytes.fromhex("0000000000"):
            if self.audio_player.is_playing():
                print("Got 000 rfid -> playing off animation")
                self._run_off_animation()
                self.audio_player.pause()
                self.mmstate.last_partially_played_figure = self.mmstate.active_figure
            else:
                self.mmstate.last_partially_played_figure = None

            self.mmstate.active_figure = None
        elif tagid in self._rfid_to_figure_name:
            newly_placed_figure = self._rfid_to_figure_name[tagid]
            figure_cfg = self.cfg["figures"][newly_placed_figure]
            primary_color, secondary_color, *_ = figure_cfg["colors"]
            self._start_animation(primary_color, secondary_color)
            self.mmstate.button_leds(self.cfg["general"].get("button_leds_brightness", 0.5))

            if self.mmstate.last_partially_played_figure == newly_placed_figure:
                print("Continuing playlist")
                self.audio_player.play()
            else:
                print("Restarting playlist")
                self.audio_player.set_playlist(
                    self.audio_player.create_playlist(figure_cfg['media_files']))
                self.audio_player.play_from_start()

            self.mmstate.active_figure = newly_placed_figure
        else:
            warnings.warn(f"Unknown figure/tag with id {tagid}")

    def on_firmware_msg(self, _, message):
        """Protocol callback: log *message* and dispatch it by message type.

        Args:
            _: first callback argument, unused here.
            message: firmware message object; unhandled types are ignored.
        """
        print("FW msg:", message)
        if isinstance(message, RfidTokenRead):
            self.handle_rfid_event(message.id)
        elif isinstance(message, RotaryEncoderEvent):
            self._handle_rotary_event(message)
        elif isinstance(message, ButtonEvent):
            self._handle_button_event(message)
        elif isinstance(message, TouchButtonPress):
            self._handle_touch_press(message)
        elif isinstance(message, TouchButtonRelease):
            self._handle_touch_release(message)

    def _handle_rotary_event(self, message):
        """Change the volume: direction 2 raises it, direction 1 lowers it."""
        volume_increment = self.cfg["general"].get("volume_increment", 2) * abs(
            message.increment)
        if message.direction == 2:
            self.audio_player.change_volume(volume_increment)
        elif message.direction == 1:
            self.audio_player.change_volume(-volume_increment)

    def _handle_button_event(self, message):
        """Skip tracks with left/right while playing; rotary press toggles a light."""
        btn = message.button
        if btn == "left" and message.event == "pressed" and self.audio_player.is_playing():
            self.audio_player.previous()
        elif btn == "right" and message.event == "pressed" and self.audio_player.is_playing():
            self.audio_player.next()
        elif message.button == "rotary" and message.event == "pressed":
            hass_service(self.hass, "light", "toggle", entity_id="light.kinderzimmer_fluter")

    def _handle_touch_press(self, message):
        """Highlight the touched LED range and switch the room lights' colour."""
        figure = self.mmstate.active_figure
        if figure and self.audio_player.is_playing():
            primary_color, secondary_color, bg, accent = self.cfg["figures"][figure]["colors"]
            self.protocol.mouse_led_effect(
                EffectStaticConfig(accent, *mouse_leds_index_ranges[message.touch_button]))

        # Light attributes sent to Home Assistant for each touch button.
        colors = {
            TouchButton.RIGHT_FOOT: {
                'rgb_color': [235, 255, 67]
            },
            TouchButton.LEFT_FOOT: {
                'color_temp': 469
            },
            TouchButton.RIGHT_EAR: {
                'rgb_color': [101, 49, 255]
            },
            TouchButton.LEFT_EAR: {
                'rgb_color': [255, 74, 254]
            },
        }
        # NOTE(review): assumed to run regardless of playback state - confirm
        # this call is not meant to sit inside the "figure and playing" branch.
        hass_service(
            self.hass,
            "light",
            "turn_on",
            entity_id=["light.kinderzimmer_fluter", "light.music_mouse_regal_licht"],
            **colors[message.touch_button])

    def _handle_touch_release(self, message):
        """Restore the touched LED range and resume the colour interpolation."""
        figure = self.mmstate.active_figure
        eff_static = EffectStaticConfig(ColorRGBW(0, 0, 0, 0),
                                        *mouse_leds_index_ranges[message.touch_button])
        # Figure colours are only available with an active figure. Without this
        # guard a release arriving while no figure is active (but the player
        # still reports playing) raised KeyError on cfg["figures"][None].
        show_figure_colors = bool(figure) and self.audio_player.is_playing()
        if show_figure_colors:
            primary_color, secondary_color, bg, accent = self.cfg["figures"][figure]["colors"]
            eff_static.color = primary_color
        # NOTE(review): assumed to be sent unconditionally, so that the range is
        # switched to all-zero when nothing is playing - confirm.
        self.protocol.mouse_led_effect(eff_static)

        if show_figure_colors:
            eff_change = EffectRandomTwoColorInterpolationConfig()
            eff_change.color1 = primary_color
            eff_change.color2 = secondary_color
            eff_change.start_with_existing = True
            self.protocol.mouse_led_effect(eff_change)

    def _start_animation(self, primary_color, secondary_color):
        """Start the swipe-then-change effect on the ring (and shelf) and the mouse."""
        ring_eff = EffectSwipeAndChange()
        ring_eff.swipe.primary_color = primary_color
        ring_eff.swipe.secondary_color = secondary_color
        ring_eff.swipe.swipe_speed = 180
        ring_eff.change.color1 = primary_color
        ring_eff.change.color2 = secondary_color
        self.mmstate.led_ring_effect(ring_eff)

        # The mouse LEDs use the same colours with their own swipe geometry.
        mouse_eff = deepcopy(ring_eff)
        mouse_eff.swipe.start_position = 6 / 45 * 360
        mouse_eff.swipe.bell_curve_width_in_leds = 16
        mouse_eff.swipe.swipe_speed = 180
        self.mmstate.mouse_led_effect(mouse_eff)

    def _run_off_animation(self):
        """Play the reverse-swipe effect everywhere and switch off the button LEDs."""
        print("Running off animation")
        ring_eff = EffectReverseSwipe()
        self.mmstate.led_ring_effect(ring_eff)

        mouse_eff = EffectReverseSwipe()
        # NOTE(review): _start_animation sets ``swipe.start_position`` while this
        # sets ``startPosition`` - confirm EffectReverseSwipe really exposes the
        # camelCase attribute, otherwise this assignment has no effect.
        mouse_eff.startPosition = 6 / 45 * 360
        self.mmstate.mouse_led_effect(mouse_eff)

        self.mmstate.button_leds(0)
|
|
|
|
|
|
def main(config_path):
    """Wire up the application and return it ready to run.

    Loads the configuration, creates the event loop, opens the serial
    connection to the device (blocking until it is established), builds the
    :class:`Controller`, and schedules the MQTT and Home Assistant connections
    as tasks on the loop. The loop is NOT started; the caller is expected to
    run it.

    Args:
        config_path: directory containing ``config.yml``.

    Returns:
        ``(controller, loop)`` tuple.
    """
    cfg = load_config(config_path)
    general = cfg["general"]

    # Create and install the loop explicitly: asyncio.get_event_loop() without
    # a current loop is deprecated and fails on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    hass = HomeAssistantClient(general["hass_url"], general["hass_token"], loop=loop)

    coro = serial_asyncio.create_serial_connection(loop,
                                                   MusicMouseProtocol,
                                                   general["serial_port"],
                                                   baudrate=115200)
    transport, protocol = loop.run_until_complete(coro)
    controller = Controller(protocol, hass, cfg)
    mqtt_cfg = general["mqtt"]
    # Both connections are only scheduled here; they start once the loop runs.
    loop.create_task(
        start_mqtt(protocol, mqtt_cfg["server"], mqtt_cfg["user"], mqtt_cfg["password"]))
    loop.create_task(hass.connect())
    return controller, loop
|
|
|
|
|
|
if __name__ == "__main__":
    # Exactly one argument is expected: the directory holding config.yml.
    if len(sys.argv) != 2:
        # sys.exit with a string writes it to stderr and exits with status 1,
        # so a usage error is no longer reported as success.
        sys.exit("Error: run with config file path as first argument")

    # The controller is kept referenced here for as long as the loop runs.
    controller, loop = main(config_path=sys.argv[1])
    try:
        loop.run_forever()
    finally:
        # Close the loop even when run_forever() is left via an exception
        # such as KeyboardInterrupt.
        loop.close()
|