198 lines
8.0 KiB
Python
198 lines
8.0 KiB
Python
from typing import List
|
|
from util import DeviceInfo, name_to_id
|
|
from util import extent as extend_general
|
|
import functools
|
|
|
|
__all__ = ['extent', 'import_ets5_csv_file', 'create_power_plug', 'create_lights', 'create_shutters', 'create_switches']
|
|
|
|
|
|
def first_lower(s):
|
|
if len(s) == 0:
|
|
return s
|
|
else:
|
|
return s[0].lower() + s[1:]
|
|
|
|
|
|
extend = functools.partial(extend_general, platform='knx')
|
|
|
|
|
|
def extent(result_dict, input_dict):
|
|
for k, v in input_dict.items():
|
|
if k not in result_dict:
|
|
result_dict[k] = []
|
|
for entry in v:
|
|
if 'platform' not in entry:
|
|
entry['platform'] = 'knx'
|
|
result_dict[k] += v
|
|
|
|
|
|
def import_ets5_csv_file(csv_file):
|
|
result = dict()
|
|
with open(csv_file, encoding='utf-8') as f:
|
|
for line in f:
|
|
splitted_line = line.split(",")
|
|
name = splitted_line[0].replace('"', "")
|
|
group_address = splitted_line[1].replace('"', "")
|
|
if '-' not in group_address:
|
|
result[name.strip()] = splitted_line[1].replace('"', "").strip()
|
|
return result
|
|
|
|
|
|
def create_power_plug(device_info: List[DeviceInfo],
|
|
csv_contents,
|
|
postfix_on_off_write=" Schalten",
|
|
postfix_on_off_read=" RM Schalten",
|
|
postfix_counter_value=" BSZ Wert",
|
|
postfix_consumption=" Verbrauch",
|
|
postfix_consumption_sum=" VerbrauchSumme",
|
|
postfix_consumption_reset=" Verbrauch Neustart",
|
|
postfix_counter_reset=" BSZ Neustart"):
|
|
result = {'switch': [],
|
|
'sensor': []}
|
|
for entry in device_info:
|
|
try:
|
|
# Switching
|
|
on_off_write_addr = csv_contents.get(entry.csv_name + postfix_on_off_write, None)
|
|
on_off_read_addr = csv_contents.get(entry.csv_name + postfix_on_off_read, None)
|
|
if on_off_write_addr:
|
|
device_entry = {
|
|
'name': entry.display_name,
|
|
'address': on_off_write_addr,
|
|
}
|
|
if on_off_read_addr:
|
|
device_entry['state_address'] = on_off_read_addr
|
|
result['switch'].append(device_entry)
|
|
|
|
# Counter for time (in hours) how long device was switched on
|
|
counter_value = csv_contents.get(entry.csv_name + postfix_counter_value, None)
|
|
if counter_value:
|
|
result['sensor'].append({
|
|
'name': entry.display_name + " Betriebsstunden",
|
|
'address': counter_value,
|
|
'type': '2byte_unsigned',
|
|
})
|
|
counter_reset = csv_contents.get(entry.csv_name + postfix_counter_reset, None)
|
|
if counter_reset:
|
|
result['switch'].append({
|
|
'name': entry.display_name + " Betriebsstunden Reset",
|
|
'address': counter_reset
|
|
})
|
|
|
|
# Consumption
|
|
consumption = csv_contents.get(entry.csv_name + postfix_consumption, None)
|
|
if consumption:
|
|
result['sensor'].append({
|
|
'name': entry.display_name + " Verbrauch mA",
|
|
'address': consumption,
|
|
'type': 'DPT-9',
|
|
})
|
|
name = name_to_id(entry.display_name + " Verbrauch mA", 'sensor')
|
|
result['sensor'].append({
|
|
'platform': 'template',
|
|
'sensors': {
|
|
name_to_id(entry.display_name + " Verbrauch", None):
|
|
{'friendly_name': entry.display_name + " Verbrauch",
|
|
'unit_of_measurement': 'W',
|
|
'value_template': f"{{{{ (states('{name}') | float / 1000 * 230) | round(1) }}}}"}}
|
|
})
|
|
consumption_sum = csv_contents.get(entry.csv_name + postfix_consumption_sum, None)
|
|
if consumption_sum:
|
|
result['sensor'].append({
|
|
'name': entry.display_name + " Verbrauch Summe",
|
|
'address': consumption_sum,
|
|
'type': '4byte_unsigned',
|
|
})
|
|
consumption_reset = csv_contents.get(entry.csv_name + postfix_consumption_reset, None)
|
|
if consumption_reset:
|
|
result['switch'].append({
|
|
'name': entry.display_name + " Verbrauch Reset",
|
|
'address': consumption_reset
|
|
})
|
|
|
|
except KeyError as e:
|
|
raise ValueError(f"Skipping light {entry.csv_name} - Could not find CSV File entry: {e}")
|
|
return result
|
|
|
|
|
|
def create_lights(device_info: List[DeviceInfo],
|
|
csv_contents,
|
|
postfix_on_off_write=" Schalten",
|
|
postfix_on_off_read=" RM Schalten",
|
|
postfix_brightness_write=" Helligkeit",
|
|
postfix_brightness_read=" RM Helligkeit"):
|
|
result = []
|
|
for entry in device_info:
|
|
try:
|
|
on_off_write_addr = csv_contents[entry.csv_name + postfix_on_off_write]
|
|
on_off_read_addr = csv_contents.get(entry.csv_name + postfix_on_off_read, None)
|
|
brightness_write_addr = csv_contents.get(entry.csv_name + postfix_brightness_write, None)
|
|
brightness_read_addr = csv_contents.get(entry.csv_name + postfix_brightness_read, None)
|
|
|
|
entry = {
|
|
'name': entry.display_name,
|
|
'address': on_off_write_addr,
|
|
}
|
|
if on_off_read_addr:
|
|
entry['state_address'] = on_off_read_addr
|
|
if brightness_write_addr:
|
|
entry['brightness_address'] = brightness_write_addr
|
|
if brightness_read_addr:
|
|
entry['brightness_state_address'] = brightness_read_addr
|
|
result.append(entry)
|
|
except KeyError as e:
|
|
raise ValueError(f"Skipping light {entry.csv_name} - Could not find CSV File entry: {e}")
|
|
|
|
return {'light': result}
|
|
|
|
|
|
def create_switches(device_info: List[DeviceInfo], csv_contents,
|
|
postfix_on_off_write=" Schalten",
|
|
postfix_on_off_read=" RM Schalten"):
|
|
result = []
|
|
for entry in device_info:
|
|
try:
|
|
on_off_write_addr = csv_contents.get(entry.csv_name + postfix_on_off_write, None)
|
|
if on_off_write_addr is None:
|
|
on_off_write_addr = csv_contents[entry.csv_name]
|
|
on_off_read_addr = csv_contents.get(entry.csv_name + postfix_on_off_read, None)
|
|
|
|
entry = {
|
|
'name': entry.display_name,
|
|
'address': on_off_write_addr,
|
|
}
|
|
if on_off_read_addr:
|
|
entry['state_address'] = on_off_read_addr
|
|
else:
|
|
entry['state_address'] = on_off_write_addr
|
|
result.append(entry)
|
|
except KeyError as e:
|
|
raise ValueError(f"Skipping switch {entry.csv_name} - Could not find CSV File entry: {e}")
|
|
|
|
return {'switch': result}
|
|
|
|
|
|
def create_shutters(device_info: List[DeviceInfo],
|
|
csv_contents,
|
|
postfix_long_addr=" Lang", postfix_short_addr=" Kurz", postfix_read_position=" RM Position",
|
|
postfix_set_position=" Position"):
|
|
result = []
|
|
|
|
for entry in device_info:
|
|
try:
|
|
long_addr = csv_contents[entry.csv_name + postfix_long_addr]
|
|
short_addr = csv_contents[entry.csv_name + postfix_short_addr]
|
|
read_pos_addr = csv_contents[entry.csv_name + postfix_read_position]
|
|
set_pos_addr = csv_contents[entry.csv_name + postfix_set_position]
|
|
|
|
result.append({
|
|
'name': entry.display_name,
|
|
'move_long_address': long_addr,
|
|
'move_short_address': short_addr,
|
|
'position_address': set_pos_addr,
|
|
'position_state_address': read_pos_addr,
|
|
})
|
|
except KeyError as e:
|
|
raise ValueError(f"Skipping shutter {entry.csv_name} - Could not find CSV File entry: {e}")
|
|
|
|
return {'cover': result}
|