From 1458793d748e67745c0f27c048bcf6fd1eae8b82 Mon Sep 17 00:00:00 2001 From: Martin Bauer Date: Wed, 12 Jan 2022 22:35:49 +0100 Subject: [PATCH] started mqtt shelve led control --- espmusicmouse/host_driver/led_cmds.py | 9 ++ espmusicmouse/host_driver/mqtt.py | 152 ++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 espmusicmouse/host_driver/mqtt.py diff --git a/espmusicmouse/host_driver/led_cmds.py b/espmusicmouse/host_driver/led_cmds.py index a232218..ebee7ed 100644 --- a/espmusicmouse/host_driver/led_cmds.py +++ b/espmusicmouse/host_driver/led_cmds.py @@ -21,7 +21,16 @@ class ColorRGBW: def is_valid(self): vals = (self.r, self.g, self.b, self.w) return all(0 <= v <= 1 for v in vals) + + def __mul__(self, other:float): + assert 0<= other <= 1 + return ColorRGBW(self.r * other, self.g * other, self.b * other, self.w * other) + def __eq__(self, other:'ColorRGBW'): + return self.r == other.r and self.g == other.g and self.b == other.b and self.w == other.w + + def __neq__(self, other:'ColorRGBW'): + return not self == other @dataclass class ColorHSV: diff --git a/espmusicmouse/host_driver/mqtt.py b/espmusicmouse/host_driver/mqtt.py new file mode 100644 index 0000000..e46a69b --- /dev/null +++ b/espmusicmouse/host_driver/mqtt.py @@ -0,0 +1,152 @@ +from led_cmds import ColorRGBW, EffectStaticConfig, EffectCircularConfig, EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig +import asyncio +import asyncio_mqtt +import json + +BRIGHTNESS_SCALE = 0.4 # power supply is a bit weak -> scale brightness down globally + + +class ShelveLightMqtt: + def __init__(self, protocol, client): + self._brightness = 100 + self._protocol = protocol + self._mqtt_client = client + + self._brightness = 0 + self._color = ColorRGBW(0.5, 0.5, 0.5, 0) + self._last_color = ColorRGBW(0.5, 0.5, 0.5, 0) + self._original_color_repr = (127, 127, 127) + self._effect = "static" + + self._discovery_spec = self._create_discovery_msg_light() + + async def init(self): + self._protocol.shelve_led_effect(EffectStaticConfig(ColorRGBW(0, 0, 0, 0))) + await self._notify_mqtt_brightness() + await self._notify_mqtt_state() + await self._notify_mqtt_rgb() + await self._send_autodiscovery_msg() + + async def _send_autodiscovery_msg(self): + topic = f"homeassistant/light/{self._discovery_spec['unique_id']}/config" + await self._mqtt_client.publish(topic, json.dumps(self._discovery_spec).encode()) + + async def _notify_mqtt_rgb(self): + rgb_payload = ",".join(str(e) for e in self._original_color_repr) + print("OUT ", "rgb", rgb_payload) + await self._mqtt_client.publish(self._discovery_spec['rgb_state_topic'], + rgb_payload.encode()) + + async def _notify_mqtt_state(self): + state_payload = "ON" if self._brightness > 0 else "OFF" + print("OUT ", "state", state_payload) + await self._mqtt_client.publish(self._discovery_spec['state_topic'], state_payload.encode()) + + async def _notify_mqtt_brightness(self): + brightness_payload = str(int(self._brightness * 255)) + print("OUT ", "brightness", brightness_payload) + await self._mqtt_client.publish(self._discovery_spec['brightness_state_topic'], + brightness_payload.encode()) + + async def _notify_mqtt_shelve_effect(self, effect): + await self._mqtt_client.publish(self._discovery_spec['effect_state_topic'], effect.encode()) + + def _set_rgb_color(self, color: ColorRGBW): + if self._color != self._last_color: # mqtt sends color multiple times, we want to remember last distinct color as second color for effects + self._last_color = self._color + self._color = color * BRIGHTNESS_SCALE * self._brightness + + def _update_device(self): + if self._effect == "static": + eff = EffectStaticConfig(self._color) + elif self._effect == "circular": + eff = EffectCircularConfig() + eff.color = self._color + elif self._effect == "wipe-up": + eff = EffectAlexaSwipeConfig() + eff.secondary_color = self._color + eff.primary_color = self._last_color + eff.bell_curve_width_in_leds = 5 + eff.swipe_speed = 180 + elif self._effect == "two-color": + eff = EffectRandomTwoColorInterpolationConfig() + eff.color1 = self._color + eff.color2 = self._last_color + eff.start_with_existing = True + elif self._effect == "two-color-random": + eff = EffectRandomTwoColorInterpolationConfig() + eff.color1 = self._color + eff.color2 = self._last_color + eff.hue1_random = True + eff.hue2_random = True + eff.start_with_existing = True + else: + print(f"Unknown effect {self._effect}") + eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0)) + self._protocol.shelve_led_effect(eff) + + async def handle_light_message(self, msg): + prefix = 'musicmouse/lights_shelve' + if not msg.topic.startswith(prefix): + return False + cmd = msg.topic.split("/")[-1] + payload = msg.payload.decode() + + print("IN ", cmd, payload) + if cmd == "rgb": + r, g, b = tuple(int(i) for i in payload.split(",")) + self._original_color_repr = (r, g, b) + self._set_rgb_color(ColorRGBW(r / 255, g / 255, b / 255, 0)) + await self._notify_mqtt_rgb() + elif cmd == "switch": + if payload == "ON" and self._brightness == 0: + self._brightness = 1 + elif payload == "OFF": + self._brightness = 0 + self._update_device() + await self._notify_mqtt_rgb() + await self._notify_mqtt_brightness() + await self._notify_mqtt_state() + elif cmd == "brightness": + self._brightness = int(payload) / 255 + self._set_rgb_color(self._color) + await self._notify_mqtt_brightness() + elif cmd == "effect": + self._effect = payload + + @staticmethod + def _create_discovery_msg_light(base_name="musicmouse", display_name="Music Mouse Regal Licht"): + id = "shelve" + return { + 'name': display_name, + 'unique_id': f'{base_name}_{id}', + 'command_topic': f'{base_name}/lights_{id}/switch', + 'state_topic': f'{base_name}/lights_{id}/switch_state', + 'brightness_command_topic': f'{base_name}/lights_{id}/brightness', + 'brightness_state_topic': f'{base_name}/lights_{id}/brightness_state', + 'rgb_command_topic': f'{base_name}/lights_{id}/rgb', + 'rgb_state_topic': f'{base_name}/lights_{id}/rgb_state', + 'effect_command_topic': f'{base_name}/lights_{id}/effect', + 'effect_state_topic': f'{base_name}/lights_{id}/effect_state', + 'effect_list': ['static', 'circular', 'wipe-up', 'two-color', 'two-color-random'], + } + + +async def start_mqtt(music_mouse_protocol, server, username, password): + async with asyncio_mqtt.Client(hostname=server, username=username, password=password) as client: + shelve_light = ShelveLightMqtt(music_mouse_protocol, client) + await shelve_light.init() + async with client.filtered_messages("musicmouse/#") as messages: + await client.subscribe("musicmouse/#") + async for message in messages: + await shelve_light.handle_light_message(message) + + +if __name__ == "__main__": + + class DummyProtocol: + def shelve_led_effect(self, effect): + print("EFF ", repr(effect)) + + password = "pw" + asyncio.run(start_mqtt(DummyProtocol(), "homeassistant", "musicmouse", password))