"""Expose the shelf LED strip as a Home Assistant light over MQTT (JSON schema)."""

import asyncio
import json
from copy import deepcopy  # noqa: F401  (no longer used here; kept so existing imports stay intact)

import aiomqtt

from led_cmds import (
    ColorRGBW,
    EffectAlexaSwipeConfig,  # noqa: F401  (unused in this module; import preserved)
    EffectCircularConfig,
    EffectRandomTwoColorInterpolationConfig,
    EffectStaticConfig,
    EffectStaticDetailedConfig,
    EffectSwipeAndChange,
)


class ShelveLightMqtt:
    """Bridges light commands received via MQTT to LED effects on the device.

    The instance keeps a merged copy of the last commanded state in
    ``self._state`` and remembers the previously shown color in
    ``self._last_color`` for effects that blend between two colors.
    """

    # Effects that only differ in the arguments passed to
    # EffectStaticDetailedConfig: effect name -> (begin, end, increment).
    _SEGMENT_EFFECTS = {
        "side_0.2": (0.9, 0.1, 1),
        "side_0.2_inc4": (0.9, 0.1, 4),
        "side_0.2_inc8": (0.9, 0.1, 8),
        "side_0.5": (0.75, 0.25, 1),
        "side_0.5_inc4": (0.75, 0.25, 4),
        "top_0.2": (0.4, 0.6, 1),
        "top_0.2_inc4": (0.4, 0.6, 4),
        "top_0.5": (0.25, 0.75, 1),
        "top_0.5_inc4": (0.25, 0.75, 4),
    }

    def __init__(self, protocol, client: aiomqtt.Client):
        """Store collaborators and set up the initial (OFF) light state.

        Args:
            protocol: object providing ``shelve_led_effect(effect)``.
            client: connected aiomqtt client used for publishing.
        """
        self._protocol = protocol
        self._mqtt_client = client
        self._state = {
            "state": "OFF",
            "color": {
                "r": 255,
                "g": 255,
                "b": 255,
                "w": 0,
            },
            "color_mode": "rgbw",
            "brightness": 30,
            "effect": "static",
        }
        self._last_color = ColorRGBW(0.5, 0.5, 0.5, 0)
        self._discovery_spec = self._create_discovery_msg_light()

    async def init(self):
        """Init method, because constructor can't be async.

        Switches the LEDs off, publishes the discovery message and the
        current state.
        """
        self._protocol.shelve_led_effect(EffectStaticConfig(ColorRGBW(0, 0, 0, 0)))
        await self._send_autodiscovery_msg()
        await self._notify_mqtt_state({"state": "OFF"})

    async def handle_light_message(self, msg):
        """Apply a command message if it was sent to this light's command topic.

        Messages on other topics are ignored. Payloads that are not valid
        JSON objects are reported and dropped, so a single bad message does
        not propagate an exception into the caller's message loop.
        """
        if msg.topic.value != self._discovery_spec['command_topic']:
            return

        try:
            new_state = json.loads(msg.payload.decode())
        except (UnicodeDecodeError, json.JSONDecodeError) as error:
            print(f'Ignoring malformed command payload: "{error}"')
            return
        if not isinstance(new_state, dict):
            # _update_state merges the payload into a dict; anything else is unusable.
            print(f"Ignoring non-object command payload: {new_state!r}")
            return

        print("IN ", new_state)
        await self._update_state(new_state)
        await self._notify_mqtt_state(new_state)

    async def _update_state(self, new_state):
        """Merges current state with new state, updates device."""
        # memorize last color - this is used for effects that need 2 colors
        if 'color' in new_state:
            new_brightness = new_state.get('brightness', self._state['brightness'])
            new_color = self._color_from_json(new_state['color'], new_brightness)
            # Scale the current color by the current brightness as well, so both
            # sides of the comparison (and the remembered color) are on the same
            # scale that _update_device uses when rendering.
            current_color = self._color_from_json(self._state['color'],
                                                  self._state['brightness'])
            if new_color != current_color:
                self._last_color = current_color
                print("last color", self._last_color)

        self._state.update(new_state)
        self._update_device()

    @staticmethod
    def _color_from_json(json_color, brightness=255):
        """Convert a JSON color dict with 0..255 channels into a ColorRGBW.

        Each of the 'r', 'g', 'b', 'w' entries is divided by 255 and
        multiplied by ``brightness / 255``.
        """
        scale = brightness / 255
        channels = ((json_color[name] / 255) * scale for name in ('r', 'g', 'b', 'w'))
        return ColorRGBW(*channels)

    @staticmethod
    def _static_effect(color, transition_ms):
        """Return a static effect, using the detailed config only when fading."""
        if transition_ms > 0:
            return EffectStaticDetailedConfig(color, transition_time_in_ms=transition_ms)
        return EffectStaticConfig(color)

    def _two_color_effect(self, current_color, random_hues):
        """Return a two-color interpolation between current and last color."""
        eff = EffectRandomTwoColorInterpolationConfig()
        eff.color1 = current_color
        eff.color2 = self._last_color
        if random_hues:
            eff.hue1_random = True
            eff.hue2_random = True
        eff.start_with_existing = True
        return eff

    def _swipe_effect(self, current_color):
        """Return the 'wipeup' swipe-and-change effect configuration."""
        eff = EffectSwipeAndChange()
        eff.swipe.secondary_color = current_color
        eff.swipe.primary_color = self._last_color
        eff.swipe.bell_curve_width_in_leds = 10
        eff.swipe.transition_width = 30
        eff.swipe.start_position = 0
        eff.swipe.swipe_speed = 260
        eff.change.color1 = current_color
        eff.change.color2 = self._last_color
        return eff

    def _build_effect(self):
        """Translate ``self._state`` into the effect config to send to the device."""
        s = self._state
        current_color = self._color_from_json(s['color'], brightness=s["brightness"])
        # "transition" is multiplied by 1000 because the effect configs take milliseconds.
        transition = s.get("transition", 0.3) * 1000
        print(f"Effect {s['effect']} Transition {transition}")

        effect = s['effect']
        # OFF takes precedence over whatever effect is selected.
        if s['state'] == "OFF":
            return self._static_effect(ColorRGBW(0, 0, 0, 0), transition)
        if effect == 'static':
            return self._static_effect(current_color, transition)
        if effect == 'circular':
            return EffectCircularConfig(speed=180, width=90, color=current_color)
        if effect == 'wipeup':
            return self._swipe_effect(current_color)
        if effect == "twocolor":
            return self._two_color_effect(current_color, random_hues=False)
        if effect == "twocolorrandom":
            return self._two_color_effect(current_color, random_hues=True)
        if effect in self._SEGMENT_EFFECTS:
            begin, end, increment = self._SEGMENT_EFFECTS[effect]
            return EffectStaticDetailedConfig(current_color, begin=begin, end=end,
                                              increment=increment,
                                              transition_time_in_ms=transition)

        # Unknown effect names switch the LEDs off.
        print(f"Unknown effect {s['effect']}")
        return EffectStaticConfig(ColorRGBW(0, 0, 0, 0))

    def _update_device(self):
        """Send the effect matching the current state to the device."""
        self._protocol.shelve_led_effect(self._build_effect())

    @staticmethod
    def _create_discovery_msg_light(base_name="musicmouse_json", display_name="Music Mouse Regal Licht"):
        """Build the discovery payload describing this light.

        Args:
            base_name: prefix for the unique id and the command/state topics.
            display_name: value of the payload's 'name' entry.
        """
        light_id = "shelve"  # renamed from `id` to avoid shadowing the builtin
        return {
            'platform': 'mqtt',
            'schema': 'json',
            'name': display_name,
            'unique_id': f'{base_name}_{light_id}',
            'command_topic': f'{base_name}/lights_{light_id}/command',
            'state_topic': f'{base_name}/lights_{light_id}/state',
            'color_mode': True,
            'brightness': True,
            #'device': {
            #    'manufacturer': 'bauer.tech',
            #    'model': "SK6812 LED strip",
            #},
            'effect': True,
            'effect_list': ['static', 'circular', 'wipeup', 'twocolor', 'twocolorrandom',
                            "side_0.2", "side_0.5", "side_0.2_inc4", "side_0.2_inc8", "side_0.5_inc4",
                            "top_0.2", "top_0.5", "top_0.2_inc4", "top_0.5_inc4"],
            'supported_color_modes': ['rgbw'],
        }

    async def _send_autodiscovery_msg(self):
        """Publish the retained discovery message for this light."""
        topic = f"homeassistant/light/{self._discovery_spec['unique_id']}/config"
        await self._mqtt_client.publish(topic, json.dumps(self._discovery_spec).encode(), retain=True)

    async def _notify_mqtt_state(self, state):
        """Publish the full current state on the state topic.

        Args:
            state: accepted for interface compatibility; the published payload
                is always the complete ``self._state``, not this argument.
        """
        state_payload = json.dumps(self._state)
        print("OUT ", state_payload)
        await self._mqtt_client.publish(self._discovery_spec['state_topic'], state_payload.encode())


async def start_mqtt(music_mouse_protocol, server, username, password):
    """Connect to the MQTT broker and serve light commands until cancelled.

    On an MQTT error (or if the message stream ends) the connection is
    re-established after ``reconnect_interval`` seconds. Every (re)connect
    creates a fresh ShelveLightMqtt and runs its ``init()``.

    Args:
        music_mouse_protocol: object providing ``shelve_led_effect(effect)``.
        server: broker hostname.
        username: broker user name.
        password: broker password.
    """
    reconnect_interval = 10  # [seconds]
    while True:
        try:
            async with aiomqtt.Client(hostname=server, username=username, password=password) as client:
                shelve_light = ShelveLightMqtt(music_mouse_protocol, client)
                await shelve_light.init()
                await client.subscribe("musicmouse_json/#")
                async for message in client.messages:
                    await shelve_light.handle_light_message(message)
        except aiomqtt.MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds')
        # Sleep outside of `finally`: cancellation and unexpected exceptions
        # must propagate immediately instead of being delayed by the back-off.
        await asyncio.sleep(reconnect_interval)


if __name__ == "__main__":
    class DummyProtocol:
        """Stand-in protocol that prints effects instead of driving hardware."""

        def shelve_led_effect(self, effect):
            print("EFF ", repr(effect))

    password = ""
    asyncio.run(start_mqtt(DummyProtocol(), "homeassistant", "musicmouse", password))