homeassistant-config/config_creation/knx_conf.py

200 lines
8.1 KiB
Python

import csv
import functools
from typing import List

from util import DeviceInfo, name_to_id
from util import extent as extend_general
# Names exported by a star-import of this module.
__all__ = ['extent', 'import_ets5_csv_file', 'create_power_plug', 'create_lights', 'create_shutters', 'create_switches']
def first_lower(s: str) -> str:
    """Return *s* with its first character lower-cased.

    The rest of the string is left untouched. An empty string yields an
    empty string.
    """
    # Slicing (unlike indexing) is safe on an empty string, so no explicit
    # length check is required.
    return s[:1].lower() + s[1:]
# ``extend_general`` is ``util.extent`` (see the imports); this partial
# pre-binds its ``platform`` keyword argument to 'knx'.
extend = functools.partial(extend_general, platform='knx')
def extent(result_dict, input_dict):
    """Merge the entry lists of *input_dict* into *result_dict* in place.

    For every key of *input_dict*, its entries are appended to the list
    stored under the same key in *result_dict*; the list is created when the
    key is missing.  Entries without a ``'platform'`` key get
    ``'platform': 'knx'`` added.  The entry dicts themselves are modified, so
    the objects held by the caller's *input_dict* change as well.

    Args:
        result_dict: Mapping of key -> list of entry dicts; updated in place.
        input_dict: Mapping of key -> iterable of entry dicts to merge in.

    Returns:
        None.
    """
    for key, entries in input_dict.items():
        target = result_dict.setdefault(key, [])
        # Materialise once: a one-shot iterable (e.g. a generator) would
        # otherwise be exhausted by the tagging loop and nothing would be
        # appended afterwards.
        entries = list(entries)
        for entry in entries:
            entry.setdefault('platform', 'knx')
        target.extend(entries)
def import_ets5_csv_file(csv_file, delimiter=","):
    """Read a group-address CSV export into a ``name -> address`` dict.

    The first column of every row is taken as the name and the second as the
    group address.  Any remaining double quotes and surrounding whitespace
    are stripped from both.  Rows whose address contains ``'-'`` are skipped
    (presumably group headers such as ``1/-/-`` -- confirm against the export
    format).  Rows with fewer than two columns, e.g. blank lines, are
    ignored.  A header row, if present, is not treated specially.

    Args:
        csv_file: Path of the CSV file to read.
        delimiter: Column separator used in the file.

    Returns:
        Dict mapping the stripped name to the stripped group address.  When
        a name occurs more than once, the last row wins.
    """
    result = {}
    # 'utf-8-sig' drops a leading byte-order mark if there is one;
    # newline='' is what the csv module asks for so that it can handle line
    # endings inside quoted fields itself.
    with open(csv_file, encoding='utf-8-sig', newline='') as f:
        # csv.reader honours quoting, so a delimiter inside a quoted name no
        # longer shifts the columns.
        for row in csv.reader(f, delimiter=delimiter):
            if len(row) < 2:
                continue
            name = row[0].replace('"', "").strip()
            group_address = row[1].replace('"', "").strip()
            if '-' not in group_address:
                result[name] = group_address
    return result
def create_power_plug(device_info: List[DeviceInfo],
                      csv_contents,
                      postfix_on_off_write=" Schalten",
                      postfix_on_off_read=" RM Schalten",
                      postfix_counter_value=" BSZ Wert",
                      postfix_consumption=" Verbrauch",
                      postfix_consumption_sum=" VerbrauchSumme",
                      postfix_consumption_reset=" Verbrauch Neustart",
                      postfix_counter_reset=" BSZ Neustart"):
    """Build switch and sensor entries for switchable power plugs.

    Each group address is looked up in *csv_contents* under
    ``entry.csv_name + <postfix>``.  Every lookup is optional: an entry is
    emitted only when the corresponding address is present and truthy.

    Args:
        device_info: Devices to process; ``csv_name`` and ``display_name``
            are read from each item.
        csv_contents: Mapping of group-address name -> group address.
        postfix_on_off_write: Name suffix of the switching address.
        postfix_on_off_read: Name suffix of the switching state address.
        postfix_counter_value: Name suffix of the operating-hours counter.
        postfix_consumption: Name suffix of the consumption reading.
        postfix_consumption_sum: Name suffix of the summed consumption.
        postfix_consumption_reset: Name suffix of the consumption reset.
        postfix_counter_reset: Name suffix of the operating-hours reset.

    Returns:
        ``{'switch': [...], 'sensor': [...]}`` with one dict per entry.

    Raises:
        ValueError: If a ``KeyError`` is raised while processing a device;
            the original error is attached as the cause.
    """
    switches = []
    sensors = []

    for entry in device_info:
        csv_name = entry.csv_name
        display_name = entry.display_name
        try:
            # Switching
            on_off_write_addr = csv_contents.get(csv_name + postfix_on_off_write)
            on_off_read_addr = csv_contents.get(csv_name + postfix_on_off_read)
            if on_off_write_addr:
                device_entry = {
                    'name': display_name,
                    'address': on_off_write_addr,
                }
                if on_off_read_addr:
                    device_entry['state_address'] = on_off_read_addr
                switches.append(device_entry)

            # Counter for time (in hours) how long device was switched on
            counter_value = csv_contents.get(csv_name + postfix_counter_value)
            if counter_value:
                sensors.append({
                    'name': display_name + " Betriebsstunden",
                    'state_address': counter_value,
                    'type': '2byte_unsigned',
                })
            counter_reset = csv_contents.get(csv_name + postfix_counter_reset)
            if counter_reset:
                switches.append({
                    'name': display_name + " Betriebsstunden Reset",
                    'address': counter_reset
                })

            # Consumption
            consumption = csv_contents.get(csv_name + postfix_consumption)
            if consumption:
                sensors.append({
                    'name': display_name + " Verbrauch mA",
                    'state_address': consumption,
                    'type': 'DPT-9',
                })
                # Derived template sensor: the template divides the mA
                # sensor's state by 1000 and multiplies by 230, reported in
                # 'W' (presumably assuming a fixed 230 V supply).
                name = name_to_id(display_name + " Verbrauch mA", 'sensor')
                sensors.append({
                    'platform': 'template',
                    'sensors': {
                        name_to_id(display_name + " Verbrauch", None):
                            {'friendly_name': display_name + " Verbrauch",
                             'unit_of_measurement': 'W',
                             'value_template': f"{{{{ (states('{name}') | float / 1000 * 230) | round(1) }}}}"}}
                })
            consumption_sum = csv_contents.get(csv_name + postfix_consumption_sum)
            if consumption_sum:
                sensors.append({
                    'name': display_name + " Verbrauch Summe",
                    'state_address': consumption_sum,
                    'type': '4byte_unsigned',
                })
            consumption_reset = csv_contents.get(csv_name + postfix_consumption_reset)
            if consumption_reset:
                switches.append({
                    'name': display_name + " Verbrauch Reset",
                    'address': consumption_reset
                })
        except KeyError as e:
            # The message used to say "light" (copy-paste from the light
            # builder); name the device kind this function actually handles.
            raise ValueError(
                f"Skipping power plug {csv_name} - Could not find CSV File entry: {e}") from e

    return {'switch': switches, 'sensor': sensors}
def create_lights(device_info: List[DeviceInfo],
                  csv_contents,
                  postfix_on_off_write=" Schalten",
                  postfix_on_off_read=" RM Schalten",
                  postfix_brightness_write=" Helligkeit",
                  postfix_brightness_read=" RM Helligkeit"):
    """Build light entries from the group addresses in *csv_contents*.

    The switching address is looked up under
    ``entry.csv_name + postfix_on_off_write`` and, when that yields ``None``,
    under the bare ``entry.csv_name``.  The state and brightness addresses
    are optional and only added when present and truthy.

    Args:
        device_info: Devices to process; ``csv_name`` and ``display_name``
            are read from each item.
        csv_contents: Mapping of group-address name -> group address.
        postfix_on_off_write: Name suffix of the switching address.
        postfix_on_off_read: Name suffix of the switching state address.
        postfix_brightness_write: Name suffix of the brightness address.
        postfix_brightness_read: Name suffix of the brightness state address.

    Returns:
        ``{'light': [...]}`` with one dict per device.

    Raises:
        ValueError: If no switching address can be found for a device; the
            original ``KeyError`` is attached as the cause.
    """
    # Optional config keys and the name suffix each one is looked up with,
    # in the order they are added to the entry.
    optional_addresses = (
        ('state_address', postfix_on_off_read),
        ('brightness_address', postfix_brightness_write),
        ('brightness_state_address', postfix_brightness_read),
    )

    result = []
    for entry in device_info:
        try:
            on_off_write_addr = csv_contents.get(entry.csv_name + postfix_on_off_write)
            if on_off_write_addr is None:
                # Fall back to an address named exactly like the device;
                # this is the lookup that raises KeyError when missing.
                on_off_write_addr = csv_contents[entry.csv_name]

            # Kept in its own name so the loop variable `entry` is not
            # rebound while the except handler below still reads it.
            light = {
                'name': entry.display_name,
                'address': on_off_write_addr,
            }
            for key, postfix in optional_addresses:
                address = csv_contents.get(entry.csv_name + postfix)
                if address:
                    light[key] = address
            result.append(light)
        except KeyError as e:
            raise ValueError(
                f"Skipping light {entry.csv_name} - Could not find CSV File entry: {e}") from e

    return {'light': result}
def create_switches(device_info: List[DeviceInfo], csv_contents,
                    postfix_on_off_write=" Schalten",
                    postfix_on_off_read=" RM Schalten"):
    """Build switch entries from the group addresses in *csv_contents*.

    The switching address is looked up under
    ``entry.csv_name + postfix_on_off_write`` and, when that yields ``None``,
    under the bare ``entry.csv_name``.  When no state address is found, the
    switching address is used as the state address too.

    Args:
        device_info: Devices to process; ``csv_name`` and ``display_name``
            are read from each item.
        csv_contents: Mapping of group-address name -> group address.
        postfix_on_off_write: Name suffix of the switching address.
        postfix_on_off_read: Name suffix of the switching state address.

    Returns:
        ``{'switch': [...]}`` with one dict per device.

    Raises:
        ValueError: If no switching address can be found for a device; the
            original ``KeyError`` is attached as the cause.
    """
    result = []
    for entry in device_info:
        try:
            on_off_write_addr = csv_contents.get(entry.csv_name + postfix_on_off_write)
            if on_off_write_addr is None:
                # Fall back to an address named exactly like the device;
                # this is the lookup that raises KeyError when missing.
                on_off_write_addr = csv_contents[entry.csv_name]
            on_off_read_addr = csv_contents.get(entry.csv_name + postfix_on_off_read)

            # Built under its own name so the loop variable `entry` is not
            # rebound while the except handler below still reads it.
            result.append({
                'name': entry.display_name,
                'address': on_off_write_addr,
                # Without a dedicated feedback address, state is read from
                # the switching address itself.
                'state_address': on_off_read_addr if on_off_read_addr else on_off_write_addr,
            })
        except KeyError as e:
            raise ValueError(
                f"Skipping switch {entry.csv_name} - Could not find CSV File entry: {e}") from e

    return {'switch': result}
def create_shutters(device_info: List[DeviceInfo],
                    csv_contents,
                    postfix_long_addr=" Lang", postfix_short_addr=" Kurz", postfix_read_position=" RM Position",
                    postfix_set_position=" Position"):
    """Build cover entries from the group addresses in *csv_contents*.

    All four addresses are mandatory: each is looked up under
    ``entry.csv_name + <postfix>`` and a missing one aborts with an error.

    Args:
        device_info: Devices to process; ``csv_name`` and ``display_name``
            are read from each item.
        csv_contents: Mapping of group-address name -> group address.
        postfix_long_addr: Name suffix of the ``move_long_address``.
        postfix_short_addr: Name suffix of the ``move_short_address``.
        postfix_read_position: Name suffix of the ``position_state_address``.
        postfix_set_position: Name suffix of the ``position_address``.

    Returns:
        ``{'cover': [...]}`` with one dict per device.

    Raises:
        ValueError: If any of the four addresses is missing for a device;
            the original ``KeyError`` is attached as the cause.
    """
    result = []
    for entry in device_info:
        csv_name = entry.csv_name
        try:
            # Looked up in the original order (long, short, read position,
            # set position) so the first missing name reported is the same.
            long_addr = csv_contents[csv_name + postfix_long_addr]
            short_addr = csv_contents[csv_name + postfix_short_addr]
            read_pos_addr = csv_contents[csv_name + postfix_read_position]
            set_pos_addr = csv_contents[csv_name + postfix_set_position]
        except KeyError as e:
            raise ValueError(
                f"Skipping shutter {csv_name} - Could not find CSV File entry: {e}") from e

        result.append({
            'name': entry.display_name,
            'move_long_address': long_addr,
            'move_short_address': short_addr,
            'position_address': set_pos_addr,
            'position_state_address': read_pos_addr,
        })

    return {'cover': result}