homeassistant-config/custom_components/squeezebox_telnet/media_player.py

505 lines
17 KiB
Python

"""Support for interfacing to the Logitech SqueezeBox API."""
import asyncio
import logging
import urllib.parse
import sys
from typing import List, Tuple
import voluptuous as vol
from ..reconnecting_client import ReconnectingClient
from homeassistant.components.media_player import (
MediaPlayerDevice, MEDIA_PLAYER_SCHEMA, PLATFORM_SCHEMA)
from homeassistant.components.media_player.const import (
ATTR_MEDIA_ENQUEUE, DOMAIN, MEDIA_TYPE_MUSIC,
SUPPORT_CLEAR_PLAYLIST, SUPPORT_NEXT_TRACK, SUPPORT_PAUSE,
SUPPORT_PLAY, SUPPORT_PLAY_MEDIA, SUPPORT_PREVIOUS_TRACK, SUPPORT_SEEK,
SUPPORT_SHUFFLE_SET, SUPPORT_TURN_OFF, SUPPORT_TURN_ON,
SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_SET)
from homeassistant.const import (
ATTR_COMMAND, CONF_HOST, CONF_PASSWORD, CONF_PORT, CONF_USERNAME,
STATE_IDLE, STATE_OFF, STATE_PAUSED, STATE_PLAYING)
import homeassistant.helpers.config_validation as cv
from homeassistant.util.dt import utcnow
# Module-wide logger.
_LOGGER = logging.getLogger(__name__)

# Port used when neither the configuration nor discovery supplies one.
DEFAULT_PORT = 9090

# Bitmask of the media player features reported by SqueezeBox entities.
SUPPORT_SQUEEZEBOX = (
    SUPPORT_PAUSE
    | SUPPORT_VOLUME_SET
    | SUPPORT_VOLUME_MUTE
    | SUPPORT_PREVIOUS_TRACK
    | SUPPORT_NEXT_TRACK
    | SUPPORT_SEEK
    | SUPPORT_TURN_ON
    | SUPPORT_TURN_OFF
    | SUPPORT_PLAY_MEDIA
    | SUPPORT_PLAY
    | SUPPORT_SHUFFLE_SET
    | SUPPORT_CLEAR_PLAYLIST
)

# Platform configuration: the host is mandatory, everything else optional.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_HOST): cv.string,
        vol.Optional(CONF_PASSWORD): cv.string,
        vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
        vol.Optional(CONF_USERNAME): cv.string,
    }
)

# Name of the custom service and the ``hass.data`` keys used by this platform.
SERVICE_CALL_METHOD = 'squeezebox_call_method'
DATA_SQUEEZEBOX = 'squeezebox_telnet'
KNOWN_SERVERS = 'squeezebox_telnet_known_servers'

ATTR_PARAMETERS = 'parameters'

# Payload of the custom service: one command plus an optional, non-empty
# list of string parameters.
SQUEEZEBOX_CALL_METHOD_SCHEMA = MEDIA_PLAYER_SCHEMA.extend(
    {
        vol.Required(ATTR_COMMAND): cv.string,
        vol.Optional(ATTR_PARAMETERS): vol.All(
            cv.ensure_list, vol.Length(min=1), [cv.string]
        ),
    }
)

# Maps each service name to the entity method that implements it and the
# schema used to validate its payload.
SERVICE_TO_METHOD = {
    SERVICE_CALL_METHOD: {
        'method': 'async_call_method',
        'schema': SQUEEZEBOX_CALL_METHOD_SCHEMA,
    },
}
async def async_setup_platform(hass, config, async_add_entities,
                               discovery_info=None):
    """Set up the squeezebox platform.

    Resolves the configured (or discovered) host to an IP address, creates a
    ``LogitechMediaServer`` for it unless that address was already set up,
    adds the server's players as entities and registers the services listed
    in ``SERVICE_TO_METHOD``.

    Returns ``False`` when the host name cannot be resolved, ``None`` when the
    server is already known, and ``True`` after a successful setup.
    """
    import socket

    known_servers = hass.data.get(KNOWN_SERVERS)
    if known_servers is None:
        hass.data[KNOWN_SERVERS] = known_servers = set()
    if DATA_SQUEEZEBOX not in hass.data:
        hass.data[DATA_SQUEEZEBOX] = []

    username = config.get(CONF_USERNAME)
    password = config.get(CONF_PASSWORD)

    if discovery_info is not None:
        host = discovery_info.get("host")
        port = discovery_info.get("port")
    else:
        host = config.get(CONF_HOST)
        port = config.get(CONF_PORT)

    # In case the port is not discovered
    if port is None:
        port = DEFAULT_PORT

    # Get IP of host, to prevent duplication of same host (different DNS
    # names). The lookup is a blocking call, so it runs in the executor
    # instead of stalling the event loop.
    try:
        ipaddr = await hass.async_add_executor_job(socket.gethostbyname, host)
    except OSError as error:
        # %s rather than %d: a discovered port is not guaranteed to be an int.
        _LOGGER.error(
            "Could not communicate with %s:%s: %s", host, port, error)
        return False

    if ipaddr in known_servers:
        return
    known_servers.add(ipaddr)

    _LOGGER.debug("Creating LMS telnet object for %s", ipaddr)
    lms = LogitechMediaServer(hass, host, port, username, password)
    await lms.start()
    players = await lms.create_players()
    hass.data[DATA_SQUEEZEBOX].extend(players)
    async_add_entities(players)

    async def async_service_handler(service):
        """Map services to methods on MediaPlayerDevice."""
        method = SERVICE_TO_METHOD.get(service.service)
        if not method:
            return

        # Everything except the entity selector is forwarded to the method.
        params = {key: value for key, value in service.data.items()
                  if key != 'entity_id'}
        entity_ids = service.data.get('entity_id')
        if entity_ids:
            target_players = [player for player in hass.data[DATA_SQUEEZEBOX]
                              if player.entity_id in entity_ids]
        else:
            target_players = hass.data[DATA_SQUEEZEBOX]

        update_tasks = []
        for player in target_players:
            await getattr(player, method['method'])(**params)
            update_tasks.append(player.async_update_ha_state(True))

        if update_tasks:
            # asyncio.wait() no longer accepts bare coroutines on current
            # Python versions; gather() schedules and awaits them all.
            await asyncio.gather(*update_tasks)

    for service in SERVICE_TO_METHOD:
        schema = SERVICE_TO_METHOD[service]['schema']
        hass.services.async_register(
            DOMAIN, service, async_service_handler,
            schema=schema)

    return True
class LogitechMediaServer(ReconnectingClient):
    """Representation of a Logitech media server.

    Sends commands over the connection provided by ``ReconnectingClient`` and
    forwards every line received on it to the ``SqueezeBoxDevice`` whose id
    is the first token of that line.
    """

    def __init__(self, hass, host, port, username, password):
        """Initialize the Logitech device.

        NOTE(review): ``self._host`` and ``self._port`` are read by
        ``async_one_time_query`` but never assigned in this class; presumably
        ``ReconnectingClient.__init__`` stores them - confirm.
        """
        super().__init__(hass, host, port, 'Squeezeserver',
                         self._on_receive_line,
                         self._on_connection_status_changed)
        self._username = username
        self._password = password
        # Maps player id -> SqueezeBoxDevice, filled by create_players().
        self._players = {}

    async def create_players(self):
        """Create a list of devices connected to LMS.

        Queries the server once for all players, builds one
        ``SqueezeBoxDevice`` per reported player, remembers each of them by
        its unique id and returns them as a list.
        """
        result = []
        data = await self.async_one_time_query('players', 0, sys.maxsize)
        cmd, player_infos = self._parse(data, recurse_at=('playerindex',))
        assert cmd == ['players', '0', str(sys.maxsize)]
        # A reply without a "count" field is treated as "no players" instead
        # of failing with a KeyError.
        if int(player_infos.get('count', 0)) > 0:
            for player_info in player_infos['player'].values():
                player = SqueezeBoxDevice(self, **player_info)
                result.append(player)
                self._players[player.unique_id] = player
        return result

    @staticmethod
    def _unquote(msg):
        """Split a received line on whitespace and URL-unquote every token.

        Accepts ``str`` or ``bytes``; bytes are decoded first.
        """
        if isinstance(msg, bytes):
            msg = msg.decode()
        return [urllib.parse.unquote(token) for token in msg.split()]

    @staticmethod
    def _command_to_str(*command):
        """URL-quote every command part and join the parts with spaces."""
        return " ".join(urllib.parse.quote(str(c)) for c in command)

    @staticmethod
    def _parse(response: List[str], recurse_at: Tuple[str, ...]):
        """Split an unquoted response into command tokens and a data dict.

        Leading tokens without a ``:`` form the command. From the first
        ``key:value`` token onwards everything is parsed into a dict. A key
        listed in ``recurse_at`` (for example ``playerindex``) starts a
        nested dict that is stored under the key minus its ``index`` suffix,
        indexed by the integer value; the nested dict ends at the next token
        that starts with the same key.

        ``response`` is consumed (emptied) while parsing.

        Returns a ``(command, data)`` tuple.

        Raises ``ValueError`` on a duplicate key or a non-integer index.
        """
        def parse_dict(to_parse, stop_element=None):
            result = {}
            while to_parse:
                if stop_element and to_parse[0].startswith(stop_element):
                    # The next group starts here; leave it to the caller.
                    return result
                element = to_parse.pop(0)
                if ':' not in element:
                    _LOGGER.warning("Not in key:value form: %s", element)
                    continue
                key, value = element.split(':', maxsplit=1)
                if key in recurse_at:
                    index_name = key[:-len('index')].strip()
                    index_value = int(value)
                    group = result.setdefault(index_name, {})
                    group[index_value] = parse_dict(
                        to_parse, stop_element=key)
                else:
                    if key in result:
                        raise ValueError("Duplicate key " + key)
                    result[key] = value
            return result

        command = []
        data = {}
        while response:
            if ':' in response[0]:
                data = parse_dict(response)
                assert len(response) == 0, "Remaining" + str(response)
            else:
                command.append(response.pop(0))
        return command, data

    async def async_one_time_query(self, *command):
        """Run a single command on a short-lived connection.

        Opens a new connection, logs in first when a username is configured,
        sends ``command`` and returns the first response line as a list of
        unquoted tokens. The connection is closed even if the exchange fails.
        """
        reader, writer = await asyncio.open_connection(self._host, self._port)
        try:
            if self._username:
                login = self._command_to_str(
                    "login", self._username, self._password)
                # Every command must end with a newline, otherwise the login
                # is glued to the command that follows it.
                writer.write((login + "\n").encode())
                await writer.drain()
                # The server answers the login with a line of its own; read
                # it so it is not mistaken for the reply to ``command``.
                await reader.readline()
            cmd_line = (self._command_to_str(*command) + "\n").encode()
            writer.write(cmd_line)
            await writer.drain()
            response = self._unquote(await reader.readline())
        finally:
            writer.close()
        return response

    def send_command(self, *command):
        """Quote ``command`` and write it as one line to the connection."""
        self.write_line(self._command_to_str(*command))

    async def _on_connection_status_changed(self, status):
        """Log in (if configured) and subscribe once connected."""
        if status == 'connected':
            if self._username:
                # Same authentication as async_one_time_query performs.
                self.send_command('login', self._username, self._password)
            self.send_command('listen', 1)

    async def _on_receive_line(self, line):
        """Parse a received line and hand it to the addressed player.

        The first token is taken as the player id; lines for unknown ids and
        lines that cannot be parsed are logged and dropped.
        """
        decoded = self._unquote(line)
        if not decoded:
            # Blank line: nothing to dispatch.
            return
        player_id = decoded.pop(0)
        try:
            cmd, data = self._parse(decoded, recurse_at=('playlist index',))
        except ValueError:
            _LOGGER.warning("Unable to parse %s", decoded)
            return
        player = self._players.get(player_id)
        if player is not None:
            await player.on_receive(cmd, data)
        else:
            # %s formatting also copes with ``line`` being bytes.
            _LOGGER.warning("LMS Ignoring line %s", line)
class SqueezeBoxDevice(MediaPlayerDevice):
    """Representation of a SqueezeBox device.

    State is kept in ``self._status``, a dict of string values that is
    replaced by every ``status`` reply and patched by the other
    notifications handled in ``on_receive``.
    """

    def __init__(self, lms, playerid, name, **attributes):
        """Initialize the SqueezeBox device.

        ``attributes`` holds the remaining fields reported for the player and
        is exposed unchanged as the device state attributes.
        """
        super().__init__()
        self._lms = lms
        self._id = playerid
        self._status = {}
        self._name = name
        self._last_media_position_update = None
        self._device_state_attributes = attributes
        _LOGGER.debug("Creating SqueezeBox object: %s, %s", name, playerid)
        # Request an initial status; the reply arrives via on_receive().
        self.update()

    @property
    def should_poll(self):
        """Return True so that update() is called periodically."""
        return True

    def update(self):
        """Request the player's status; the reply arrives via on_receive()."""
        tags = 'adKlJj'
        self.call_method('status', "-", "1", f'tags:<{tags}>')

    async def on_receive(self, cmd, data):
        """Apply a notification or reply from the server to the cached state.

        ``cmd`` is the list of command tokens (without the player id) and
        ``data`` the parsed ``key:value`` fields. Afterwards the entity state
        is pushed to Home Assistant.
        """
        if not cmd:
            # Nothing to interpret (line consisted of key:value fields only).
            return
        last_media_position = self.media_position
        verb = cmd[0]
        if verb == 'status':
            if 'playlist' not in data:
                return
            assert len(data['playlist']) == 1, \
                "Unexpected status response: " + str(data)
            # Flatten the single playlist entry into the status dict.
            current_track_data = list(data['playlist'].values())[0]
            data.update(current_track_data)
            self._status = data
        elif verb == 'playlist':
            action = cmd[1] if len(cmd) > 1 else None
            argument = cmd[2] if len(cmd) > 2 else None
            if action == 'newsong':
                self._status['time'] = 0
                self.update()  # trigger information for new song
            elif action == 'pause' and argument == '1':
                self._status['mode'] = 'pause'
            elif action == 'pause' and argument == '0':
                self._status['mode'] = 'play'
            elif action in ('play', 'pause', 'stop'):
                self._status['mode'] = action
        elif verb == 'play':
            self._status['mode'] = 'play'
        elif verb == 'pause':
            self._status['mode'] = 'pause'
        elif verb == 'time':
            if len(cmd) > 1:
                self._status['time'] = cmd[1]
        elif cmd[:2] == ['prefset', 'server'] and len(cmd) > 3:
            pref, value = cmd[2], cmd[3]
            if pref == 'volume':
                self._status['mixer volume'] = value
            elif pref == 'shuffle':
                self._status['playlist shuffle'] = value
            elif pref == 'repeat':
                # Stored under its own key so that a repeat change no longer
                # overwrites the shuffle state.
                self._status['playlist repeat'] = value
        if self.media_position != last_media_position:
            self._last_media_position_update = utcnow()
        await self.async_update_ha_state()

    @property
    def name(self):
        """Return the name of the device."""
        return self._name

    @property
    def device_state_attributes(self):
        """Return the extra player fields passed to the constructor."""
        return self._device_state_attributes

    @property
    def unique_id(self):
        """Return a unique ID."""
        return self._id

    @property
    def state(self):
        """Return the state of the device."""
        if self._status.get('power') == '0':
            return STATE_OFF
        mode = self._status.get('mode')
        if mode == 'pause':
            return STATE_PAUSED
        if mode == 'play':
            return STATE_PLAYING
        if mode == 'stop':
            return STATE_IDLE
        return None

    @property
    def volume_level(self):
        """Volume level of the media player (0..1)."""
        if 'mixer volume' in self._status:
            return int(float(self._status['mixer volume'])) / 100.0
        return None

    @property
    def is_volume_muted(self):
        """Return true if volume is muted (reported volume is negative)."""
        if 'mixer volume' in self._status:
            return str(self._status['mixer volume']).startswith('-')
        return None

    @property
    def media_content_id(self):
        """Content ID of current playing media."""
        return self._status.get('current_title')

    @property
    def media_content_type(self):
        """Content type of current playing media."""
        return MEDIA_TYPE_MUSIC

    @property
    def media_duration(self):
        """Duration of current playing media in seconds."""
        if 'duration' in self._status:
            return float(self._status['duration'])
        return None

    @property
    def media_position(self):
        """Position of current playing media in seconds."""
        if 'time' in self._status:
            return float(self._status['time'])
        return None

    @property
    def media_position_updated_at(self):
        """Last time the media position changed."""
        return self._last_media_position_update

    @property
    def media_image_url(self):
        """Image url of current playing media."""
        if 'artwork_url' in self._status:
            media_url = self._status['artwork_url']
        elif 'id' in self._status:
            media_url = '/music/{track_id}/cover.jpg'.format(
                track_id=self._status['id'])
        else:
            media_url = '/music/current/cover.jpg?player={player}'.format(
                player=self._id)

        # Absolute artwork URLs are used as they are.
        if media_url.startswith('http'):
            return media_url

        http_port = 9000  # TODO
        # pylint: disable=protected-access
        if self._lms._username:
            base_url = 'http://{username}:{password}@{server}:{port}/'.format(
                username=self._lms._username,
                password=self._lms._password,
                server=self._lms._host,
                port=http_port)
        else:
            base_url = 'http://{server}:{port}/'.format(
                server=self._lms._host,
                port=http_port)
        return urllib.parse.urljoin(base_url, media_url)

    @property
    def media_title(self):
        """Title of current playing media."""
        if 'title' in self._status:
            return self._status['title']
        return self._status.get('current_title')

    @property
    def media_artist(self):
        """Artist of current playing media."""
        return self._status.get('artist')

    @property
    def media_album_name(self):
        """Album of current playing media."""
        return self._status.get('album')

    @property
    def shuffle(self):
        """Boolean if shuffle is enabled."""
        if 'playlist shuffle' in self._status:
            return self._status['playlist shuffle'] == '1'
        if 'playlist_shuffle' in self._status:
            return self._status['playlist_shuffle'] == '1'
        return None

    @property
    def supported_features(self):
        """Flag media player features that are supported."""
        return SUPPORT_SQUEEZEBOX

    def turn_off(self):
        """Send the power-off command."""
        self.call_method('power', '0')

    def volume_up(self):
        """Raise the volume by 5."""
        self.call_method('mixer', 'volume', '+5')

    def volume_down(self):
        """Lower the volume by 5."""
        self.call_method('mixer', 'volume', '-5')

    def set_volume_level(self, volume):
        """Set the volume from a 0..1 level (sent as a 0..100 integer)."""
        volume_percent = str(int(volume*100))
        self.call_method('mixer', 'volume', volume_percent)

    def mute_volume(self, mute):
        """Mute (True) or unmute (False) the player."""
        mute_numeric = '1' if mute else '0'
        self.call_method('mixer', 'muting', mute_numeric)

    def media_play_pause(self):
        """Send the pause command without an argument."""
        self.call_method('pause')

    def media_play(self):
        """Send the play command."""
        self.call_method('play')

    def media_pause(self):
        """Send the pause command with argument 1."""
        self.call_method('pause', '1')

    def media_next_track(self):
        """Move one entry forward in the playlist."""
        self.call_method('playlist', 'index', '+1')

    def media_previous_track(self):
        """Move one entry back in the playlist."""
        self.call_method('playlist', 'index', '-1')

    def media_seek(self, position):
        """Send the time command with the given position."""
        self.call_method('time', position)

    def turn_on(self):
        """Send the power-on command."""
        self.call_method('power', '1')

    def play_media(self, media_type, media_id, **kwargs):
        """Send the play_media command to the media player.

        If ATTR_MEDIA_ENQUEUE is True, add `media_id` to the current
        playlist; otherwise replace the playlist with it. ``media_type`` is
        not used.
        """
        if kwargs.get(ATTR_MEDIA_ENQUEUE):
            return self._add_uri_to_playlist(media_id)
        return self._play_uri(media_id)

    def _play_uri(self, media_id):
        """Replace the current play list with the uri."""
        self.call_method('playlist', 'play', media_id)

    def _add_uri_to_playlist(self, media_id):
        """Add an item to the existing playlist."""
        self.call_method('playlist', 'add', media_id)

    def set_shuffle(self, shuffle):
        """Enable or disable shuffle (sent as 1 or 0)."""
        self.call_method('playlist', 'shuffle', int(shuffle))

    def clear_playlist(self):
        """Send the playlist clear command."""
        self.call_method('playlist', 'clear')

    def call_method(self, *command):
        """Send ``command`` to the server, prefixed with this player's id."""
        self._lms.send_command(self._id, *command)

    async def async_call_method(self, command, parameters=None):
        """Send an arbitrary command to this player.

        This is the method named in ``SERVICE_TO_METHOD`` for the
        ``squeezebox_call_method`` service: ``command`` is the command word
        and ``parameters`` an optional list of further arguments.
        """
        self.call_method(command, *(parameters or ()))