musicmouse/espmusicmouse/host_driver/mqtt_json.py

198 lines
8.7 KiB
Python

from led_cmds import ColorRGBW, EffectStaticConfig, EffectStaticDetailedConfig, EffectCircularConfig, EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig, EffectSwipeAndChange
import asyncio
import aiomqtt
import json
from copy import deepcopy
class ShelveLightMqtt:
def __init__(self, protocol, client: aiomqtt.Client):
self._protocol = protocol
self._mqtt_client = client
self._state = {
"state": "OFF",
"color": {
"r": 255,
"g": 255,
"b": 255,
"w": 0,
},
"color_mode": "rgbw",
"brightness": 30,
"effect": "static",
}
self._last_color = ColorRGBW(0.5, 0.5, 0.5, 0)
self._discovery_spec = self._create_discovery_msg_light()
async def init(self):
"""Init method, because constructor can't be async"""
self._protocol.shelve_led_effect(EffectStaticConfig(ColorRGBW(0, 0, 0, 0)))
await self._send_autodiscovery_msg()
await self._notify_mqtt_state({"state": "OFF"})
async def handle_light_message(self, msg):
if msg.topic.value == self._discovery_spec['command_topic']:
payload = msg.payload.decode()
new_state = json.loads(payload)
print("IN ", new_state)
await self._update_state(new_state)
await self._notify_mqtt_state(new_state)
async def _update_state(self, new_state):
"""Merges current state with new state, updates device"""
# memorize last color - this is used for effects that need 2 colors
if 'color' in new_state:
brightness = new_state.get('brightness', self._state['brightness'])
new_color = self._color_from_json(new_state['color'], brightness)
current_color = self._color_from_json(self._state['color'])
if new_color != current_color:
self._last_color = current_color
print("last color", self._last_color)
self._state.update(new_state)
self._update_device()
@staticmethod
def _color_from_json(json_color, brightness=255):
args = ((json_color[e] / 255) * (brightness / 255) for e in ('r', 'g', 'b', 'w'))
return ColorRGBW(*args)
def _update_device(self):
s = self._state
current_color = self._color_from_json(s['color'], brightness=s["brightness"])
transition = s.get("transition", 0.3) * 1000
print(f"Effect {s['effect']} Transition {transition}")
if s['state'] == "OFF":
if transition > 0:
eff = EffectStaticDetailedConfig(ColorRGBW(0,0,0,0), transition_time_in_ms=transition)
else:
eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0))
elif s['effect'] == 'static':
if transition > 0:
eff = EffectStaticDetailedConfig(current_color, transition_time_in_ms=transition)
else:
eff = EffectStaticConfig(current_color)
elif s['effect'] == 'circular':
eff = EffectCircularConfig(speed=180, width=90, color=current_color)
elif s['effect'] == 'wipeup':
eff = EffectSwipeAndChange()
eff.swipe.secondary_color = current_color
eff.swipe.primary_color = self._last_color
eff.swipe.bell_curve_width_in_leds = 10
eff.swipe.transition_width = 30
eff.swipe.start_position = 0
eff.swipe.swipe_speed = 260
eff.change.color1 = current_color
eff.change.color2 = self._last_color
elif s['effect'] == "twocolor":
eff = EffectRandomTwoColorInterpolationConfig()
eff.color1 = current_color
eff.color2 = self._last_color
eff.start_with_existing = True
elif s['effect'] == "twocolorrandom":
eff = EffectRandomTwoColorInterpolationConfig()
eff.color1 = current_color
eff.color2 = self._last_color
eff.hue1_random = True
eff.hue2_random = True
eff.start_with_existing = True
elif s['effect'] == "side_0.2":
eff = EffectStaticDetailedConfig(current_color, begin=0.9, end=0.1, increment=1, transition_time_in_ms=transition)
elif s['effect'] == "side_0.2_inc4":
eff = EffectStaticDetailedConfig(current_color, begin=0.9, end=0.1, increment=4, transition_time_in_ms=transition)
elif s['effect'] == "side_0.2_inc8":
eff = EffectStaticDetailedConfig(current_color, begin=0.9, end=0.1, increment=8, transition_time_in_ms=transition)
elif s['effect'] == "side_0.5":
eff = EffectStaticDetailedConfig(current_color, begin=0.75, end=0.25, increment=1, transition_time_in_ms=transition)
elif s['effect'] == "side_0.5_inc4":
eff = EffectStaticDetailedConfig(current_color, begin=0.75, end=0.25, increment=4, transition_time_in_ms=transition)
elif s['effect'] == "top_0.2":
eff = EffectStaticDetailedConfig(current_color, begin=0.4, end=0.6, increment=1, transition_time_in_ms=transition)
elif s['effect'] == "top_0.2_inc4":
eff = EffectStaticDetailedConfig(current_color, begin=0.4, end=0.6, increment=4, transition_time_in_ms=transition)
elif s['effect'] == "top_0.5":
eff = EffectStaticDetailedConfig(current_color, begin=0.25, end=0.75, increment=1, transition_time_in_ms=transition)
elif s['effect'] == "top_0.5_inc4":
eff = EffectStaticDetailedConfig(current_color, begin=0.25, end=0.75, increment=4, transition_time_in_ms=transition)
else:
print(f"Unknown effect {s['effect']}")
eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0))
self._protocol.shelve_led_effect(eff)
@staticmethod
def _create_discovery_msg_light(base_name="musicmouse_json",
display_name="Music Mouse Regal Licht"):
id = "shelve"
return {
'platform': 'mqtt',
'schema': 'json',
'name': display_name,
'unique_id': f'{base_name}_{id}',
'command_topic': f'{base_name}/lights_{id}/command',
'state_topic': f'{base_name}/lights_{id}/state',
'color_mode': True,
'brightness': True,
#'device': {
# 'manufacturer': 'bauer.tech',
# 'model': "SK6812 LED strip",
#},
'effect': True,
'effect_list': ['static', 'circular', 'wipeup', 'twocolor', 'twocolorrandom',
"side_0.2", "side_0.5", "side_0.2_inc4", "side_0.2_inc8", "side_0.5_inc4",
"top_0.2", "top_0.5", "top_0.2_inc4", "top_0.5_inc4"],
'supported_color_modes': ['rgbw'],
}
async def _send_autodiscovery_msg(self):
topic = f"homeassistant/light/{self._discovery_spec['unique_id']}/config"
await self._mqtt_client.publish(topic, json.dumps(self._discovery_spec).encode(), retain=True)
async def _notify_mqtt_state(self, state):
state_payload = json.dumps(self._state)
print("OUT ", state_payload)
await self._mqtt_client.publish(self._discovery_spec['state_topic'], state_payload.encode())
return
direct_ack = False
if direct_ack == True:
state_payload = json.dumps(state)
else:
s = deepcopy(self._state)
if s['state'] == "OFF":
state_payload = json.dumps({"state": "OFF"})
else:
s['color_mode'] = "rgbw"
state_payload = json.dumps(s)
print("OUT ", state_payload)
await self._mqtt_client.publish(self._discovery_spec['state_topic'], state_payload.encode())
async def start_mqtt(music_mouse_protocol, server, username, password):
reconnect_interval = 10 # [seconds]
while True:
try:
async with aiomqtt.Client(hostname=server, username=username, password=password) as client:
shelve_light = ShelveLightMqtt(music_mouse_protocol, client)
await shelve_light.init()
await client.subscribe("musicmouse_json/#")
async for message in client.messages:
await shelve_light.handle_light_message(message)
except aiomqtt.MqttError as error:
print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds')
finally:
await asyncio.sleep(reconnect_interval)
if __name__ == "__main__":
class DummyProtocol:
def shelve_led_effect(self, effect):
print("EFF ", repr(effect))
password = ""
asyncio.run(start_mqtt(DummyProtocol(), "homeassistant", "musicmouse", password))