142 lines
5.5 KiB
Python
142 lines
5.5 KiB
Python
import asyncio
|
|
import logging
|
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
|
from homeassistant.components.binary_sensor import BinarySensorDevice
|
|
import socket
|
|
|
|
|
|
class IsConnectedSensor(BinarySensorDevice):
|
|
|
|
def __init__(self, name):
|
|
self._on = True
|
|
self._name = name
|
|
|
|
async def set_value(self, value):
|
|
self._on = value
|
|
# this doesn't work sometimes - but polling is enabled anyway
|
|
try:
|
|
await asyncio.wait_for(self.async_update_ha_state(), 1)
|
|
except Exception:
|
|
pass
|
|
|
|
@property
|
|
def name(self):
|
|
return self._name
|
|
|
|
@property
|
|
def available(self) -> bool:
|
|
return True
|
|
|
|
@property
|
|
def is_on(self):
|
|
return self._on
|
|
|
|
def refresh(self):
|
|
pass
|
|
|
|
def update(self):
|
|
pass
|
|
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class ReconnectingClient:
|
|
|
|
def __init__(self, hass, host, port, connection_name, receive_line_callback,
|
|
connection_status_changed_callback, timeout=None):
|
|
self.connected = False
|
|
self.reconnect_time_start = 1
|
|
self.reconnect_time_max = 60
|
|
self.reconnect_time = self.reconnect_time_start
|
|
self.hass = hass
|
|
self._host = host
|
|
self._port = port
|
|
self._receive_line_callback = receive_line_callback
|
|
self._connection_status_changed_callback = connection_status_changed_callback
|
|
self._run = False
|
|
self._writer = None
|
|
self._connection_last_state = 'UNKNOWN'
|
|
self._connection_name = connection_name
|
|
self._connection_task = None
|
|
self._connected_sensor = None
|
|
self._timeout = timeout
|
|
|
|
@property
|
|
def connected_sensor(self):
|
|
return self._connected_sensor
|
|
|
|
@connected_sensor.setter
|
|
def connected_sensor(self, new_value):
|
|
self._connected_sensor = new_value
|
|
self._connected_sensor._on = self.connected
|
|
|
|
async def start(self):
|
|
self._run = True
|
|
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.stop)
|
|
self._connection_task = self.hass.loop.create_task(self._connection())
|
|
|
|
async def stop(self):
|
|
self._connection_task.cancel()
|
|
self._run = False
|
|
self.connected = False
|
|
|
|
def get_name(self):
|
|
return "{}_{}".format(self._connection_name, self._host)
|
|
|
|
def write_line(self, line):
|
|
try:
|
|
if self._writer:
|
|
line += '\n'
|
|
self._writer.write(line.encode())
|
|
else:
|
|
_LOGGER.warning(f"Skipping line '{line}'' because _writer is None")
|
|
except RuntimeError as e:
|
|
_LOGGER.error("Writing failed " + str(e))
|
|
self._connection_task.cancel()
|
|
self._connection_task = self.hass.loop.create_task(self._connection())
|
|
|
|
async def _connection(self):
|
|
try:
|
|
reader, writer = await asyncio.open_connection(self._host, self._port)
|
|
self._connection_last_state = 'CONNECTED'
|
|
_LOGGER.debug(f"{self._connection_name} starting connection")
|
|
self._writer = writer
|
|
self.connected = True
|
|
await self._connection_status_changed_callback('connected')
|
|
if self._connected_sensor:
|
|
await self._connected_sensor.set_value(True)
|
|
|
|
self.reconnect_time = self.reconnect_time_start
|
|
while self._run:
|
|
if self._timeout:
|
|
line = await asyncio.wait_for(reader.readline(), self._timeout)
|
|
else:
|
|
line = await reader.readline()
|
|
if not line:
|
|
raise OSError("Disconnect")
|
|
line = line.decode()
|
|
_LOGGER.debug(f"{self._connection_name} received line - passing along '{line}'")
|
|
await self._receive_line_callback(line)
|
|
except (OSError, asyncio.TimeoutError):
|
|
if self._connection_last_state != 'FAILED':
|
|
notification_text = f"{self._connection_name} connection to {self._host}:{self._port} failed"
|
|
self.hass.components.persistent_notification.async_create(notification_text, title="No connection")
|
|
_LOGGER.error(f"Connection to {self._connection_name} failed {self._host}:{self._port}")
|
|
await self._connection_status_changed_callback('disconnected')
|
|
if self._connected_sensor:
|
|
await asyncio.wait_for(self._connected_sensor.set_value(False), timeout=2.0)
|
|
_LOGGER.error(f"After setting sensor ({self._connection_name} {self._host}:{self._port})")
|
|
_LOGGER.error(f"After connection failed ({self._connection_name} {self._host}:{self._port})")
|
|
else:
|
|
_LOGGER.debug(f"{self._connection_name} retried connection, last state {self._connection_last_state}"
|
|
f"reconnection time {self.reconnect_time}")
|
|
self._connection_last_state = 'FAILED'
|
|
self.connected = False
|
|
_LOGGER.error("{} {} Sleeping for {}".format(self._connection_name, self._host, self.reconnect_time))
|
|
await asyncio.sleep(self.reconnect_time)
|
|
self.reconnect_time = min(2 * self.reconnect_time, self.reconnect_time_max)
|
|
_LOGGER.error("{} {} creating connection task again {}".format(self._connection_name, self._host,
|
|
self.reconnect_time))
|
|
self._connection_task = self.hass.loop.create_task(self._connection())
|