ansible/roles/bluetooth-monitor/templates/my_btmonitor.py

154 lines
5.1 KiB
Python

#!/usr/bin/env python3
import asyncio
from bleak import BleakScanner
from bleak.assigned_numbers import AdvertisementDataType
from bleak.backends.bluezdbus.advertisement_monitor import OrPattern
from bleak.backends.bluezdbus.scanner import BlueZScannerArgs
from Cryptodome.Cipher import AES
from functools import partial
import asyncio_mqtt
import json
from typing import Dict
import collections
import numpy as np
# ------------------- Config ----------------------------------------------------------------
config = {
"mqtt": {
"hostname": "homeassistant.fritz.box",
"username": "{{my_btmonitor_mqtt_username}}",
"password": "{{my_btmonitor_mqtt_password}}",
"room": "{{sensor_room_name_ascii}}"
},
"irk_to_devicename": {
"aa67542b82c0e05d65c27fb7e313aba5": "martins_apple_watch",
"840e3892644c1ebd1594a9069c14ce0d" : "martins_iphone",
}
}
# --------------------------------------------------------------------------------------------
def resolve_rpa(rpa: bytes, irk: bytes) -> bool:
"""Compares the random address rpa to an irk (secret key) and return True if it matches"""
assert len(rpa) == 6
assert len(irk) == 16
key = irk
plain_text = b'\x00' * 16
plain_text = bytearray(plain_text)
plain_text[15] = rpa[3]
plain_text[14] = rpa[4]
plain_text[13] = rpa[5]
plain_text = bytes(plain_text)
cipher = AES.new(key, AES.MODE_ECB)
cipher_text = cipher.encrypt(plain_text)
return cipher_text[15] == rpa[0] and cipher_text[14] == rpa[1] and cipher_text[13] == rpa[2]
def addr_to_bytes(addr:str) -> bytes:
"""Converts a bluetooth mac address string with semicolons to bytes"""
str_without_colons = addr.replace(":", "")
bytearr = bytearray.fromhex(str_without_colons)
bytearr.reverse()
return bytes(bytearr)
def decode_address(addr: str, irks: Dict[str, str]):
"""
addr is a bluetooth address as a string e.g. 4d:24:12:12:34:10
irks is dict with irk as a hex string, mapping to device name
"""
for irk, name in irks.items():
if resolve_rpa(addr_to_bytes(addr), bytes.fromhex(irk)):
return name
return None
def estimate_distance(rssi, tx_power, pl0=73):
"""
RSSI in dBm
txPower is a transmitter parameter that calculated according to its physic layer and antenna in dBm
Return value in meter
You should calculate "PL0" in calibration stage:
PL0 = txPower - RSSI; // When distance is distance0 (distance0 = 1m or more)
SO, RSSI will be calculated by below formula:
RSSI = txPower - PL0 - 10 * n * log(distance/distance0) - G(t)
G(t) ~= 0 //This parameter is the main challenge in achiving to more accuracy.
n = 2 (Path Loss Exponent, in the free space is 2)
distance0 = 1 (m)
distance = 10 ^ ((txPower - RSSI - PL0 ) / (10 * n))
Read more details:
https://en.wikipedia.org/wiki/Log-distance_path_loss_model
"""
n = 3.5
return 10**((tx_power - rssi - pl0) / (10 * n))
def smooth(y, box_pts):
box = np.ones(box_pts)/box_pts
y_smooth = np.convolve(y, box, mode='same')
return y_smooth
WINDOW_SIZE = 6
last_values = collections.deque(maxlen=WINDOW_SIZE)
def filter_distance(dist: float):
global last_values
last_values.append(dist)
return smooth(np.array(last_values), 12)[-1]
async def on_device_found_callback(irks, mqtt_client, room, device, advertising_data):
decoded_device_id = decode_address(device.address, irks)
rssi = advertising_data.rssi
tx_power = advertising_data.tx_power
if decoded_device_id and tx_power is not None and rssi is not None:
topic = f"my_btmonitor/devices/{decoded_device_id}/{room}"
distance = estimate_distance(rssi, tx_power, {{my_btmonitor_pl0 | default('73')}} )
filtered_distance = filter_distance(distance)
data = {"id": decoded_device_id,
"name": decoded_device_id,
"rssi": rssi,
"tx_power": tx_power,
"distance": filtered_distance,
"unfiltered_distance": distance,
}
try:
await mqtt_client.publish(topic, json.dumps(data).encode())
except Exception:
print("Probably mqtt isn't running - exit whole script and let systemd restart it")
exit(1)
#print(data)
async def main():
stop_event = asyncio.Event()
mqtt_conf = config['mqtt']
while True:
try:
async with asyncio_mqtt.Client(hostname=mqtt_conf["hostname"],
username=mqtt_conf["username"],
password=mqtt_conf['password']) as mqtt_client:
cb = partial(on_device_found_callback, config['irk_to_devicename'], mqtt_client, mqtt_conf['room'])
active_scan = True
if active_scan:
async with BleakScanner(cb) as scanner:
await stop_event.wait()
else:
# Doesn't work, because of the strange or_patters
args = BlueZScannerArgs(
or_patterns=[OrPattern(0, AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA, b"\x00\x4c")]
)
async with BleakScanner(cb, bluez=args, scanning_mode="passive") as scanner:
await stop_event.wait()
except Exception as e:
print("Error", e)
print("Starting again")
asyncio.run(main())