168 lines
6.0 KiB
Python
168 lines
6.0 KiB
Python
|
#!/usr/bin/env python
|
||
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
import paho.mqtt.client as mqtt
|
||
|
import json
|
||
|
from pprint import pprint
|
||
|
import printerbox_hardware as hw
|
||
|
|
||
|
|
||
|
def light_cfg(id, name):
    """Build the CONFIG['devices'] entry for one LED subset.

    Args:
        id: Identifier of the LED subset; interpolated into the entry key,
            the unique id and every MQTT topic.
        name: Human-readable name appended to the entity's display name.

    Returns:
        A one-item dict mapping ``light_<id>`` to the light's settings, so
        the result can be merged into the device registry with ``update``.
    """
    base = f'prusa_printer/lights_{id}'
    entry = {
        'entity_type': 'light',
        'name': f"PrusaBox Licht {name}",
        'unique_id': f'prusa_caselights_{id}',
        # The on/off topics are the only ones whose keys carry no feature prefix.
        'command_topic': f'{base}/switch',
        'state_topic': f'{base}/switch_state',
    }
    # The remaining features all follow one naming pattern. Insertion order
    # matters: it determines the key order of the serialized entry.
    for feature in ('brightness', 'rgb', 'effect'):
        entry[f'{feature}_command_topic'] = f'{base}/{feature}'
        entry[f'{feature}_state_topic'] = f'{base}/{feature}_state'
    entry['effect_list'] = ['color_wipe', 'blink', 'blink_move']
    return {f'light_{id}': entry}
|
||
|
|
||
|
|
||
|
# ----------------------------------- Config ------------------------------------------------------
|
||
|
|
||
|
|
||
|
# Connection settings and the registry of all devices this script exposes.
# NOTE(review): the MQTT password is committed here in plain text; consider
# loading it from the environment or an untracked file and rotating it.
CONFIG = {
    'mqtt_server': 'homeassistant',
    'mqtt_user': 'prusaprinter',
    'mqtt_password': 'ANchibUdeClEnegue',
    'mqtt_discovery_prefix': 'homeassistant',

    # Keyed by an internal device name. 'entity_type' and 'unique_id' are
    # used to build the discovery topic; every key ending in '_topic' is
    # subscribed to on connect.
    'devices': {
        'venting_fan': {
            'entity_type': 'fan',
            'unique_id': 'prusa_venting_fan',
            'name': 'PrusaBox Lüfter draussen',
            'state_topic': 'prusa_printer/venting_fan',
            'command_topic': 'prusa_printer/venting_fan/set',
            'percentage_command_topic': 'prusa_printer/venting_fan/percentage',
            'percentage_state_topic': 'prusa_printer/venting_fan/percentageState',
            # Read by handle_fan_message: output level (0..1) that any
            # non-zero requested percentage is scaled up from.
            'min_voltage_percentage': 0.35,
            'icon': 'mdi:fan',
        },
        'filter_fan': {
            'entity_type': 'fan',
            'unique_id': 'prusa_filter_fan',
            'name': 'PrusaBox Filter Lüfter',
            'state_topic': 'prusa_printer/filter_fan',
            'command_topic': 'prusa_printer/filter_fan/set',
            'percentage_command_topic': 'prusa_printer/filter_fan/percentage',
            'percentage_state_topic': 'prusa_printer/filter_fan/percentageState',
            # Same meaning as for the venting fan above.
            'min_voltage_percentage': 0.5,
            'icon': 'mdi:fan',
        },
    },
}

# Register one light entry per LED subset known to the hardware module.
# The loop variables are private names so that the builtin `id` is not
# shadowed at module scope once the loop has finished.
for _subset_id, _subset_name in hw.LED_SUBSET_NAMES.items():
    CONFIG['devices'].update(light_cfg(_subset_id, _subset_name))
|
||
|
|
||
|
# -----------------------------------------------------------------------------------------------------
|
||
|
|
||
|
|
||
|
# Keys of a device entry that only this script reads. They are not Home
# Assistant options, so they are kept out of the discovery payload.
_LOCAL_ONLY_KEYS = frozenset({'entity_type', 'min_voltage_percentage'})


def create_discovery_msg(config_entry):
    """Build the ``publish()`` arguments that announce one device.

    The topic is ``<discovery prefix>/<entity_type>/<unique_id>/config``.
    The payload is the device entry serialized as JSON, without the
    script-internal keys listed in ``_LOCAL_ONLY_KEYS``. The entry passed in
    is not modified.

    Args:
        config_entry: One value of ``CONFIG['devices']``; must contain
            'entity_type' and 'unique_id'.

    Returns:
        A dict with 'topic', 'payload' and 'retain', suitable for
        ``client.publish(**msg)``.

    Raises:
        KeyError: If 'entity_type' or 'unique_id' is missing.
    """
    topic = "{}/{}/{}/config".format(
        CONFIG['mqtt_discovery_prefix'],
        config_entry['entity_type'],
        config_entry['unique_id'],
    )
    payload = {
        key: value
        for key, value in config_entry.items()
        if key not in _LOCAL_ONLY_KEYS
    }
    return {'topic': topic, 'payload': json.dumps(payload), 'retain': False}
|
||
|
|
||
|
|
||
|
def send_autodiscovery_messages(client):
    """Publish one discovery message for every entry in CONFIG['devices'].

    The registry already holds both fans followed by one light per LED
    subset, so iterating it announces the same devices in the same order as
    listing them by hand. It also avoids rebuilding the ``light_<id>`` keys
    by string concatenation, which fails for non-string subset ids.

    Args:
        client: MQTT client used to publish the messages.
    """
    for device in CONFIG['devices'].values():
        client.publish(**create_discovery_msg(device))
|
||
|
|
||
|
|
||
|
def handle_light_message(client, msg):
    """Apply a light command received over MQTT and publish the new state.

    Handled topics have the form ``prusa_printer/lights_<id>/<cmd>`` with
    ``<cmd>`` being ``rgb``, ``switch``, ``brightness`` or ``effect``. Any
    other message is ignored. This includes the echoed ``*_state`` topics,
    unknown light ids and payloads that cannot be parsed. Ignoring them
    keeps a single bad message from raising out of the MQTT callback.

    Args:
        client: MQTT client used to publish the matching state topic.
        msg: Received message; ``msg.topic`` is a str, ``msg.payload`` bytes.

    Returns:
        True if a command was applied and its state published, False if the
        message was ignored.
    """
    prefix = 'prusa_printer/lights_'
    if not msg.topic.startswith(prefix):
        return False

    # Expect exactly "<subset id>/<command>" after the prefix.
    subset_id, sep, cmd = msg.topic[len(prefix):].partition("/")
    cfg = CONFIG['devices'].get('light_' + subset_id)
    if not sep or "/" in cmd or cfg is None:
        return False

    payload = msg.payload.decode()

    if cmd == "rgb":
        try:
            # Payload is "r,g,b"; a wrong component count also raises ValueError.
            r, g, b = (int(part) for part in payload.split(","))
        except ValueError:
            print("ignoring malformed rgb payload:", payload)
            return False
        hw.set_leds_color(r, g, b, subset=subset_id)
        client.publish(cfg['rgb_state_topic'], payload)
    elif cmd == "switch":
        if payload == "ON":
            hw.set_leds_on(subset=subset_id)
        elif payload == "OFF":
            hw.set_leds_off(subset=subset_id)
        else:
            # Do not echo a state that was never applied.
            return False
        client.publish(cfg['state_topic'], payload)
    elif cmd == "brightness":
        try:
            brightness = int(payload)
        except ValueError:
            print("ignoring malformed brightness payload:", payload)
            return False
        # NOTE(review): brightness and the effects below are called without
        # subset=..., so they presumably act on all LEDs; confirm against
        # printerbox_hardware.
        hw.set_leds_brightness(brightness)
        client.publish(cfg['brightness_state_topic'], payload)
    elif cmd == "effect":
        if payload == "color_wipe":
            hw.effect_color_wipe(0, 0, 255)
        elif payload == "blink":
            print("starting blink effect")
            hw.effect_blink_on_off(255, 0, 0)
            print("finished starting blink effect")
        elif payload == "blink_move":
            hw.effect_move_blink(255, 0, 0)
        else:
            # Unknown effect name: nothing was started, so report nothing.
            return False
        client.publish(cfg['effect_state_topic'], payload)
    else:
        # A state topic echoed back by the broker, or an unknown command.
        return False

    return True
|
||
|
|
||
|
def handle_fan_message(client, msg):
    """Apply a fan command received over MQTT and publish the new state.

    Two topics are handled for each of the venting and the filter fan:

    * ``command_topic``: payload "ON" sets the fan output to 1, any other
      payload sets it to 0.
    * ``percentage_command_topic``: payload is a number in percent. Zero
      switches the fan off; any other value is mapped linearly onto the
      range ``min_voltage_percentage .. 1`` before being written to the fan.

    Args:
        client: MQTT client used to publish the matching state topic.
        msg: Received message; ``msg.topic`` is a str, ``msg.payload`` bytes.

    Returns:
        True if a command was applied and its state published, False if the
        topic belongs to no fan or the percentage payload is not a number.
    """
    topic = msg.topic
    payload = msg.payload.decode()

    fans = (
        (CONFIG['devices']['venting_fan'], hw.venting_fan),
        (CONFIG['devices']['filter_fan'], hw.filter_fan),
    )
    for cfg, fan in fans:
        if topic == cfg['command_topic']:
            # payload was decoded to str above, so it must be compared with
            # a str; comparing against b"ON" never matched.
            fan.value = 1 if payload == "ON" else 0
            client.publish(cfg['state_topic'], payload)
            return True

        if topic == cfg['percentage_command_topic']:
            try:
                fan_percentage = float(payload) / 100
            except ValueError:
                print("ignoring malformed fan percentage payload:", payload)
                return False
            min_perc = cfg['min_voltage_percentage']
            if fan_percentage == 0:
                # 0 % means "off", not "run at the minimum level".
                voltage_percentage = 0
            else:
                voltage_percentage = min_perc + (1 - min_perc) * fan_percentage
            fan.value = voltage_percentage
            client.publish(cfg['percentage_state_topic'], payload)
            return True

    return False
|
||
|
|
||
|
|
||
|
def on_mqtt_message(client, userdata, msg):
    """Message callback: log the message, then offer it to each handler.

    Both handlers are always called, fan first and light second; their
    return values are not used.
    """
    print(msg.topic, msg.payload)
    for handler in (handle_fan_message, handle_light_message):
        handler(client, msg)
|
||
|
|
||
|
|
||
|
def on_mqtt_connect(client, *args, **kwargs):
    """Connect callback: subscribe to every topic named in CONFIG['devices'].

    Each value whose key ends in '_topic' is subscribed with QoS 2. That
    covers the state topics as well as the command topics. Any further
    callback arguments are accepted and ignored.
    """
    topic_filters = []
    for device in CONFIG['devices'].values():
        topic_filters.extend(
            (topic, 2)
            for key, topic in device.items()
            if key.endswith('_topic')
        )
    client.subscribe(topic_filters)
|
||
|
|
||
|
|
||
|
def run():
    """Connect to the MQTT broker, announce all devices and serve forever.

    Does not return under normal operation: ``loop_forever`` blocks while
    processing network traffic and dispatching the callbacks set here.
    """
    mqtt_client = mqtt.Client("prusa-printer-client")

    # Callbacks must be in place before the connection is opened.
    mqtt_client.on_connect = on_mqtt_connect
    mqtt_client.on_message = on_mqtt_message

    user, password = CONFIG['mqtt_user'], CONFIG['mqtt_password']
    mqtt_client.username_pw_set(user, password)
    mqtt_client.connect(CONFIG['mqtt_server'])

    send_autodiscovery_messages(mqtt_client)
    mqtt_client.loop_forever()


if __name__ == "__main__":
    run()
|