from led_cmds import ColorRGBW, EffectStaticConfig, EffectCircularConfig, EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig
import asyncio
import asyncio_mqtt
import json

BRIGHTNESS_SCALE = 0.4  # power supply is a bit weak -> scale brightness down globally


class ShelveLightMqtt:
    """Translate MQTT light commands into LED effects for the shelve light.

    The instance keeps the requested colour, brightness and effect, publishes
    state changes back over MQTT and announces itself through a
    ``homeassistant/light/<unique_id>/config`` discovery message.

    Colours are stored *unscaled* (components as received, divided by 255).
    The global ``BRIGHTNESS_SCALE`` and the current brightness are applied
    only when an effect is sent to the device, so repeated brightness changes
    cannot compound and switching off really produces black.
    """

    def __init__(self, protocol, client):
        """Store collaborators and set the initial (switched off) state.

        Args:
            protocol: Object providing ``shelve_led_effect(effect)``.
            client: MQTT client providing an awaitable ``publish(topic, payload)``.
        """
        self._protocol = protocol
        self._mqtt_client = client
        # Brightness as a fraction: 0 means off, 1 means full brightness.
        self._brightness = 0
        # Current and previous distinct colour, both unscaled.
        self._color = ColorRGBW(0.5, 0.5, 0.5, 0)
        self._last_color = ColorRGBW(0.5, 0.5, 0.5, 0)
        # Colour exactly as received (0..255 ints); echoed back on the state topic.
        self._original_color_repr = (127, 127, 127)
        self._effect = "static"
        self._discovery_spec = self._create_discovery_msg_light()

    async def init(self):
        """Blank the LEDs, publish the current state and send discovery."""
        self._protocol.shelve_led_effect(EffectStaticConfig(ColorRGBW(0, 0, 0, 0)))
        await self._notify_mqtt_brightness()
        await self._notify_mqtt_state()
        await self._notify_mqtt_rgb()
        await self._send_autodiscovery_msg()

    async def _send_autodiscovery_msg(self):
        """Publish the discovery spec as JSON on the discovery config topic."""
        topic = f"homeassistant/light/{self._discovery_spec['unique_id']}/config"
        await self._mqtt_client.publish(topic, json.dumps(self._discovery_spec).encode())

    async def _notify_mqtt_rgb(self):
        """Publish the last received colour as ``r,g,b`` on the rgb state topic."""
        rgb_payload = ",".join(str(e) for e in self._original_color_repr)
        print("OUT ", "rgb", rgb_payload)
        await self._mqtt_client.publish(self._discovery_spec['rgb_state_topic'], rgb_payload.encode())

    async def _notify_mqtt_state(self):
        """Publish ``ON`` when the brightness is above zero, ``OFF`` otherwise."""
        state_payload = "ON" if self._brightness > 0 else "OFF"
        print("OUT ", "state", state_payload)
        await self._mqtt_client.publish(self._discovery_spec['state_topic'], state_payload.encode())

    async def _notify_mqtt_brightness(self):
        """Publish the brightness on the 0..255 scale."""
        # round() instead of int(): brightness is stored as value / 255, and
        # truncating the float product can report one step less than was set.
        brightness_payload = str(round(self._brightness * 255))
        print("OUT ", "brightness", brightness_payload)
        await self._mqtt_client.publish(self._discovery_spec['brightness_state_topic'], brightness_payload.encode())

    async def _notify_mqtt_shelve_effect(self, effect):
        """Publish the name of the active effect on the effect state topic."""
        await self._mqtt_client.publish(self._discovery_spec['effect_state_topic'], effect.encode())

    def _set_rgb_color(self, color: ColorRGBW):
        """Make ``color`` (unscaled) the current colour.

        The previous colour is remembered as the secondary colour for
        two-colour effects, but only if ``color`` actually differs from it.
        """
        # mqtt sends color multiple times, we want to remember last distinct
        # color as second color for effects -> ignore repeats of the current one
        if color != self._color:
            self._last_color = self._color
            self._color = color

    def _scaled(self, color):
        """Return ``color`` dimmed by the global scale and the current brightness."""
        return color * BRIGHTNESS_SCALE * self._brightness

    def _update_device(self):
        """Build the config for the selected effect and send it to the device.

        An unknown effect name is reported on stdout and replaced by a static
        black effect.
        """
        color = self._scaled(self._color)
        last_color = self._scaled(self._last_color)

        if self._effect == "static":
            eff = EffectStaticConfig(color)
        elif self._effect == "circular":
            eff = EffectCircularConfig()
            eff.color = color
        elif self._effect == "wipe-up":
            eff = EffectAlexaSwipeConfig()
            eff.secondary_color = color
            eff.primary_color = last_color
            eff.bell_curve_width_in_leds = 5
            eff.swipe_speed = 180
        elif self._effect == "two-color":
            eff = EffectRandomTwoColorInterpolationConfig()
            eff.color1 = color
            eff.color2 = last_color
            eff.start_with_existing = True
        elif self._effect == "two-color-random":
            eff = EffectRandomTwoColorInterpolationConfig()
            eff.color1 = color
            eff.color2 = last_color
            eff.hue1_random = True
            eff.hue2_random = True
            eff.start_with_existing = True
        else:
            print(f"Unknown effect {self._effect}")
            eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0))
        self._protocol.shelve_led_effect(eff)

    async def handle_light_message(self, msg):
        """Process one incoming MQTT message.

        The last topic segment selects the command: ``rgb``, ``switch``,
        ``brightness`` or ``effect``. Malformed ``rgb``/``brightness``
        payloads are reported on stdout and ignored instead of raising.

        Args:
            msg: Message object with a string ``topic`` and a bytes ``payload``.

        Returns:
            True if a known command on this light's topic prefix was
            processed (or rejected as malformed), False otherwise.
        """
        prefix = 'musicmouse/lights_shelve'
        if not msg.topic.startswith(prefix):
            return False
        cmd = msg.topic.split("/")[-1]
        payload = msg.payload.decode()
        print("IN ", cmd, payload)

        if cmd == "rgb":
            try:
                # Also raises ValueError when there are not exactly three parts.
                r, g, b = (int(part) for part in payload.split(","))
            except ValueError:
                print(f"Ignoring malformed rgb payload {payload!r}")
                return True
            self._original_color_repr = (r, g, b)
            self._set_rgb_color(ColorRGBW(r / 255, g / 255, b / 255, 0))
            await self._notify_mqtt_rgb()
        elif cmd == "switch":
            if payload == "ON" and self._brightness == 0:
                self._brightness = 1
            elif payload == "OFF":
                self._brightness = 0
            # The device is refreshed only here.
            # NOTE(review): presumably the sender follows colour/brightness/
            # effect changes with an "ON" on this topic - confirm.
            self._update_device()
            await self._notify_mqtt_rgb()
            await self._notify_mqtt_brightness()
            await self._notify_mqtt_state()
        elif cmd == "brightness":
            try:
                raw_brightness = int(payload)
            except ValueError:
                print(f"Ignoring malformed brightness payload {payload!r}")
                return True
            # Clamp to the 0..255 range so the fraction stays within 0..1.
            self._brightness = max(0, min(255, raw_brightness)) / 255
            await self._notify_mqtt_brightness()
        elif cmd == "effect":
            self._effect = payload
            await self._notify_mqtt_shelve_effect(payload)
        else:
            # e.g. this light's own *_state topics, which share the prefix
            return False
        return True

    @staticmethod
    def _create_discovery_msg_light(base_name="musicmouse", display_name="Music Mouse Regal Licht"):
        """Return the discovery payload describing this light's MQTT topics.

        Args:
            base_name: First topic segment and prefix of the unique id.
            display_name: Human readable name of the light.
        """
        light_id = "shelve"
        topic_base = f'{base_name}/lights_{light_id}'
        return {
            'name': display_name,
            'unique_id': f'{base_name}_{light_id}',
            'command_topic': f'{topic_base}/switch',
            'state_topic': f'{topic_base}/switch_state',
            'brightness_command_topic': f'{topic_base}/brightness',
            'brightness_state_topic': f'{topic_base}/brightness_state',
            'rgb_command_topic': f'{topic_base}/rgb',
            'rgb_state_topic': f'{topic_base}/rgb_state',
            'effect_command_topic': f'{topic_base}/effect',
            'effect_state_topic': f'{topic_base}/effect_state',
            'effect_list': ['static', 'circular', 'wipe-up', 'two-color', 'two-color-random'],
        }


async def start_mqtt(music_mouse_protocol, server, username, password):
    """Connect to the MQTT broker and feed ``musicmouse/#`` messages to the light.

    Runs until the message stream ends or an exception propagates.

    Args:
        music_mouse_protocol: Object providing ``shelve_led_effect(effect)``.
        server: Hostname of the MQTT broker.
        username: MQTT user name.
        password: MQTT password.
    """
    async with asyncio_mqtt.Client(hostname=server, username=username, password=password) as client:
        shelve_light = ShelveLightMqtt(music_mouse_protocol, client)
        await shelve_light.init()
        async with client.filtered_messages("musicmouse/#") as messages:
            await client.subscribe("musicmouse/#")
            async for message in messages:
                await shelve_light.handle_light_message(message)


if __name__ == "__main__":

    class DummyProtocol:
        """Stand-in protocol that prints the effect instead of driving LEDs."""

        def shelve_led_effect(self, effect):
            print("EFF ", repr(effect))

    password = "pw"
    asyncio.run(start_mqtt(DummyProtocol(), "homeassistant", "musicmouse", password))