homeassistant-config/custom_components/squeezebox_telnet/radio.py

79 lines
2.5 KiB
Python
Raw Normal View History

2019-12-22 19:21:12 +01:00
import xml.etree.ElementTree as ET
from urllib.request import urlopen
import aiohttp
import asyncio
2019-12-22 19:21:12 +01:00
# Base endpoint of the RadioTime OPML browse service.
ompl_radio_browse_url = 'http://opml.radiotime.com/Browse.ashx'
# To target another region: open the browse URL with the parameter ?c=local,
# look for your city there and paste its id below.
query_url = '{}?id=r100765'.format(ompl_radio_browse_url)
# Maps a station guide_id to {'name': ..., 'url': ...}; empty until it is filled.
STATION_CACHE = dict()
2019-12-22 19:21:12 +01:00
def radio_name_cleanup(radio_name):
    """Strip parenthesised annotations from a station name.

    The name is split on whitespace. Every token containing "(" or ")" is
    dropped, and so are the words enclosed between an opening and a closing
    token, so that multi-word annotations disappear completely:

        "Radio Bamberg 106.1 (Top 40/Pop)"    -> "Radio Bamberg 106.1"
        "Antenne Bayern (Top 40 & Pop Music)" -> "Antenne Bayern"

    Words that follow an opening bracket which is never closed are kept.

    Returns the remaining tokens joined by single spaces.
    """
    kept = []
    # Words seen since the last unmatched "(" token; None while outside brackets.
    pending = None
    for token in radio_name.split():
        opens = '(' in token
        closes = ')' in token
        if opens and closes:
            # Self-contained annotation such as "(Pop)": drop just this token.
            continue
        if opens:
            if pending is not None:
                # The previous "(" was never closed, so its words are real.
                kept.extend(pending)
            pending = []
        elif closes:
            # Annotation finished: discard the words collected inside it.
            pending = None
        elif pending is not None:
            pending.append(token)
        else:
            kept.append(token)
    if pending:
        # Unclosed bracket at the end of the name: keep the trailing words.
        kept.extend(pending)
    return " ".join(kept)
def parse_sender_information(rootnode):
    """Collect the audio stations found below an OPML root element.

    Every ``outline`` element whose ``type`` attribute equals "audio" becomes
    one entry keyed by its ``guide_id``. Audio outlines that lack ``text``,
    ``URL`` or ``guide_id`` are skipped, so a single incomplete entry does not
    abort the whole listing.

    rootnode: element offering ``iter()``, e.g. an ElementTree root.
    Returns a dict mapping guide_id -> {'name': cleaned name, 'url': URL}.
    """
    result = {}
    for node in rootnode.iter('outline'):
        attributes = node.attrib
        # A missing "type" attribute counts as "not audio".
        if attributes.get('type') != "audio":
            continue
        try:
            raw_name = attributes['text']
            station_url = attributes['URL']
            station_id = attributes['guide_id']
        except KeyError:
            # Incomplete entry: ignore it instead of failing the whole parse.
            continue
        station_name = radio_name_cleanup(raw_name)
        # Console trace of each accepted station (kept from the original).
        print(station_name)
        result[station_id] = {'name': station_name, 'url': station_url}
    return result
def get_local_radio_stations():
    """Download the station listing at ``query_url`` and parse it (blocking).

    Example Query: GET http://opml.radiotime.com/Browse.ashx?partnerId=<partnerid>&serial=<serial>
    partnerId and serial do not seem to be necessary, so they are not sent.

    Returns whatever parse_sender_information produces for the document root.
    """
    # The context manager closes the HTTP response even when parsing fails.
    with urlopen(query_url) as xmlfile:
        tree = ET.parse(xmlfile)
    return parse_sender_information(tree.getroot())
2019-12-22 19:21:12 +01:00
def find_local_radio_url_by_name(search_name):
    """Resolve a station name, or an already complete URL, to a stream URL.

    Input starting with "http" (compared case-insensitively, after trimming
    surrounding whitespace) is treated as a URL and returned trimmed but with
    its letter case intact. Anything else is matched case-insensitively
    against the names in STATION_CACHE: a name starting with the search
    expression wins over one that merely contains it.

    Raises ValueError when no cached station matches.
    """
    needle = search_name.lower().strip()
    if needle.startswith('http'):
        # URL paths and query strings are case-sensitive, so hand back the
        # caller's own spelling rather than the lower-cased search key.
        return search_name.strip()

    # first try: find a station that starts with the search expression
    for radio in STATION_CACHE.values():
        if radio['name'].lower().startswith(needle):
            return radio['url']

    # second try: containment is enough
    for radio in STATION_CACHE.values():
        if needle in radio['name'].lower():
            return radio['url']

    raise ValueError("Could not find radio station " + needle)
async def fill_station_cache_async(client_session):
    """Fetch the station listing through ``client_session`` and store it.

    Performs a GET on ``query_url``, parses the response text as XML and
    rebinds the module-level STATION_CACHE to the parsed result.

    client_session: object whose ``get(url)`` yields an async context manager
    with an awaitable ``text()`` (presumably an aiohttp.ClientSession --
    verify against the caller).
    """
    global STATION_CACHE
    print("-------- Fill station cache async")
    async with client_session.get(query_url) as response:
        body = await response.text()
        root = ET.fromstring(body)
        # Rebinding (not mutating) the global replaces the previous cache object.
        STATION_CACHE = parse_sender_information(root)
if __name__ == '__main__':
    # Manual smoke test. STATION_CACHE starts out empty, so a lookup would
    # always raise ValueError; load the listing synchronously first.
    STATION_CACHE.update(get_local_radio_stations())
    print(find_local_radio_url_by_name("B 5 aktuell"))