# %% from ping3 import ping from fritzconnection import FritzConnection from fritzconnection.lib.fritzhosts import FritzHosts # Data collection # # ping_results # - timestamp # - ip # - ms (or result if worked) def find_mesh_node_by_uid(mesh_topo, uid): matching_nodes = [e for e in mesh_topo['nodes'] if e['uid'] == uid] assert len(matching_nodes) == 1 return matching_nodes[0] def find_mesh_node_by_device_name(mesh_topo, device_name): matching_nodes = [e for e in mesh_topo['nodes'] if e['device_name'] == device_name] assert len(matching_nodes) == 1 return matching_nodes[0] def mesh_connection(mesh_topo, device_name): node = find_mesh_node_by_device_name(mesh_topo, device_name) assert len(node['node_interfaces']) == 1 interface = node['node_interfaces'][0] links = interface['node_links'] last_link = links[0] connected = last_link['state'] == 'CONNECTED' connected_via = find_mesh_node_by_uid(mesh_topo, uid=last_link['node_1_uid'])['device_name'] return { 'current_channel': interface['current_channel'], 'channel_utilization': interface['channel_utilization'], 'cur_availability_rx': last_link['cur_availability_rx'], 'cur_availability_tx': last_link['cur_availability_tx'], 'cur_data_rate_rx': last_link['cur_data_rate_rx'], 'cur_data_rate_tx': last_link['cur_data_rate_tx'], 'rx_rsni': last_link['rx_rsni'], 'tx_rsni': last_link['tx_rsni'], 'rx_rcpi': last_link['rx_rcpi'], 'tx_rcpi': last_link['tx_rcpi'], 'connected': connected, 'connected_via': connected_via, } PW = "1yBhlx8dp3aB" #fritzconnection = FritzConnection(address="192.168.178.1", user="martin", password=PW) fritz_hosts = FritzHosts(address="192.168.178.1", user="martin", password=PW) # %% mesh_topo = fritz_hosts.get_mesh_topology() #print(res) print(mesh_connection(mesh_topo, 'iPad-von-Martin')) print(mesh_connection(mesh_topo, 'tasmota07')) # %%