diff --git a/espmusicmouse/host_driver/led_cmds.py b/espmusicmouse/host_driver/led_cmds.py index ebee7ed..f2ed170 100644 --- a/espmusicmouse/host_driver/led_cmds.py +++ b/espmusicmouse/host_driver/led_cmds.py @@ -31,6 +31,10 @@ class ColorRGBW: def __neq__(self, other:'ColorRGBW'): return not self == other + + def without_white_channel(self, scale=1): + args = (min(1, e + self.w) for e in (self.r, self.g, self.b) ) + return ColorRGBW(*args, 0) @dataclass class ColorHSV: diff --git a/espmusicmouse/host_driver/main.py b/espmusicmouse/host_driver/main.py index 3a5c6ca..fe66eca 100755 --- a/espmusicmouse/host_driver/main.py +++ b/espmusicmouse/host_driver/main.py @@ -17,7 +17,7 @@ from ruamel.yaml import YAML import warnings from pprint import pprint from typing import Optional -from mqtt import start_mqtt +from mqtt_json import start_mqtt yaml = YAML(typ='safe') diff --git a/espmusicmouse/host_driver/mqtt.py b/espmusicmouse/host_driver/mqtt.py index ada5dae..606612d 100644 --- a/espmusicmouse/host_driver/mqtt.py +++ b/espmusicmouse/host_driver/mqtt.py @@ -104,6 +104,7 @@ class ShelveLightMqtt: if payload == "ON" and self._brightness == 0: self._brightness = 1 elif payload == "OFF": + self._color = ColorRGBW(0, 0, 0, 0) self._brightness = 0 self._update_device() await self._notify_mqtt_rgb() diff --git a/espmusicmouse/host_driver/mqtt_json.py b/espmusicmouse/host_driver/mqtt_json.py new file mode 100644 index 0000000..565c852 --- /dev/null +++ b/espmusicmouse/host_driver/mqtt_json.py @@ -0,0 +1,162 @@ +from led_cmds import ColorRGBW, EffectStaticConfig, EffectCircularConfig, EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig, EffectSwipeAndChange +import asyncio +import asyncio_mqtt +import json +from copy import deepcopy + + +class ShelveLightMqtt: + def __init__(self, protocol, client): + self._protocol = protocol + self._mqtt_client = client + + self._state = { + "state": "OFF", + "color": { + "r": 255, + "g": 255, + "b": 255, + "w": 0, + }, + "color_mode": "rgbw", + "brightness": 30, + "effect": "static", + } + self._last_color = ColorRGBW(0.5, 0.5, 0.5, 0) + + self._discovery_spec = self._create_discovery_msg_light() + + async def init(self): + """Init method, because constructor can't be async""" + self._protocol.shelve_led_effect(EffectStaticConfig(ColorRGBW(0, 0, 0, 0))) + await self._send_autodiscovery_msg() + await self._notify_mqtt_state({"state": "OFF"}) + + async def handle_light_message(self, msg): + if msg.topic == self._discovery_spec['command_topic']: + payload = msg.payload.decode() + new_state = json.loads(payload) + print("IN ", new_state) + await self._update_state(new_state) + await self._notify_mqtt_state(new_state) + + async def _update_state(self, new_state): + """Merges current state with new state, updates device""" + + # memorize last color - this is used for effects that need 2 colors + if 'color' in new_state: + brightness = new_state.get('brightness', self._state['brightness']) + new_color = self._color_from_json(new_state['color'], brightness) + current_color = self._color_from_json(self._state['color']) + if new_color != current_color: + self._last_color = current_color + print("last color", self._last_color) + + self._state.update(new_state) + self._update_device() + + @staticmethod + def _color_from_json(json_color, brightness=255): + args = ((json_color[e] / 255) * (brightness / 255) for e in ('r', 'g', 'b', 'w')) + return ColorRGBW(*args) + + def _update_device(self): + s = self._state + current_color = self._color_from_json(s['color'], brightness=s["brightness"]) + + if s['state'] == "OFF": + eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0)) + elif s['effect'] == 'static': + eff = EffectStaticConfig(current_color) + elif s['effect'] == 'circular': + eff = EffectCircularConfig(speed=180, width=90, color=current_color) + elif s['effect'] == 'wipeup': + eff = EffectSwipeAndChange() + eff.swipe.secondary_color = current_color + eff.swipe.primary_color = self._last_color + eff.swipe.bell_curve_width_in_leds = 5 + eff.swipe.swipe_speed = 180 + eff.change.color1 = current_color + eff.change.color2 = self._last_color + elif s['effect'] == "twocolor": + eff = EffectRandomTwoColorInterpolationConfig() + eff.color1 = current_color + eff.color2 = self._last_color + eff.start_with_existing = True + elif s['effect'] == "twocolorrandom": + eff = EffectRandomTwoColorInterpolationConfig() + eff.color1 = current_color + eff.color2 = self._last_color + eff.hue1_random = True + eff.hue2_random = True + eff.start_with_existing = True + else: + print(f"Unknown effect {s['effect']}") + eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0)) + self._protocol.shelve_led_effect(eff) + + @staticmethod + def _create_discovery_msg_light(base_name="musicmouse_json", + display_name="Music Mouse Regal Licht"): + id = "shelve" + return { + 'platform': 'mqtt', + 'schema': 'json', + 'name': display_name, + 'unique_id': f'{base_name}_{id}', + 'command_topic': f'{base_name}/lights_{id}/command', + 'state_topic': f'{base_name}/lights_{id}/state', + 'color_mode': True, + 'brightness': True, + #'device': { + # 'manufacturer': 'bauer.tech', + # 'model': "SK6812 LED strip", + #}, + 'effect': True, + 'effect_list': ['static', 'circular', 'wipeup', 'twocolor', 'twocolorrandom'], + 'supported_color_modes': ['rgbw'], + } + + async def _send_autodiscovery_msg(self): + topic = f"homeassistant/light/{self._discovery_spec['unique_id']}/config" + await self._mqtt_client.publish(topic, json.dumps(self._discovery_spec).encode()) + + async def _notify_mqtt_state(self, state): + state_payload = json.dumps(self._state) + print("OUT ", state_payload) + await self._mqtt_client.publish(self._discovery_spec['state_topic'], state_payload.encode()) + return + + direct_ack = False + if direct_ack == True: + state_payload = json.dumps(state) + else: + s = deepcopy(self._state) + if s['state'] == "OFF": + state_payload = json.dumps({"state": "OFF"}) + else: + s['color_mode'] = "rgbw" + state_payload = json.dumps(s) + + print("OUT ", state_payload) + await self._mqtt_client.publish(self._discovery_spec['state_topic'], state_payload.encode()) + + +async def start_mqtt(music_mouse_protocol, server, username, password): + async with asyncio_mqtt.Client(hostname=server, username=username, password=password) as client: + shelve_light = ShelveLightMqtt(music_mouse_protocol, client) + await shelve_light.init() + async with client.filtered_messages("musicmouse_json/#") as messages: + await client.subscribe("musicmouse_json/#") + async for message in messages: + await shelve_light.handle_light_message(message) + + +if __name__ == "__main__": + + class DummyProtocol: + def shelve_led_effect(self, effect): + print("EFF ", repr(effect)) + + password = "" + asyncio.run(start_mqtt(DummyProtocol(), "homeassistant", "musicmouse", password)) diff --git a/espmusicmouse/src/main.cpp b/espmusicmouse/src/main.cpp index e1d251c..e60de98 100644 --- a/espmusicmouse/src/main.cpp +++ b/espmusicmouse/src/main.cpp @@ -158,7 +158,7 @@ void setupMouseLeds() // -------------------------------------------------- Shelf Leds ------------------------------------------- -LedStripRGBW<250> ledStripShelf; +LedStripRGBW<252> ledStripShelf; Esp32DriverRGBW ledDriverShelf; LedTask ledTaskShelf;