"""FHEM integration.

Keeps one line-based connection to a FHEM server open and dispatches every
received line to the devices registered under the matching FHEM device name.
"""
import logging
from collections import defaultdict

import voluptuous as vol

import homeassistant.helpers.config_validation as cv
from homeassistant.const import CONF_HOST, CONF_PORT

from ..reconnecting_client import ReconnectingClient

_LOGGER = logging.getLogger(__name__)

CONF_CUL_DEVICE_NAME = 'cul_device_name'
CONF_FHEM_SENSOR_TYPE = 'fhem_sensor_type'
CONF_FHEM_IDS = 'fhem_ids'

DOMAIN = 'fhem'
DATA_FHEM = "data_fhem"

CONFIG_SCHEMA = vol.Schema({
    DOMAIN: vol.Schema({
        vol.Required(CONF_HOST): cv.string,
        vol.Optional(CONF_PORT, default=7072): cv.port,
        vol.Required(CONF_CUL_DEVICE_NAME): cv.string,
    })
}, extra=vol.ALLOW_EXTRA)

# (line prefix, notification text template, notification title template)
# NOTE(review): the separator between "...:" and "{}" appears as a raw line
# break in the reviewed source; it is written as "\n" here -- confirm that
# this matches the intended original text.
_ERROR_NOTIFICATIONS = (
    ('overheat', "FHEM: {} overheated:\n{}", "{} overheat"),
    ('overload', "FHEM: {} overloaded:\n{}", "{} overloaded"),
)


async def async_setup(hass, config):
    """Set up the FHEM component.

    Creates the shared :class:`FhemConnection` from the ``fhem`` section of
    *config*, stores it in ``hass.data[DATA_FHEM]`` and starts it.

    :return: always ``True``.
    """
    connection = FhemConnection(hass, config[DOMAIN])
    hass.data[DATA_FHEM] = connection
    await connection.start()
    return True


class FhemConnection(ReconnectingClient):
    """Connection to FHEM that routes received lines to registered devices."""

    def __init__(self, hass, config):
        """Create the connection from the validated ``fhem`` config section.

        :param hass: Home Assistant instance, passed through to the base class.
        :param config: mapping providing CONF_HOST, CONF_PORT and
            CONF_CUL_DEVICE_NAME.
        """
        super().__init__(
            hass, config[CONF_HOST], config[CONF_PORT], "FHEM",
            receive_line_callback=self._process_line,
            connection_status_changed_callback=self._update_all_devices)
        self._cul_device_name = config[CONF_CUL_DEVICE_NAME]
        # FHEM device name -> list of devices interested in its lines.
        self._devices = defaultdict(list)

    def register_device(self, id, d):
        """Register device *d* for lines addressed to the FHEM device *id*.

        The device is refreshed immediately when a writer is currently
        available. (The parameter name ``id`` shadows the builtin but is kept
        so that existing keyword callers keep working.)
        """
        self._devices[id].append(d)
        if self._writer:
            d.refresh()

    async def _update_all_devices(self, state):
        """Connection status callback: refresh every registered device.

        When *state* is ``'connected'`` the current attributes are requested
        and event notification is switched on first.
        """
        if state == 'connected':
            self.write_line("displayattr .*")
            self.write_line("inform on")
        # NOTE(review): the refresh loop is assumed to run for every status
        # change, not only for 'connected' -- confirm against the original
        # indentation.
        for device_list in self._devices.values():
            for device in device_list:
                device.refresh()

    async def _process_line(self, line):
        """Dispatch one received line to the devices registered for it.

        Two shapes are handled:

        * ``<cul_device_name> <device> <command>`` -- status update message
        * ``<device> <command>`` -- potential response to ``displayattr``

        Lines that fit neither shape are ignored.
        """
        if line.startswith(self._cul_device_name + " "):
            # Status update message
            parts = line.split(" ", 2)
            if len(parts) < 3:
                # Prefix matched but device name or command is missing;
                # unpacking would raise ValueError.
                _LOGGER.debug("FHEM ignoring malformed status line: %s", line)
                return
            _, device_name, command = parts
        else:
            # Potential response to displayattr
            parts = line.split(" ", 1)
            if len(parts) != 2:
                return
            device_name, command = parts

        command = command.strip()
        # .get() instead of indexing: indexing the defaultdict would insert an
        # empty list for every unknown name seen on the wire.
        for device in self._devices.get(device_name, ()):
            await device.line_received(command)

    def write_line(self, line):
        """Send *line*, terminated by a newline, if a writer is available.

        Without a writer the line is dropped and a debug message is logged.
        """
        if not self._writer:
            _LOGGER.debug("FHEM Failed to write line %s", line)
            return
        self._writer.write((line + '\n').encode())

    def fhem_set(self, id, *arguments):
        """Send a ``set`` command for the FHEM device *id*.

        :param id: name of the FHEM device the command is addressed to.
        :param arguments: command parameters; each one is converted with
            ``str()`` and they are joined with single spaces.
        """
        joined = " ".join(str(a) for a in arguments)
        # write_line() appends the line terminator; adding another one here
        # would send an additional empty line after every command.
        self.write_line("set {} {}".format(id, joined))


def _parse_on_off_flag(received_line):
    """Return the on/off value of a ``<name>: on|off`` line.

    :return: ``True`` for ``on``, ``False`` for ``off`` and ``None`` (after
        logging a warning) when the value is missing or is anything else.
    """
    fields = received_line.split(':')
    if len(fields) < 2:
        _LOGGER.warning("FHEM status line without a value: %s", received_line)
        return None
    value = fields[1].strip().lower()
    if value == 'on':
        return True
    if value == 'off':
        return False
    _LOGGER.warning("FHEM unexpected on/off value in line: %s", received_line)
    return None


def device_error_reporting(hass, received_line, component_type, component_name):
    """Create a persistent notification for overheat / overload reports.

    Lines starting with ``overheat`` or ``overload`` are inspected; when their
    value is ``on`` a persistent notification naming the component is created.
    Any other line is ignored, and a malformed value is logged instead of
    raising.

    :param hass: Home Assistant instance used to create the notification.
    :param received_line: line received for the component.
    :param component_type: component type used in notification text and title.
    :param component_name: component name used in the notification text.
    """
    for prefix, text_template, title_template in _ERROR_NOTIFICATIONS:
        if not received_line.startswith(prefix):
            continue
        if _parse_on_off_flag(received_line):
            hass.components.persistent_notification.async_create(
                text_template.format(component_type, component_name),
                title=title_template.format(component_type))
        return