"""FHEM integration.

Keeps a single TCP connection to an FHEM server open, forwards incoming
lines to registered device objects and lets them send ``set`` commands.
"""
import asyncio
import logging

import voluptuous as vol

import homeassistant.helpers.config_validation as cv
from homeassistant.const import CONF_HOST, CONF_PORT, EVENT_HOMEASSISTANT_STOP

_LOGGER = logging.getLogger(__name__)

CONF_CUL_DEVICE_NAME = 'cul_device_name'
DOMAIN = 'fhem'
DATA_FHEM = "data_fhem"

CONFIG_SCHEMA = vol.Schema({
    DOMAIN: vol.Schema({
        vol.Required(CONF_HOST): cv.string,
        vol.Optional(CONF_PORT, default=7072): cv.port,
        vol.Required(CONF_CUL_DEVICE_NAME): cv.string,
    })
}, extra=vol.ALLOW_EXTRA)


async def async_setup(hass, config):
    """Create the FHEM connection, publish it in ``hass.data`` and start it.

    :param hass: Home Assistant instance.
    :param config: full configuration mapping; the ``fhem`` section is used.
    :return: always ``True``.
    """
    connection = FhemConnection(hass, config)
    hass.data[DATA_FHEM] = connection
    await connection.start()
    return True


class FhemConnection:
    """Line-based connection to an FHEM server with automatic reconnect.

    Device objects register themselves in :attr:`devices` (keyed by FHEM
    device name). Each must provide the coroutines ``line_received(command)``
    and ``async_update_ha_state()``, which this class awaits.
    """

    def __init__(self, hass, config):
        """Read host, port and CUL device name from ``config[DOMAIN]``."""
        self.hass = hass
        self._host = config[DOMAIN][CONF_HOST]
        self._port = config[DOMAIN][CONF_PORT]
        self._cul_device_name = config[DOMAIN][CONF_CUL_DEVICE_NAME]
        self.connected = False
        # Reconnect delay in seconds: starts at reconnect_time_start, doubles
        # after every failed attempt and is capped at reconnect_time_max.
        self.reconnect_time_start = 1
        self.reconnect_time_max = 60
        self.reconnect_time = self.reconnect_time_start
        self.devices = {}
        self._run = False
        self._writer = None

    async def start(self):
        """Schedule the connection task and register the shutdown handler."""
        self._run = True
        self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.stop)
        self.hass.loop.create_task(self._connection())

    async def stop(self, event=None):
        """Stop reconnecting and close the current connection.

        :param event: the event passed by the bus when used as a listener;
            ignored. Optional so the method can also be called directly.
        """
        self._run = False
        self.connected = False
        # Closing the transport makes the pending readline() return EOF,
        # which lets the connection task finish instead of hanging.
        self._close_writer()

    async def _connection(self):
        """Keep the connection alive until :meth:`stop` is called.

        Every lost or failed connection marks the devices as disconnected,
        waits ``reconnect_time`` seconds and retries with a doubled delay
        (capped at ``reconnect_time_max``).
        """
        while self._run:
            try:
                await self._connect_and_read()
            except OSError:
                _LOGGER.warning("Connection to FHEM failed %s:%s",
                                self._host, self._port)
            else:
                # The read loop returned without error: either we were asked
                # to stop, or the server closed the connection (EOF).
                if self._run:
                    _LOGGER.warning("Connection to FHEM closed %s:%s",
                                    self._host, self._port)
            finally:
                self._close_writer()
                self.connected = False

            if not self._run:
                break

            await self._update_device_states()
            await asyncio.sleep(self.reconnect_time)
            self.reconnect_time = min(2 * self.reconnect_time,
                                      self.reconnect_time_max)

    async def _connect_and_read(self):
        """Open one connection, subscribe to updates and process its lines.

        Returns when the server closes the connection or ``_run`` is cleared.
        :raises OSError: if connecting or reading fails.
        """
        reader, writer = await asyncio.open_connection(self._host, self._port)
        _LOGGER.info("Connected to FHEM %s:%s", self._host, self._port)
        self._writer = writer
        self.connected = True
        await self._update_device_states()
        # A successful connect resets the backoff.
        self.reconnect_time = self.reconnect_time_start
        writer.writelines([
            b"displayattr .*\n",
            b"inform on\n",
        ])
        while self._run:
            raw = await reader.readline()
            if not raw:
                # readline() yields b"" at EOF; looping on it would spin
                # forever without ever yielding to the event loop.
                return
            # Tolerate stray non-UTF-8 bytes instead of killing the task.
            line = raw.decode(errors="replace")
            _LOGGER.debug("FHEM received line: %s", line)
            await self._process_line(line)

    async def _update_device_states(self):
        """Ask every registered device to refresh its Home Assistant state."""
        # Snapshot: devices may register while we are awaiting.
        for device in list(self.devices.values()):
            await device.async_update_ha_state()

    def _close_writer(self):
        """Close and forget the current stream writer, if any."""
        writer, self._writer = self._writer, None
        if writer is not None:
            writer.close()

    async def _process_line(self, line):
        """Dispatch one received line to the device it belongs to.

        Lines starting with ``"<cul_device_name> "`` are status updates of
        the form ``<cul> <device> <command>``. Any other line is treated as
        ``<device> <command>`` (a potential response to ``displayattr``).
        Lines with too few fields or for unknown devices are ignored.
        """
        if line.startswith(self._cul_device_name + " "):
            # Status update message
            parts = line.split(" ", 2)
            if len(parts) != 3:
                return
            _, device_name, command = parts
        else:
            # potential response to displayattr
            parts = line.split(" ", 1)
            if len(parts) != 2:
                return
            device_name, command = parts

        device = self.devices.get(device_name)
        if device is not None:
            await device.line_received(command.strip())

    def write_line(self, line):
        """Send ``line`` followed by a newline; no-op while disconnected."""
        if self._writer:
            line += '\n'
            self._writer.write(line.encode())

    def fhem_set(self, id, *arguments):
        """
        Send command to FHEM using this device

        The command is dropped (with a warning) while not connected.

        :param id: FHEM device name the ``set`` command is addressed to
        :param arguments: string or list of strings containing command parameters
        """
        arguments = " ".join([str(a) for a in arguments])
        if not self._writer:
            _LOGGER.warning("Not connected to FHEM, dropping command: set %s %s",
                            id, arguments)
            return
        self._writer.write("set {} {}\n".format(id, arguments).encode())