"""Support for interfacing to the Logitech SqueezeBox API."""
import asyncio
import logging
import socket
import sys
import urllib.parse
from typing import List, Tuple

import voluptuous as vol

from homeassistant.components.media_player import (
    MediaPlayerDevice, PLATFORM_SCHEMA)
from homeassistant.components.media_player.const import (
    ATTR_MEDIA_ENQUEUE, DOMAIN, MEDIA_TYPE_MUSIC, SUPPORT_CLEAR_PLAYLIST,
    SUPPORT_NEXT_TRACK, SUPPORT_PAUSE, SUPPORT_PLAY, SUPPORT_PLAY_MEDIA,
    SUPPORT_PREVIOUS_TRACK, SUPPORT_SEEK, SUPPORT_SHUFFLE_SET,
    SUPPORT_TURN_OFF, SUPPORT_TURN_ON, SUPPORT_VOLUME_MUTE,
    SUPPORT_VOLUME_SET)
from homeassistant.const import (
    ATTR_COMMAND, CONF_HOST, CONF_PASSWORD, CONF_PORT, CONF_USERNAME,
    ATTR_ENTITY_ID, STATE_IDLE, STATE_OFF, STATE_PAUSED, STATE_PLAYING)
import homeassistant.helpers.config_validation as cv
from homeassistant.util.dt import utcnow

from ..reconnecting_client import ReconnectingClient
from .radio import find_local_radio_url_by_name

_LOGGER = logging.getLogger(__name__)

# Port of the line-based command interface this module talks to.
DEFAULT_PORT = 9090
# Port used to build artwork URLs.
# TODO: make configurable; the original code hard-coded this value as well.
DEFAULT_HTTP_PORT = 9000

SUPPORT_SQUEEZEBOX = SUPPORT_PAUSE | SUPPORT_VOLUME_SET | \
    SUPPORT_VOLUME_MUTE | SUPPORT_PREVIOUS_TRACK | SUPPORT_NEXT_TRACK | \
    SUPPORT_SEEK | SUPPORT_TURN_ON | SUPPORT_TURN_OFF | SUPPORT_PLAY_MEDIA | \
    SUPPORT_PLAY | SUPPORT_SHUFFLE_SET | SUPPORT_CLEAR_PLAYLIST

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_HOST): cv.string,
    vol.Optional(CONF_PASSWORD): cv.string,
    vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
    vol.Optional(CONF_USERNAME): cv.string,
})

SERVICE_CALL_METHOD = 'squeezebox_call_method'

DATA_SQUEEZEBOX = 'squeezebox_telnet'

KNOWN_SERVERS = 'squeezebox_telnet_known_servers'

ATTR_PARAMETERS = 'parameters'

MEDIA_PLAYER_SCHEMA = vol.Schema({ATTR_ENTITY_ID: cv.comp_entity_ids})

SQUEEZEBOX_CALL_METHOD_SCHEMA = MEDIA_PLAYER_SCHEMA.extend({
    vol.Required(ATTR_COMMAND): cv.string,
    vol.Optional(ATTR_PARAMETERS):
        vol.All(cv.ensure_list, vol.Length(min=1), [cv.string]),
})

SERVICE_TO_METHOD = {
    SERVICE_CALL_METHOD: {
        'method': 'async_call_method',
        'schema': SQUEEZEBOX_CALL_METHOD_SCHEMA},
}


async def async_setup_platform(hass, config, async_add_entities,
                               discovery_info=None):
    """Set up the squeezebox platform.

    Resolves the configured (or discovered) host, skips servers whose IP
    address has already been set up, creates one entity per player reported
    by the server and registers the platform services.

    Returns False when the host cannot be resolved, None when the server is
    already known, and True otherwise.
    """
    known_servers = hass.data.get(KNOWN_SERVERS)
    if known_servers is None:
        hass.data[KNOWN_SERVERS] = known_servers = set()

    if DATA_SQUEEZEBOX not in hass.data:
        hass.data[DATA_SQUEEZEBOX] = []

    username = config.get(CONF_USERNAME)
    password = config.get(CONF_PASSWORD)

    if discovery_info is not None:
        host = discovery_info.get("host")
        port = discovery_info.get("port")
    else:
        host = config.get(CONF_HOST)
        port = config.get(CONF_PORT)

    # In case the port is not discovered
    if port is None:
        port = DEFAULT_PORT

    # Get IP of host, to prevent duplication of same host (different DNS names)
    # NOTE(review): gethostbyname blocks the event loop while resolving.
    try:
        ipaddr = socket.gethostbyname(host)
    except OSError as error:
        _LOGGER.error("Could not communicate with %s:%d: %s",
                      host, port, error)
        return False

    if ipaddr in known_servers:
        return

    known_servers.add(ipaddr)
    _LOGGER.debug("Creating LMS telnet object for %s", ipaddr)
    lms = LogitechMediaServer(hass, host, port, username, password)
    await lms.start()

    players = await lms.create_players()

    hass.data[DATA_SQUEEZEBOX].extend(players)
    async_add_entities(players)

    async def async_service_handler(service):
        """Send the requested command to the targeted players."""
        if not SERVICE_TO_METHOD.get(service.service):
            return

        entity_ids = service.data.get('entity_id')
        if entity_ids:
            target_players = [player for player in hass.data[DATA_SQUEEZEBOX]
                              if player.entity_id in entity_ids]
        else:
            target_players = hass.data[DATA_SQUEEZEBOX]

        # The command string may carry several terms; the optional
        # 'parameters' list accepted by the schema is appended to them
        # (it used to be silently dropped).
        command = service.data[ATTR_COMMAND].split()
        command.extend(service.data.get(ATTR_PARAMETERS, []))

        update_tasks = []
        for player in target_players:
            player.call_method(*command)
            update_tasks.append(player.async_update_ha_state(True))

        if update_tasks:
            # asyncio.wait() no longer accepts bare coroutines on current
            # Python versions; gather() wraps them in tasks itself.
            await asyncio.gather(*update_tasks)

    for service, definition in SERVICE_TO_METHOD.items():
        hass.services.async_register(
            DOMAIN, service, async_service_handler,
            schema=definition['schema'])

    return True


class LogitechMediaServer(ReconnectingClient):
    """Representation of a Logitech media server."""

    def __init__(self, hass, host, port, username, password):
        """Initialize the Logitech device."""
        super().__init__(hass, host, port, 'Squeezeserver',
                         self._on_receive_line,
                         self._on_connection_status_changed)
        self._username = username
        self._password = password
        # Maps player id -> SqueezeBoxDevice, used to route received lines.
        self._players = {}

    async def create_players(self):
        """Create a list of devices connected to LMS.

        Returns an empty list when the server's answer does not echo the
        query that was sent.
        """
        query = ('players', 0, sys.maxsize)
        data = await self.async_one_time_query(*query)
        cmd, player_infos = self._parse(data, recurse_at=('playerindex',))
        _LOGGER.debug("Player infos %s", player_infos)

        if cmd != [str(term) for term in query]:
            # Explicit check instead of an assert, which is stripped
            # when Python runs with -O.
            _LOGGER.error("Unexpected response to players query: %s", cmd)
            return []

        result = []
        if int(player_infos.get('count', 0)) > 0:
            for player_info in player_infos.get('player', {}).values():
                player = SqueezeBoxDevice(self, **player_info)
                result.append(player)
                self._players[player.unique_id] = player
        return result

    @staticmethod
    def _unquote(msg):
        """Split a received line on whitespace and percent-decode each term.

        Accepts either bytes or str.
        """
        if isinstance(msg, bytes):
            msg = msg.decode()
        return [urllib.parse.unquote(term) for term in msg.split()]

    @staticmethod
    def _command_to_str(*command):
        """Percent-encode each term and join the terms with spaces."""
        return " ".join(urllib.parse.quote(str(c)) for c in command)

    @staticmethod
    def _parse(response: List[str], recurse_at: Tuple[str, ...]):
        """Split decoded terms into a command list and a data dict.

        Terms without ':' that precede the first 'key:value' term form the
        command.  From the first 'key:value' term on, everything is parsed
        into a dict.  A key listed in `recurse_at` (e.g. 'playerindex')
        starts a nested dict stored under the key with its 'index' suffix
        removed, indexed by the integer value.

        `response` is consumed (emptied) in the process.

        Raises ValueError on a duplicate key or a non-integer index value.
        """
        def parse_dict(to_parse, stop_element=None):
            result = {}
            while to_parse:
                # The next occurrence of the index key ends this nested
                # entry; it is left in place for the caller to consume.
                if stop_element and to_parse[0].startswith(stop_element):
                    return result
                element = to_parse.pop(0)
                if ':' not in element:
                    _LOGGER.warning("Not in key:value form: %s", element)
                    continue
                key, value = element.split(':', maxsplit=1)
                if key in recurse_at:
                    index_name = key[:-len('index')].strip()
                    index_value = int(value)
                    if index_name not in result:
                        result[index_name] = {}
                    result[index_name][index_value] = parse_dict(
                        to_parse, stop_element=key)
                else:
                    if key in result:
                        raise ValueError("Duplicate key " + key)
                    result[key] = value
            return result

        command = []
        data = {}
        while response:
            if ':' in response[0]:
                # Without a stop element parse_dict consumes everything
                # that is left, which also ends this loop.
                data = parse_dict(response)
            else:
                command.append(response.pop(0))
        return command, data

    def _login_command(self):
        """Return the terms of the login command for the configured user."""
        terms = ['login', self._username]
        if self._password is not None:
            terms.append(self._password)
        return terms

    async def async_one_time_query(self, *command):
        """Send one command over a fresh connection and return its answer.

        The answer is returned as a list of decoded terms (empty if the
        server closed the connection without answering).
        """
        reader, writer = await asyncio.open_connection(self._host, self._port)
        try:
            if self._username:
                # Every command must be newline-terminated, otherwise the
                # login fuses with the query that follows it.
                login_line = self._command_to_str(*self._login_command())
                writer.write((login_line + "\n").encode())
                # The server answers the login with a line of its own
                # (see the LMS CLI documentation); drop it so that it is
                # not mistaken for the query response.
                await reader.readline()
            writer.write((self._command_to_str(*command) + "\n").encode())
            await writer.drain()
            return self._unquote(await reader.readline())
        finally:
            writer.close()

    def send_command(self, *command):
        """Send a command over the persistent connection."""
        self.write_line(self._command_to_str(*command))

    async def _on_connection_status_changed(self, status):
        """Authenticate if needed and subscribe to notifications."""
        if status == 'connected':
            if self._username:
                self.send_command(*self._login_command())
            self.send_command('listen', 1)

    async def _on_receive_line(self, line):
        """Route a received line to the player it is addressed to."""
        decoded = self._unquote(line)
        if not decoded:
            # Blank line: nothing to route.
            return
        player_id = decoded.pop(0)
        try:
            cmd, data = self._parse(decoded, recurse_at=('playlist index',))
        except ValueError:
            _LOGGER.warning("Unable to parse %s", decoded)
            return

        player = self._players.get(player_id)
        if player is not None:
            await player.on_receive(cmd, data)
        elif player_id.startswith(('listen', 'login')):
            # Echo of our own server-level commands; nothing to do.
            # Checked on the decoded term so it works for bytes and str.
            pass
        else:
            _LOGGER.warning("LMS Ignoring line %s", line)


class SqueezeBoxDevice(MediaPlayerDevice):
    """Representation of a SqueezeBox device."""

    def __init__(self, lms, playerid, name, **attributes):
        """Initialize the SqueezeBox device.

        Any additional fields reported by the server for this player are
        exposed as device state attributes.
        """
        super().__init__()
        self._lms = lms
        self._id = playerid
        self._status = {}
        self._name = name
        self._last_media_position_update = None
        self._device_state_attributes = attributes
        _LOGGER.debug("Creating SqueezeBox object: %s, %s", name, playerid)
        self.update()

    @property
    def should_poll(self):
        """Poll, so that a status request is sent periodically."""
        return True

    def update(self):
        """Request the player status; the answer arrives via on_receive."""
        # NOTE(review): the angle brackets are sent literally; the LMS CLI
        # documentation shows the tag letters without brackets - confirm.
        tags = 'adKlJj'
        self.call_method('status', "-", "1", f'tags:<{tags}>')

    async def on_receive(self, cmd, data):
        """Update the cached status from a line addressed to this player.

        `cmd` holds the command terms and `data` the parsed key/value
        part of the line.
        """
        if not cmd:
            return
        last_media_position = self.media_position

        if cmd[0] == 'status':
            if 'playlist' not in data:
                return
            tracks = list(data['playlist'].values())
            if len(tracks) != 1:
                # Exactly one track is requested by update().
                _LOGGER.warning("Unexpected status response: %s", data)
                return
            # Flatten the current track's fields into the status dict.
            data.update(tracks[0])
            self._status = data
        else:
            self._handle_notification(cmd)

        if self.media_position != last_media_position:
            self._last_media_position_update = utcnow()
        await self.async_update_ha_state()

    def _handle_notification(self, cmd):
        """Apply a non-status command or notification to the cached status."""
        head, args = cmd[0], cmd[1:]
        if head == 'playlist':
            if not args:
                return
            action = args[0]
            if action == 'newsong':
                self._status['time'] = 0
                self.update()  # trigger information for new song
            elif action == 'pause' and len(args) > 1 and args[1] == '1':
                self._status['mode'] = 'pause'
            elif action == 'pause' and len(args) > 1 and args[1] == '0':
                self._status['mode'] = 'play'
            elif action in ('play', 'pause', 'stop'):
                self._status['mode'] = action
        elif head in ('play', 'stop'):
            self._status['mode'] = head
        elif head == 'pause':
            if args and args[0] == '0':
                # 'pause 0' resumes playback.
                self._status['mode'] = 'play'
            elif args and args[0] == '1':
                self._status['mode'] = 'pause'
            else:
                # A bare 'pause' toggles between paused and playing.
                paused = self._status.get('mode') == 'pause'
                self._status['mode'] = 'play' if paused else 'pause'
        elif head == 'power':
            # Keep the 'power' field read by `state` in sync.
            if args and args[0] in ('0', '1'):
                self._status['power'] = args[0]
        elif head == 'time':
            self._store_absolute_time(args)
        elif len(cmd) > 3 and cmd[:2] == ['prefset', 'server']:
            if cmd[2] == 'volume':
                self._status['mixer volume'] = cmd[3]
            elif cmd[2] == 'shuffle':
                self._status['playlist shuffle'] = cmd[3]
            elif cmd[2] == 'repeat':
                # Repeat used to be written into the shuffle state.
                self._status['playlist repeat'] = cmd[3]

    def _store_absolute_time(self, args):
        """Store the position from a 'time' command if it is absolute."""
        if not args:
            return
        value = args[0]
        # '+10' / '-10' are relative seeks and '?' is a query; neither is
        # the new absolute position.
        if value.startswith(('+', '-')):
            return
        try:
            float(value)
        except ValueError:
            return
        self._status['time'] = value

    @property
    def name(self):
        """Return the name of the device."""
        return self._name

    @property
    def device_state_attributes(self):
        """Return the extra fields the server reported for this player."""
        return self._device_state_attributes

    @property
    def unique_id(self):
        """Return a unique ID."""
        return self._id

    @property
    def state(self):
        """Return the state of the device."""
        if self._status.get('power') == '0':
            return STATE_OFF
        mode = self._status.get('mode')
        if mode == 'pause':
            return STATE_PAUSED
        if mode == 'play':
            return STATE_PLAYING
        if mode == 'stop':
            return STATE_IDLE
        return None

    @property
    def volume_level(self):
        """Volume level of the media player (0..1)."""
        if 'mixer volume' in self._status:
            # A muted player reports its volume negated (this is what
            # is_volume_muted tests), so take the magnitude.
            return abs(int(float(self._status['mixer volume']))) / 100.0

    @property
    def is_volume_muted(self):
        """Return true if volume is muted."""
        if 'mixer volume' in self._status:
            return str(self._status['mixer volume']).startswith('-')

    @property
    def media_content_id(self):
        """Content ID of current playing media."""
        if 'current_title' in self._status:
            return self._status['current_title']

    @property
    def media_content_type(self):
        """Content type of current playing media."""
        return MEDIA_TYPE_MUSIC

    @property
    def media_duration(self):
        """Duration of current playing media in seconds."""
        if 'duration' in self._status:
            return float(self._status['duration'])

    @property
    def media_position(self):
        """Position of current playing media in seconds."""
        if 'time' in self._status:
            try:
                return float(self._status['time'])
            except (TypeError, ValueError):
                # An unparsable value must not break state updates.
                return None

    @property
    def media_position_updated_at(self):
        """Last time the media position changed."""
        return self._last_media_position_update

    @property
    def media_image_url(self):
        """Image url of current playing media."""
        if 'artwork_url' in self._status:
            media_url = self._status['artwork_url']
        elif 'id' in self._status:
            media_url = '/music/{track_id}/cover.jpg'.format(
                track_id=self._status['id'])
        else:
            media_url = '/music/current/cover.jpg?player={player}'.format(
                player=self._id)

        if media_url.startswith('http'):
            return media_url

        # pylint: disable=protected-access
        username = self._lms._username
        password = self._lms._password
        if username:
            # Percent-encode the credentials so that characters such as
            # '@', ':' or '/' cannot corrupt the URL.
            credentials = urllib.parse.quote(username, safe='')
            if password is not None:
                credentials += ':' + urllib.parse.quote(password, safe='')
            base_url = 'http://{credentials}@{server}:{port}/'.format(
                credentials=credentials,
                server=self._lms._host,
                port=DEFAULT_HTTP_PORT)
        else:
            base_url = 'http://{server}:{port}/'.format(
                server=self._lms._host,
                port=DEFAULT_HTTP_PORT)

        return urllib.parse.urljoin(base_url, media_url)

    @property
    def media_title(self):
        """Title of current playing media."""
        if 'title' in self._status:
            return self._status['title']

        if 'current_title' in self._status:
            return self._status['current_title']

    @property
    def media_artist(self):
        """Artist of current playing media."""
        if 'artist' in self._status:
            return self._status['artist']

    @property
    def media_album_name(self):
        """Album of current playing media."""
        if 'album' in self._status:
            return self._status['album']

    @property
    def shuffle(self):
        """Boolean if shuffle is enabled."""
        if 'playlist shuffle' in self._status:
            return self._status['playlist shuffle'] == '1'
        if 'playlist_shuffle' in self._status:
            return self._status['playlist_shuffle'] == '1'

    @property
    def supported_features(self):
        """Flag media player features that are supported."""
        return SUPPORT_SQUEEZEBOX

    def turn_off(self):
        """Turn the player off."""
        self.call_method('power', '0')

    def volume_up(self):
        """Raise the volume by 5."""
        self.call_method('mixer', 'volume', '+5')

    def volume_down(self):
        """Lower the volume by 5."""
        self.call_method('mixer', 'volume', '-5')

    def set_volume_level(self, volume):
        """Set volume level, range 0..1."""
        # round() instead of int(): 0.29 * 100 is 28.999..., which
        # truncation would turn into 28.
        volume_percent = str(int(round(volume * 100)))
        self.call_method('mixer', 'volume', volume_percent)

    def mute_volume(self, mute):
        """Mute (true) or unmute (false) the player."""
        mute_numeric = '1' if mute else '0'
        self.call_method('mixer', 'muting', mute_numeric)

    def media_play_pause(self):
        """Toggle between playing and paused."""
        self.call_method('pause')

    def media_play(self):
        """Send the play command."""
        self.call_method('play')

    def media_pause(self):
        """Send the pause command."""
        self.call_method('pause', '1')

    def media_next_track(self):
        """Skip to the next playlist entry."""
        self.call_method('playlist', 'index', '+1')

    def media_previous_track(self):
        """Go back to the previous playlist entry."""
        self.call_method('playlist', 'index', '-1')

    def media_seek(self, position):
        """Seek to the given position."""
        self.call_method('time', position)

    def turn_on(self):
        """Turn the player on."""
        self.call_method('power', '1')

    def play_media(self, media_type, media_id, **kwargs):
        """Send the play_media command to the media player.

        For media type 'channel', `media_id` is first translated through
        find_local_radio_url_by_name.

        If ATTR_MEDIA_ENQUEUE is True, add `media_id` to the current
        playlist; otherwise replace the playlist with it.
        """
        if media_type == 'channel':
            media_id = find_local_radio_url_by_name(media_id)
        if kwargs.get(ATTR_MEDIA_ENQUEUE):
            return self._add_uri_to_playlist(media_id)

        return self._play_uri(media_id)

    def _play_uri(self, media_id):
        """Replace the current play list with the uri."""
        self.call_method('playlist', 'play', media_id)

    def _add_uri_to_playlist(self, media_id):
        """Add an item to the existing playlist."""
        self.call_method('playlist', 'add', media_id)

    def set_shuffle(self, shuffle):
        """Enable or disable shuffle mode."""
        self.call_method('playlist', 'shuffle', int(shuffle))

    def clear_playlist(self):
        """Remove all entries from the current playlist."""
        self.call_method('playlist', 'clear')

    def call_method(self, *command):
        """Send a command addressed to this player."""
        self._lms.send_command(self._id, *command)