3dprinterbox/mqttcontrol/printerbox_mqtt.py

171 lines
6.2 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import json
from pprint import pprint
import printerbox_hardware as hw
def light_cfg(id, name):
    """Build the MQTT light entry for one LED subset.

    Args:
        id: Identifier of the LED subset; it is embedded in the entry key,
            the unique id and every MQTT topic of the light.
        name: Text appended to the entity's display name.

    Returns:
        A one-item dict mapping ``light_<id>`` to the settings of that light,
        in the shape expected by ``CONFIG['devices']``.
    """
    base_topic = f'prusa_printer/lights_{id}'
    entry = {
        'entity_type': 'light',
        'name': f"PrusaBox Licht {name}",
        'unique_id': f'prusa_caselights_{id}',
        # The on/off switch is the only feature whose keys carry no prefix.
        'command_topic': f'{base_topic}/switch',
        'state_topic': f'{base_topic}/switch_state',
    }
    # The remaining features share one command/state naming pattern. The
    # insertion order matters because it determines the JSON key order.
    for feature in ('brightness', 'rgb', 'effect'):
        entry[f'{feature}_command_topic'] = f'{base_topic}/{feature}'
        entry[f'{feature}_state_topic'] = f'{base_topic}/{feature}_state'
    entry['effect_list'] = ['color_wipe', 'blink', 'blink_move']
    return {f'light_{id}': entry}
# ----------------------------------- Config ------------------------------------------------------
# Static settings of the bridge. The per-LED-subset light entries are not
# listed here; they are merged into CONFIG['devices'] right after this literal.
CONFIG = {
    # Broker connection settings, read by run().
    'mqtt_server': 'homeassistant',
    'mqtt_user': 'prusaprinter',
    # NOTE(review): plaintext credential committed to source. Consider loading
    # it from the environment or an untracked file, and rotating this value.
    'mqtt_password': 'ANchibUdeClEnegue',
    # First path segment of the discovery topics built by
    # create_discovery_msg().
    'mqtt_discovery_prefix': 'homeassistant',
    # One entry per entity. 'entity_type' and 'min_voltage_percentage' are
    # consumed by this script and stripped from the discovery payload; every
    # other key is sent to the broker unchanged.
    'devices': {
        'venting_fan': {
            'entity_type': 'fan',
            'unique_id': 'prusa_venting_fan',
            'name': 'PrusaBox Lüfter draussen',
            'state_topic': 'prusa_printer/venting_fan',
            'command_topic': 'prusa_printer/venting_fan/set',
            'percentage_command_topic': 'prusa_printer/venting_fan/percentage',
            'percentage_state_topic': 'prusa_printer/venting_fan/percentageState',
            # Lowest output fraction applied for any non-zero requested speed
            # (see handle_fan_message()).
            'min_voltage_percentage': 0.35,
            'icon': 'mdi:fan',
        },
        'filter_fan': {
            'entity_type': 'fan',
            'unique_id': 'prusa_filter_fan',
            'name': 'PrusaBox Filter Lüfter',
            'state_topic': 'prusa_printer/filter_fan',
            'command_topic': 'prusa_printer/filter_fan/set',
            'percentage_command_topic': 'prusa_printer/filter_fan/percentage',
            'percentage_state_topic': 'prusa_printer/filter_fan/percentageState',
            # Lowest output fraction applied for any non-zero requested speed
            # (see handle_fan_message()).
            'min_voltage_percentage': 0.5,
            'icon': 'mdi:fan',
        },
    }
}
# Register one light entity per LED subset exposed by the hardware module.
# NOTE: the loop variables keep the names `id` and `name` on purpose; they
# stay behind as module-level globals once the loop has finished.
for id in hw.LED_SUBSET_NAMES:
    name = hw.LED_SUBSET_NAMES[id]
    CONFIG['devices'].update(light_cfg(id, name))
# -----------------------------------------------------------------------------------------------------
def create_discovery_msg(config_entry):
    """Turn one device entry into keyword arguments for ``client.publish``.

    Args:
        config_entry: A device dict as stored in ``CONFIG['devices']``. It
            must contain ``entity_type`` and ``unique_id`` and is left
            unmodified.

    Returns:
        A dict with ``topic`` (the discovery config topic), ``payload`` (the
        entry as a JSON string, without the script-internal keys) and
        ``retain`` (always True).
    """
    topic = (
        f"{CONFIG['mqtt_discovery_prefix']}"
        f"/{config_entry['entity_type']}"
        f"/{config_entry['unique_id']}/config"
    )
    # These keys only steer this script and are not part of the payload.
    internal_keys = ('entity_type', 'min_voltage_percentage')
    payload_fields = {
        key: value
        for key, value in config_entry.items()
        if key not in internal_keys
    }
    msg = {
        'topic': topic,
        'payload': json.dumps(payload_fields),
        'retain': True,
    }
    print("sending discovery msg", msg)
    return msg
def send_autodiscovery_messages(client):
    """Publish the discovery message of both fans and of every light.

    Args:
        client: MQTT client whose ``publish`` accepts the ``topic``,
            ``payload`` and ``retain`` keywords produced by
            ``create_discovery_msg``.
    """
    devices = CONFIG['devices']
    # Fans first, then one light per LED subset.
    for fan_key in ('venting_fan', 'filter_fan'):
        client.publish(**create_discovery_msg(devices[fan_key]))
    for subset_id in hw.LED_SUBSET_NAMES:
        client.publish(**create_discovery_msg(devices['light_' + subset_id]))
def handle_light_message(client, msg):
    """Apply an MQTT light command to the LEDs and publish the new state.

    Light topics have the form ``prusa_printer/lights_<id>/<command>`` where
    ``<command>`` is ``rgb``, ``switch``, ``brightness`` or ``effect``.

    Malformed messages (unexpected topic shape, unknown light, undecodable
    payload, non-numeric ``rgb``/``brightness`` payload) are reported with
    ``print`` and ignored instead of raising, so that a single bad message
    cannot raise out of the MQTT message callback.

    Args:
        client: MQTT client used to publish the state echo.
        msg: Received message; ``msg.topic`` is a str, ``msg.payload`` bytes.

    Returns:
        True if a command was recognised and applied, False otherwise.
    """
    prefix = 'prusa_printer/lights_'
    if not msg.topic.startswith(prefix):
        return False

    parts = msg.topic[len(prefix):].split("/")
    if len(parts) != 2:
        print("ignoring light message with unexpected topic", msg.topic)
        return False
    subset, cmd = parts

    cfg = CONFIG['devices'].get('light_' + subset)
    if cfg is None:
        print("ignoring message for unknown light", msg.topic)
        return False

    try:
        payload = msg.payload.decode()
    except UnicodeDecodeError:
        print("ignoring undecodable light payload", msg.payload)
        return False

    if cmd == "rgb":
        try:
            # ValueError covers non-numeric channels and a wrong channel count.
            r, g, b = (int(channel) for channel in payload.split(","))
        except ValueError:
            print("ignoring invalid rgb payload", payload)
            return False
        hw.set_leds_color(r, g, b, subset=subset)
        client.publish(cfg['rgb_state_topic'], payload)
    elif cmd == "switch":
        if payload == "ON":
            hw.set_leds_on(subset=subset)
        elif payload == "OFF":
            hw.set_leds_off(subset=subset)
        # The payload is echoed as state even when it is neither ON nor OFF.
        client.publish(cfg['state_topic'], payload)
    elif cmd == "brightness":
        try:
            brightness = int(payload)
        except ValueError:
            print("ignoring invalid brightness payload", payload)
            return False
        # NOTE(review): brightness and the effects below are applied without
        # a subset argument - confirm that they are meant to be global.
        hw.set_leds_brightness(brightness)
        client.publish(cfg['brightness_state_topic'], payload)
    elif cmd == "effect":
        if payload == "color_wipe":
            hw.effect_color_wipe(0, 0, 255)
        elif payload == "blink":
            print("starting blink effect")
            hw.effect_blink_on_off(255, 0, 0)
            print("finished starting blink effect")
        elif payload == "blink_move":
            hw.effect_move_blink(255, 0, 0)
        # Unknown effect names are echoed as state without starting anything.
        client.publish(cfg['effect_state_topic'], payload)
    else:
        # Not a command topic (for example one of the ``*_state`` topics).
        return False
    return True
def handle_fan_message(client, msg):
    """Apply an MQTT fan command and publish the resulting fan state.

    Two topics are handled per fan: ``command_topic`` (payload ``ON`` or
    anything else for off) and ``percentage_command_topic`` (a number in
    0..100). A requested percentage above zero is mapped linearly onto the
    range ``min_voltage_percentage``..1; zero switches the fan off.

    Percentage payloads that are not numbers are reported and ignored, and
    values outside 0..100 are clamped, so that a bad message neither raises
    out of the MQTT message callback nor produces an output value outside
    0..1.

    Args:
        client: MQTT client used to publish the state echo.
        msg: Received message; ``msg.payload`` is bytes.

    Returns:
        True if the topic is a command topic of one of the fans, False
        otherwise.
    """
    topic = msg.topic
    payload = msg.payload
    fans = (
        (CONFIG['devices']['venting_fan'], hw.venting_fan),
        (CONFIG['devices']['filter_fan'], hw.filter_fan),
    )
    for cfg, fan in fans:
        if topic == cfg['command_topic']:
            # Any payload other than ON switches the fan off.
            fan.value = 1 if payload == b"ON" else 0
            client.publish(cfg['state_topic'], payload)
            # NOTE(review): 100 is reported even when the fan was just
            # switched off - confirm that this is intended.
            client.publish(cfg['percentage_state_topic'], 100)
            return True

        if topic == cfg['percentage_command_topic']:
            try:
                # UnicodeDecodeError is a subclass of ValueError.
                requested = float(payload.decode())
            except ValueError:
                print("ignoring invalid fan percentage payload", payload)
                return True

            if 0 <= requested <= 100:
                fan_percentage = requested / 100
                reported = payload
            elif requested > 100:
                fan_percentage = 1.0
                reported = b"100"
            else:
                # Negative values and NaN are treated as "off".
                fan_percentage = 0.0
                reported = b"0"

            min_perc = cfg['min_voltage_percentage']
            if fan_percentage == 0:
                voltage_percentage = 0
            else:
                voltage_percentage = min_perc + (1 - min_perc) * fan_percentage
            # Presumably a 0..1 output level - confirm against the hw module.
            fan.value = voltage_percentage
            client.publish(cfg['percentage_state_topic'], reported)
            client.publish(
                cfg['state_topic'],
                b"ON" if voltage_percentage > 0 else b"OFF")
            return True
    return False
def on_mqtt_message(client, userdata, msg):
    """MQTT message callback: log the message and offer it to each handler.

    Args:
        client: MQTT client that received the message.
        userdata: Unused.
        msg: Received message with ``topic`` and ``payload`` attributes.
    """
    print(msg.topic, msg.payload)
    # Both handlers always run; each one ignores topics it does not own.
    for handler in (handle_fan_message, handle_light_message):
        handler(client, msg)
def on_mqtt_connect(client, *args, **kwargs):
    """MQTT connect callback: subscribe to the command topics of all devices.

    Only config keys ending in ``command_topic`` are subscribed (QoS 2). The
    ``*_state_topic`` entries name topics this client publishes to itself;
    subscribing to them would merely deliver the client's own state messages
    back to it, and none of the message handlers acts on them.

    Args:
        client: MQTT client that has just connected.
        *args: Further callback arguments, ignored.
        **kwargs: Further callback keyword arguments, ignored.
    """
    subscriptions = [
        (topic, 2)
        for device in CONFIG['devices'].values()
        for key, topic in device.items()
        if key.endswith('command_topic')
    ]
    client.subscribe(subscriptions)
def run():
    """Connect to the broker, announce all devices and process messages.

    The discovery messages are published once, directly after ``connect()``
    and before the network loop is started. ``loop_forever()`` blocks, so
    this function does not return during normal operation.
    """
    mqtt_client = mqtt.Client("prusa-printer-client")
    mqtt_client.on_message = on_mqtt_message
    mqtt_client.on_connect = on_mqtt_connect
    mqtt_client.username_pw_set(CONFIG['mqtt_user'], CONFIG['mqtt_password'])
    mqtt_client.connect(CONFIG['mqtt_server'])
    send_autodiscovery_messages(mqtt_client)
    mqtt_client.loop_forever()


# Start the bridge only when executed as a script, not when imported.
if __name__ == "__main__":
    run()