Compare commits

3 Commits

Author SHA1 Message Date
Martin Bauer
b2c060fcc9 Adapted to new aiomqtt version 2024-03-07 16:38:05 +01:00
Martin Bauer
11bf0505fb Changes for python3.11 2024-03-06 17:15:07 +01:00
Martin Bauer
ef038a29a4 fixes 2024-01-02 17:41:43 +01:00
7 changed files with 65 additions and 187 deletions

View File

@@ -1,4 +1,4 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
import struct
import colorsys
@@ -98,8 +98,8 @@ class EffectAlexaSwipeConfig:
bell_curve_width_in_leds: float = 3
start_position: float = 180 # in degrees
forward: bool = True
primary_color: ColorRGBW = ColorRGBW(0, 0, 1, 0)
secondary_color: ColorRGBW = ColorRGBW(0, 200 / 255, 1, 0)
primary_color: ColorRGBW = field(default_factory=lambda: ColorRGBW(0, 0, 1, 0))
secondary_color: ColorRGBW = field(default_factory=lambda: ColorRGBW(0, 200 / 255, 1, 0))
def as_bytes(self) -> bytes:
return struct.pack(
@@ -118,8 +118,8 @@ class EffectRandomTwoColorInterpolationConfig:
num_segments: int = 3
hue1_random: bool = False
hue2_random: bool = False
color1: ColorHSV = ColorHSV(240, 1, 1)
color2: ColorHSV = ColorHSV(192, 1, 1)
color1: ColorHSV = field(default_factory=lambda: ColorHSV(240, 1, 1))
color2: ColorHSV = field(default_factory=lambda: ColorHSV(192, 1, 1))
def as_bytes(self) -> bytes:
c1 = ColorHSV.fromRGB(self.color1) if isinstance(self.color1, ColorRGBW) else self.color1
@@ -136,7 +136,7 @@ class EffectRandomTwoColorInterpolationConfig:
class EffectCircularConfig:
speed: float = 360 # in degrees per second
width: float = 180 # in degrees
color: ColorRGBW = ColorRGBW(0, 0, 1, 0)
color: ColorRGBW = field(default_factory=lambda: ColorRGBW(0, 0, 1, 0))
def as_bytes(self) -> bytes:
return struct.pack("<ff", self.speed, self.width) + self.color.as_bytes()
@@ -144,8 +144,8 @@ class EffectCircularConfig:
@dataclass
class EffectSwipeAndChange:
swipe: EffectAlexaSwipeConfig = EffectAlexaSwipeConfig()
change: EffectRandomTwoColorInterpolationConfig = EffectRandomTwoColorInterpolationConfig()
swipe: EffectAlexaSwipeConfig = field(default_factory=lambda: EffectAlexaSwipeConfig())
change: EffectRandomTwoColorInterpolationConfig = field(default_factory=lambda: EffectRandomTwoColorInterpolationConfig())
def as_bytes(self) -> bytes:
return self.swipe.as_bytes() + self.change.as_bytes()
@@ -164,4 +164,4 @@ class EffectReverseSwipe:
return struct.pack("<fff", self.swipeSpeed, self.bellCurveWidthInLeds, self.startPosition)
def __repr__(self) -> str:
return f"Reverse swipe, speed {self.swipeSpeed}, width in leds {self.bellCurveWidthInLeds}, start position {self.startPosition}"
return f"Reverse swipe, speed {self.swipeSpeed}, width in leds {self.bellCurveWidthInLeds}, start position {self.startPosition}"

View File

@@ -18,6 +18,7 @@ import warnings
from pprint import pprint
from typing import Optional
from mqtt_json import start_mqtt
import aiohttp
yaml = YAML(typ='safe')
@@ -247,7 +248,7 @@ def main(config_path):
cfg = load_config(config_path)
loop = asyncio.get_event_loop()
hass = HomeAssistantClient(cfg["general"]["hass_url"], cfg["general"]["hass_token"], loop)
hass = HomeAssistantClient(cfg["general"]["hass_url"], cfg["general"]["hass_token"], loop=loop)
coro = serial_asyncio.create_serial_connection(loop,
MusicMouseProtocol,
@@ -256,7 +257,7 @@ def main(config_path):
transport, protocol = loop.run_until_complete(coro)
controller = Controller(protocol, hass, cfg)
mqtt_cfg = cfg["general"]["mqtt"]
loop.create_task(start_mqtt(protocol, mqtt_cfg["server"], mqtt_cfg["user"],mqtt_cfg["password"] ))
loop.create_task(start_mqtt(protocol, mqtt_cfg["server"], mqtt_cfg["user"], mqtt_cfg["password"] ))
loop.create_task(hass.connect())
return controller, loop

View File

@@ -1,155 +0,0 @@
from led_cmds import ColorRGBW, EffectStaticConfig, EffectCircularConfig, EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig, EffectSwipeAndChange
import asyncio
import asyncio_mqtt
import json
BRIGHTNESS_SCALE = 1 # power supply is a bit weak -> scale brightness down globally
class ShelveLightMqtt:
def __init__(self, protocol, client):
self._brightness = 100
self._protocol = protocol
self._mqtt_client = client
self._brightness = 0
self._color = ColorRGBW(0.5, 0.5, 0.5, 0)
self._last_color = ColorRGBW(0.5, 0.5, 0.5, 0)
self._original_color_repr = (127, 127, 127)
self._effect = "static"
self._discovery_spec = self._create_discovery_msg_light()
async def init(self):
self._protocol.shelve_led_effect(EffectStaticConfig(ColorRGBW(0, 0, 0, 0)))
await self._notify_mqtt_brightness()
await self._notify_mqtt_state()
await self._notify_mqtt_rgb()
await self._send_autodiscovery_msg()
async def _send_autodiscovery_msg(self):
topic = f"homeassistant/light/{self._discovery_spec['unique_id']}/config"
await self._mqtt_client.publish(topic, json.dumps(self._discovery_spec).encode())
async def _notify_mqtt_rgb(self):
rgb_payload = ",".join(str(e) for e in self._original_color_repr)
print("OUT ", "rgb", rgb_payload)
await self._mqtt_client.publish(self._discovery_spec['rgb_state_topic'],
rgb_payload.encode())
async def _notify_mqtt_state(self):
state_payload = "ON" if self._brightness > 0 else "OFF"
print("OUT ", "state", state_payload)
await self._mqtt_client.publish(self._discovery_spec['state_topic'], state_payload.encode())
async def _notify_mqtt_brightness(self):
brightness_payload = str(int(self._brightness * 255))
print("OUT ", "brightness", brightness_payload)
await self._mqtt_client.publish(self._discovery_spec['brightness_state_topic'],
brightness_payload.encode())
async def _notify_mqtt_shelve_effect(self, effect):
await self._mqtt_client.publish(self._discovery_spec['effect_state_topic'], effect.encode())
def _set_rgb_color(self, color: ColorRGBW):
if self._color != self._last_color: # mqtt sends color multiple times, we want to remember last distinct color as second color for effects
self._last_color = self._color
self._color = color * BRIGHTNESS_SCALE * self._brightness
def _update_device(self):
if self._effect == "static":
eff = EffectStaticConfig(self._color)
elif self._effect == "circular":
eff = EffectCircularConfig()
eff.color = self._color
elif self._effect == "wipeup":
eff = EffectSwipeAndChange()
eff.swipe.secondary_color = self._color
eff.swipe.primary_color = self._last_color
eff.swipe.bell_curve_width_in_leds = 5
eff.swipe.swipe_speed = 180
eff.change.color1 = self._color
eff.change.color2 = self._last_color
elif self._effect == "twocolor":
eff = EffectRandomTwoColorInterpolationConfig()
eff.color1 = self._color
eff.color2 = self._last_color
eff.start_with_existing = True
elif self._effect == "twocolorrandom":
eff = EffectRandomTwoColorInterpolationConfig()
eff.color1 = self._color
eff.color2 = self._last_color
eff.hue1_random = True
eff.hue2_random = True
eff.start_with_existing = True
else:
print(f"Unknown effect {self._effect}")
eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0))
self._protocol.shelve_led_effect(eff)
async def handle_light_message(self, msg):
prefix = 'musicmouse/lights_shelve'
if not msg.topic.startswith(prefix):
return False
cmd = msg.topic.split("/")[-1]
payload = msg.payload.decode()
print("IN ", cmd, payload)
if cmd == "rgb":
r, g, b = tuple(int(i) for i in payload.split(","))
self._original_color_repr = (r, g, b)
self._set_rgb_color(ColorRGBW(r / 255, g / 255, b / 255, 0))
await self._notify_mqtt_rgb()
elif cmd == "switch":
if payload == "ON" and self._brightness == 0:
self._brightness = 1
elif payload == "OFF":
self._color = ColorRGBW(0, 0, 0, 0)
self._brightness = 0
self._update_device()
await self._notify_mqtt_rgb()
await self._notify_mqtt_brightness()
await self._notify_mqtt_state()
elif cmd == "brightness":
self._brightness = int(payload) / 255
self._set_rgb_color(self._color)
await self._notify_mqtt_brightness()
elif cmd == "effect":
self._effect = payload
@staticmethod
def _create_discovery_msg_light(base_name="musicmouse", display_name="Music Mouse Regal Licht"):
id = "shelve"
return {
'name': display_name,
'unique_id': f'{base_name}_{id}',
'command_topic': f'{base_name}/lights_{id}/switch',
'state_topic': f'{base_name}/lights_{id}/switch_state',
'brightness_command_topic': f'{base_name}/lights_{id}/brightness',
'brightness_state_topic': f'{base_name}/lights_{id}/brightness_state',
'rgb_command_topic': f'{base_name}/lights_{id}/rgb',
'rgb_state_topic': f'{base_name}/lights_{id}/rgb_state',
'effect_command_topic': f'{base_name}/lights_{id}/effect',
'effect_state_topic': f'{base_name}/lights_{id}/effect_state',
'effect_list': ['static', 'circular', 'wipeup', 'twocolor', 'twocolorrandom'],
}
async def start_mqtt(music_mouse_protocol, server, username, password):
async with asyncio_mqtt.Client(hostname=server, username=username, password=password) as client:
shelve_light = ShelveLightMqtt(music_mouse_protocol, client)
await shelve_light.init()
async with client.filtered_messages("musicmouse/#") as messages:
await client.subscribe("musicmouse/#")
async for message in messages:
await shelve_light.handle_light_message(message)
if __name__ == "__main__":
class DummyProtocol:
def shelve_led_effect(self, effect):
print("EFF ", repr(effect))
password = "KNLEFLZF94yA6Zhj141"
asyncio.run(start_mqtt(DummyProtocol(), "homeassistant", "musicmouse", password))

View File

@@ -1,12 +1,12 @@
from led_cmds import ColorRGBW, EffectStaticConfig, EffectStaticDetailedConfig, EffectCircularConfig, EffectRandomTwoColorInterpolationConfig, EffectAlexaSwipeConfig, EffectSwipeAndChange
import asyncio
import asyncio_mqtt
import aiomqtt
import json
from copy import deepcopy
class ShelveLightMqtt:
def __init__(self, protocol, client: asyncio_mqtt.Client):
def __init__(self, protocol, client: aiomqtt.Client):
self._protocol = protocol
self._mqtt_client = client
@@ -33,7 +33,7 @@ class ShelveLightMqtt:
await self._notify_mqtt_state({"state": "OFF"})
async def handle_light_message(self, msg):
if msg.topic == self._discovery_spec['command_topic']:
if msg.topic.value == self._discovery_spec['command_topic']:
payload = msg.payload.decode()
new_state = json.loads(payload)
print("IN ", new_state)
@@ -63,12 +63,12 @@ class ShelveLightMqtt:
def _update_device(self):
s = self._state
current_color = self._color_from_json(s['color'], brightness=s["brightness"])
transition = s.get("transition", 0.0) * 1000
transition = s.get("transition", 0.3) * 1000
print(f"Effect {s['effect']} Transition {transition}")
if s['state'] == "OFF":
if transition > 0:
eff = EffectStaticDetailedConfig(ColorRGBW(0,0,0,0), transition_tim_in_ms=transition)
eff = EffectStaticDetailedConfig(ColorRGBW(0,0,0,0), transition_time_in_ms=transition)
else:
eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0))
elif s['effect'] == 'static':
@@ -104,10 +104,20 @@ class ShelveLightMqtt:
eff = EffectStaticDetailedConfig(current_color, begin=0.9, end=0.1, increment=1, transition_time_in_ms=transition)
elif s['effect'] == "side_0.2_inc4":
eff = EffectStaticDetailedConfig(current_color, begin=0.9, end=0.1, increment=4, transition_time_in_ms=transition)
elif s['effect'] == "side_0.2_inc8":
eff = EffectStaticDetailedConfig(current_color, begin=0.9, end=0.1, increment=8, transition_time_in_ms=transition)
elif s['effect'] == "side_0.5":
eff = EffectStaticDetailedConfig(current_color, begin=0.75, end=0.25, increment=1, transition_time_in_ms=transition)
elif s['effect'] == "side_0.5_inc4":
eff = EffectStaticDetailedConfig(current_color, begin=0.75, end=0.25, increment=4, transition_time_in_ms=transition)
elif s['effect'] == "top_0.2":
eff = EffectStaticDetailedConfig(current_color, begin=0.4, end=0.6, increment=1, transition_time_in_ms=transition)
elif s['effect'] == "top_0.2_inc4":
eff = EffectStaticDetailedConfig(current_color, begin=0.4, end=0.6, increment=4, transition_time_in_ms=transition)
elif s['effect'] == "top_0.5":
eff = EffectStaticDetailedConfig(current_color, begin=0.25, end=0.75, increment=1, transition_time_in_ms=transition)
elif s['effect'] == "top_0.5_inc4":
eff = EffectStaticDetailedConfig(current_color, begin=0.25, end=0.75, increment=4, transition_time_in_ms=transition)
else:
print(f"Unknown effect {s['effect']}")
eff = EffectStaticConfig(ColorRGBW(0, 0, 0, 0))
@@ -132,7 +142,8 @@ class ShelveLightMqtt:
#},
'effect': True,
'effect_list': ['static', 'circular', 'wipeup', 'twocolor', 'twocolorrandom',
"side_0.2", "side_0.5", "side_0.2_inc4", "side_0.5_inc4", "top_0.2"],
"side_0.2", "side_0.5", "side_0.2_inc4", "side_0.2_inc8", "side_0.5_inc4",
"top_0.2", "top_0.5", "top_0.2_inc4", "top_0.5_inc4"],
'supported_color_modes': ['rgbw'],
}
@@ -165,14 +176,13 @@ async def start_mqtt(music_mouse_protocol, server, username, password):
reconnect_interval = 10 # [seconds]
while True:
try:
async with asyncio_mqtt.Client(hostname=server, username=username, password=password) as client:
async with aiomqtt.Client(hostname=server, username=username, password=password) as client:
shelve_light = ShelveLightMqtt(music_mouse_protocol, client)
await shelve_light.init()
async with client.filtered_messages("musicmouse_json/#") as messages:
await client.subscribe("musicmouse_json/#")
async for message in messages:
await shelve_light.handle_light_message(message)
except asyncio_mqtt.MqttError as error:
await client.subscribe("musicmouse_json/#")
async for message in client.messages:
await shelve_light.handle_light_message(message)
except aiomqtt.MqttError as error:
print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds')
finally:
await asyncio.sleep(reconnect_interval)

View File

@@ -1,3 +1,5 @@
pyserial-asyncio==0.6
python-vlc==3.0.12118
python-vlc==3.0.20123
hass-client==0.1.2
ruamel.yaml==0.18.6
aiomqtt==2.0.0

View File

@@ -36,6 +36,8 @@ public:
beginIdx_ = constrain(static_cast<int>(cfg.begin * NUM_LEDS + 0.5f), 0, NUM_LEDS - 1);
endIdx_ = constrain(static_cast<int>(cfg.end * NUM_LEDS + 0.5f), 0, NUM_LEDS - 1);
while (endIdx_ < beginIdx_)
endIdx_ += NUM_LEDS;
}
bool finished() const { return finished_; }
@@ -45,22 +47,25 @@ public:
if (finished_)
return 1000000;
const float progress = static_cast<float>(DELAY_MS * calls_) / config_.transition_time_in_ms;
const float progress = config_.transition_time_in_ms > 0.0f ? static_cast<float>(DELAY_MS * calls_) / config_.transition_time_in_ms : 1.f;
// Finished case
if(progress > 1.0) {
if (config_.transition_time_in_ms <= 0.0f || progress >= 1.0)
{
finished_ = true;
clear(ledStrip_);
for (int i = beginIdx_; i < endIdx_; i += config_.increment)
setLedRGBW(ledStrip_, i % NUM_LEDS, config_.color);
return 10000000;
}
// In-progress case
clear(ledStrip_);
for(int i = beginIdx_; i != endIdx_; i += config_.increment) {
ColorRGBW newColor = hsv2rgb(interpolate(rgb2hsv(state_[i]), rgb2hsv(config_.color), progress));
newColor.w = config_.color.w * progress + state_[i].w * (1 - progress);
setLedRGBW(ledStrip_, i, newColor);
if(i >= NUM_LEDS)
i = 0;
for (int i = beginIdx_; i < endIdx_; i += config_.increment)
{
const auto idx = i % NUM_LEDS;
ColorRGBW newColor = ColorRGBW::interpolate(state_[idx], config_.color, progress);
setLedRGBW(ledStrip_, idx, newColor);
}
++calls_;
@@ -68,6 +73,11 @@ public:
}
private:
static int int_interpolate(int prev, int next, float progress)
{
return static_cast<float>(prev) * (1 - progress) +
static_cast<float>(next) * progress;
}
EffectStaticDetailedConfig config_;
TLedStrip &ledStrip_;
ColorRGBW state_[NUM_LEDS];

View File

@@ -13,4 +13,14 @@ struct ColorRGBW
uint8_t(s * b),
uint8_t(s * w)};
}
static inline ColorRGBW interpolate(const ColorRGBW &c1, const ColorRGBW &c2, float f)
{
return ColorRGBW{
static_cast<uint8_t>((1.0f - f) * static_cast<float>(c1.r) + f * static_cast<float>(c2.r)),
static_cast<uint8_t>((1.0f - f) * static_cast<float>(c1.g) + f * static_cast<float>(c2.g)),
static_cast<uint8_t>((1.0f - f) * static_cast<float>(c1.b) + f * static_cast<float>(c2.b)),
static_cast<uint8_t>((1.0f - f) * static_cast<float>(c1.w) + f * static_cast<float>(c2.w)),
};
}
};