homeassistant-config/custom_components/fhem/__init__.py

98 lines
3.8 KiB
Python
Raw Normal View History

2019-06-01 16:17:51 +02:00
"""FHEM integration"""
import logging
import voluptuous as vol
from collections import defaultdict
2019-06-01 16:17:51 +02:00
import homeassistant.helpers.config_validation as cv
from homeassistant.const import CONF_HOST, CONF_PORT
from ..reconnecting_client import ReconnectingClient
2019-06-01 16:17:51 +02:00
# Module-level logger named after this package.
_LOGGER = logging.getLogger(__name__)
# Configuration key: name of the CUL device in FHEM. FhemConnection treats
# received lines that start with this name as status update messages.
CONF_CUL_DEVICE_NAME = 'cul_device_name'
# Configuration keys that are not referenced elsewhere in this module;
# presumably consumed by the platform modules of this integration — confirm.
CONF_FHEM_SENSOR_TYPE = 'fhem_sensor_type'
CONF_FHEM_IDS = 'fhem_ids'
2019-06-01 16:17:51 +02:00
# Name of the integration; also the key of its section in the configuration.
DOMAIN = 'fhem'
# Key in hass.data under which async_setup stores the FhemConnection.
DATA_FHEM = "data_fhem"
# Validation schema for the integration's configuration section:
#   host            - required string
#   port            - optional, validated as a port number, defaults to 7072
#   cul_device_name - required string
# Keys other than DOMAIN are tolerated at the top level (ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema({
    DOMAIN: vol.Schema({
        vol.Required(CONF_HOST): cv.string,
        vol.Optional(CONF_PORT, default=7072): cv.port,
        vol.Required(CONF_CUL_DEVICE_NAME): cv.string,
    })
}, extra=vol.ALLOW_EXTRA)
async def async_setup(hass, config):
    """Set up the FHEM integration.

    Builds a FhemConnection from the ``fhem`` section of *config*, publishes
    it in ``hass.data`` under DATA_FHEM and starts it.

    :param hass: Home Assistant instance
    :param config: full configuration mapping; only ``config[DOMAIN]`` is read
    :return: always True
    """
    fhem_config = config[DOMAIN]
    fhem = FhemConnection(hass, fhem_config)

    # Publish the connection before starting it, as the original code does.
    hass.data[DATA_FHEM] = fhem
    await fhem.start()
    return True
class FhemConnection(ReconnectingClient):
    """Line-based connection to a FHEM server that routes lines to devices.

    Devices register themselves under a FHEM device name via
    :meth:`register_device`. Every received line is matched against those
    names and forwarded to the matching devices' ``line_received`` coroutine.
    """

    def __init__(self, hass, config):
        """Create the connection from the validated ``fhem`` config section.

        :param hass: Home Assistant instance, passed on to ReconnectingClient
        :param config: mapping providing CONF_HOST, CONF_PORT and
            CONF_CUL_DEVICE_NAME
        """
        super().__init__(hass, config[CONF_HOST], config[CONF_PORT], "FHEM",
                         receive_line_callback=self._process_line,
                         connection_status_changed_callback=self._update_all_devices)
        self._cul_device_name = config[CONF_CUL_DEVICE_NAME]
        # FHEM device name -> list of registered device objects.
        self._devices = defaultdict(list)

    def register_device(self, id, d):
        """Register device *d* for lines that concern FHEM device *id*.

        Several devices may share the same *id*. After registering, the
        ``displayattr .*`` request is sent again if a writer is available
        (presumably so the new device receives current values — confirm).

        :param id: FHEM device name used to match received lines
        :param d: object exposing ``line_received`` and
            ``async_update_ha_state`` coroutines
        """
        self._devices[id].append(d)
        # write_line() performs the writer check and newline/encoding itself.
        self.write_line("displayattr .*")

    async def _update_all_devices(self, state):
        """React to a connection status change.

        :param state: new connection status; ``'connected'`` triggers the
            initial ``displayattr .*`` / ``inform on`` requests
        """
        if state == 'connected':
            self.write_line("displayattr .*")
            self.write_line("inform on")
        # NOTE(review): the refresh runs for every status change (not only
        # 'connected') so devices can reflect a lost connection — confirm
        # this matches the intended nesting.
        # Iterate over snapshots: register_device() may add entries while
        # this coroutine is suspended in an await.
        for device_list in list(self._devices.values()):
            for device in list(device_list):
                await device.async_update_ha_state()

    async def _process_line(self, line):
        """Forward one received line to the devices registered for it.

        Lines of the form ``<cul_device_name> <device> <payload>`` are status
        updates; any other ``<device> <payload>`` line is treated as a
        potential response to ``displayattr``. Lines for unknown devices and
        malformed lines are ignored.

        :param line: one line of text received from FHEM
        """
        if line.startswith(self._cul_device_name + " "):  # Status update message
            fields = line.split(" ", 2)
            if len(fields) != 3:
                # Status line without a payload; nothing to dispatch.
                _LOGGER.debug("Ignoring malformed FHEM status line: %s", line)
                return
            _, device_name, command = fields
            # .get() instead of indexing: indexing the defaultdict would
            # insert an empty list for every unknown device name.
            for device in self._devices.get(device_name, ()):
                _LOGGER.debug("FHEM line received (device): %s: %s", device_name, line)
                await device.line_received(command.strip())
        else:  # potential response to displayattr
            device_name, separator, command = line.partition(" ")
            if separator:
                for device in self._devices.get(device_name, ()):
                    await device.line_received(command.strip())

    def write_line(self, line):
        """Send *line* to FHEM, terminated with a newline.

        Does nothing while no writer is available (``self._writer`` is
        presumably managed by ReconnectingClient — confirm).

        :param line: command text without trailing newline
        """
        if self._writer:
            self._writer.write((line + '\n').encode())

    def fhem_set(self, id, *arguments):
        """Send a ``set`` command for a FHEM device.

        :param id: name of the FHEM device to address
        :param arguments: command parameters; each one is converted with
            ``str()`` and they are joined with single spaces
        """
        arguments = " ".join(str(a) for a in arguments)
        # No "\n" here: write_line() already appends the line terminator.
        self.write_line("set {} {}".format(id, arguments))
def device_error_reporting(hass, received_line, component_type, component_name):
    """Raise a persistent notification when a device reports an alarm.

    Handles readings of the form ``overheat: on|off`` and
    ``overload: on|off``. A notification is created only for ``on``; ``off``
    is ignored. Any other value (or a reading without ``:``) is logged as a
    warning instead of raising, so a single odd line cannot break the caller.
    Lines that start with neither prefix are ignored.

    :param hass: Home Assistant instance used to create the notification
    :param received_line: reading received for the device
    :param component_type: human-readable device kind used in the message
    :param component_name: name of the affected device
    """
    # (line prefix, wording in the message body, wording in the title).
    # The title wording differs between the two alarms and is kept as it was.
    alarms = (
        ('overheat', 'overheated', 'overheat'),
        ('overload', 'overloaded', 'overloaded'),
    )
    for prefix, wording, title_wording in alarms:
        if not received_line.startswith(prefix):
            continue
        fields = received_line.split(':')
        state = fields[1].strip().lower() if len(fields) > 1 else ''
        if state == 'on':
            text = "FHEM: {} {}: <br><b>{}</b>".format(component_type, wording, component_name)
            hass.components.persistent_notification.async_create(
                text, title="{} {}".format(component_type, title_wording))
        elif state != 'off':
            logging.getLogger(__name__).warning(
                "Unexpected FHEM %s reading for %s: %r",
                prefix, component_name, received_line)
        return