homeassistant-config/custom_components/fhem/cover.py

111 lines
3.7 KiB
Python

"""Support for covers from FHEM"""
import voluptuous as vol
import logging
from homeassistant.components.cover import PLATFORM_SCHEMA, CoverDevice, SUPPORT_OPEN, SUPPORT_CLOSE, \
SUPPORT_SET_POSITION, SUPPORT_STOP, ATTR_POSITION
from homeassistant.const import CONF_NAME
import homeassistant.helpers.config_validation as cv
from . import DATA_FHEM, device_error_reporting, CONF_FHEM_IDS
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_FHEM_IDS): vol.All(cv.ensure_list, [cv.string]),
vol.Required(CONF_NAME): cv.string,
})
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
connection = hass.data[DATA_FHEM]
cover = FhemCover(connection, config[CONF_NAME], config[CONF_FHEM_IDS])
for dev_id in config[CONF_FHEM_IDS]:
connection.register_device(dev_id, cover)
async_add_entities([cover])
class FhemCover(CoverDevice):
def __init__(self, connection, name, ids):
self._position = None
self.connection = connection
self._ids = ids
self._name = name
self._available = True
@property
def name(self):
return self._name
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def available(self) -> bool:
return self._available and self.connection.connected
@property
def supported_features(self):
"""Flag supported features."""
return SUPPORT_OPEN | SUPPORT_CLOSE | SUPPORT_SET_POSITION | SUPPORT_STOP
@property
def current_cover_position(self):
return self._position
def refresh(self):
self.connection.fhem_set(self._ids[0], 'statusRequest')
@property
def is_closed(self):
"""Return if the cover is closed."""
if self._position is None:
return None
return self._position <= 25
async def async_close_cover(self, **kwargs):
await self.async_set_cover_position(**{ATTR_POSITION: 0})
async def async_open_cover(self, **kwargs):
await self.async_set_cover_position(**{ATTR_POSITION: 100})
async def async_set_cover_position(self, **kwargs):
"""Move the cover to a specific position."""
if ATTR_POSITION in kwargs:
position = kwargs[ATTR_POSITION]
self._position = position
self.connection.fhem_set(self._ids[0], int(position))
async def async_stop_cover(self, **kwargs):
"""Stop the cover."""
self.connection.fhem_set(self._ids[0], 'stop')
async def line_received(self, line):
if line.startswith('motor:'):
self._available = True
_, new_motor_state, new_position = line.split(':')
new_position = new_position.strip().lower()
new_motor_state = new_motor_state.strip().lower()
assert new_motor_state in ('up', 'down', 'stop', 'err'), 'Unknown motor state ' + new_motor_state
if new_motor_state == 'stop':
if new_position == 'on':
self._position = 100
elif new_position == 'off':
self._position = 0
else:
new_position = int(float(new_position)) # first convert from string to floating point then truncate
assert 0 <= new_position <= 100
self._position = new_position
await self.async_update_ha_state()
elif line.startswith('ResndFail') or line.startswith('MISSING ACK'):
self._available = False
await self.async_update_ha_state()
else:
device_error_reporting(self.hass, line, component_type="Cover", component_name=self.entity_id)