"""Support for covers from FHEM""" import voluptuous as vol import logging from homeassistant.components.cover import PLATFORM_SCHEMA, CoverDevice, SUPPORT_OPEN, SUPPORT_CLOSE, \ SUPPORT_SET_POSITION, SUPPORT_STOP, ATTR_POSITION from homeassistant.const import CONF_NAME import homeassistant.helpers.config_validation as cv from . import DATA_FHEM, device_error_reporting, CONF_FHEM_IDS PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ vol.Required(CONF_FHEM_IDS): vol.All(cv.ensure_list, [cv.string]), vol.Required(CONF_NAME): cv.string, }) _LOGGER = logging.getLogger(__name__) async def async_setup_platform(hass, config, async_add_entities, discovery_info=None): connection = hass.data[DATA_FHEM] cover = FhemCover(connection, config[CONF_NAME], config[CONF_FHEM_IDS]) for dev_id in config[CONF_FHEM_IDS]: connection.register_device(dev_id, cover) async_add_entities([cover]) class FhemCover(CoverDevice): def __init__(self, connection, name, ids): self._position = None self.connection = connection self._ids = ids self._name = name self._available = True @property def name(self): return self._name @property def should_poll(self): """No polling needed.""" return False @property def available(self) -> bool: return self._available and self.connection.connected @property def supported_features(self): """Flag supported features.""" return SUPPORT_OPEN | SUPPORT_CLOSE | SUPPORT_SET_POSITION | SUPPORT_STOP @property def current_cover_position(self): return self._position @property def is_closed(self): """Return if the cover is closed.""" if self._position is None: return None return self._position <= 25 async def async_close_cover(self, **kwargs): await self.async_set_cover_position(**{ATTR_POSITION: 0}) async def async_open_cover(self, **kwargs): await self.async_set_cover_position(**{ATTR_POSITION: 100}) async def async_set_cover_position(self, **kwargs): """Move the cover to a specific position.""" if ATTR_POSITION in kwargs: position = kwargs[ATTR_POSITION] self._position = position self.connection.fhem_set(self._ids[0], int(position)) async def async_stop_cover(self, **kwargs): """Stop the cover.""" self.connection.fhem_set(self._ids[0], 'stop') async def line_received(self, line): if line.startswith('motor:'): self._available = True _, new_motor_state, new_position = line.split(':') new_position = new_position.strip().lower() new_motor_state = new_motor_state.strip().lower() assert new_motor_state == 'stop' or new_motor_state == 'up' or new_motor_state == 'down' if new_motor_state == 'stop': if new_position == 'on': self._position = 100 elif new_position == 'off': self._position = 0 else: new_position = int(float(new_position)) # first convert from string to floating point then truncate assert 0 <= new_position <= 100 self._position = new_position await self.async_update_ha_state() elif line.startswith('ResndFail') or line.startswith('MISSING ACK'): self._available = False await self.async_update_ha_state() else: device_error_reporting(self.hass, line, component_type="Cover", component_name=self.entity_id)