150 lines
5.0 KiB
Python
150 lines
5.0 KiB
Python
#!/usr/bin/env python3
|
|
import asyncio
|
|
from bleak import BleakScanner
|
|
from bleak.assigned_numbers import AdvertisementDataType
|
|
from bleak.backends.bluezdbus.advertisement_monitor import OrPattern
|
|
from bleak.backends.bluezdbus.scanner import BlueZScannerArgs
|
|
from Cryptodome.Cipher import AES
|
|
from functools import partial
|
|
import asyncio_mqtt
|
|
import json
|
|
from typing import Dict
|
|
import collections
|
|
import numpy as np
|
|
|
|
# ------------------- Config ----------------------------------------------------------------
|
|
|
|
config = {
|
|
"mqtt": {
|
|
"hostname": "homeassistant.fritz.box",
|
|
"username": "{{my_btmonitor_mqtt_username}}",
|
|
"password": "{{my_btmonitor_mqtt_password}}",
|
|
"room": "{{sensor_room_name_ascii}}"
|
|
},
|
|
"irk_to_devicename": {
|
|
"aa67542b82c0e05d65c27fb7e313aba5": "martins_apple_watch",
|
|
"840e3892644c1ebd1594a9069c14ce0d" : "martins_iphone",
|
|
}
|
|
}
|
|
|
|
# --------------------------------------------------------------------------------------------
|
|
|
|
def resolve_rpa(rpa: bytes, irk: bytes) -> bool:
|
|
"""Compares the random address rpa to an irk (secret key) and return True if it matches"""
|
|
assert len(rpa) == 6
|
|
assert len(irk) == 16
|
|
|
|
key = irk
|
|
plain_text = b'\x00' * 16
|
|
plain_text = bytearray(plain_text)
|
|
plain_text[15] = rpa[3]
|
|
plain_text[14] = rpa[4]
|
|
plain_text[13] = rpa[5]
|
|
plain_text = bytes(plain_text)
|
|
|
|
cipher = AES.new(key, AES.MODE_ECB)
|
|
cipher_text = cipher.encrypt(plain_text)
|
|
return cipher_text[15] == rpa[0] and cipher_text[14] == rpa[1] and cipher_text[13] == rpa[2]
|
|
|
|
|
|
def addr_to_bytes(addr:str) -> bytes:
|
|
"""Converts a bluetooth mac address string with semicolons to bytes"""
|
|
str_without_colons = addr.replace(":", "")
|
|
bytearr = bytearray.fromhex(str_without_colons)
|
|
bytearr.reverse()
|
|
return bytes(bytearr)
|
|
|
|
|
|
def decode_address(addr: str, irks: Dict[str, str]):
|
|
"""
|
|
addr is a bluetooth address as a string e.g. 4d:24:12:12:34:10
|
|
irks is dict with irk as a hex string, mapping to device name
|
|
"""
|
|
for irk, name in irks.items():
|
|
if resolve_rpa(addr_to_bytes(addr), bytes.fromhex(irk)):
|
|
return name
|
|
return None
|
|
|
|
|
|
def estimate_distance(rssi, tx_power, pl0=73):
|
|
"""
|
|
RSSI in dBm
|
|
txPower is a transmitter parameter that calculated according to its physic layer and antenna in dBm
|
|
Return value in meter
|
|
|
|
You should calculate "PL0" in calibration stage:
|
|
PL0 = txPower - RSSI; // When distance is distance0 (distance0 = 1m or more)
|
|
|
|
SO, RSSI will be calculated by below formula:
|
|
RSSI = txPower - PL0 - 10 * n * log(distance/distance0) - G(t)
|
|
G(t) ~= 0 //This parameter is the main challenge in achiving to more accuracy.
|
|
n = 2 (Path Loss Exponent, in the free space is 2)
|
|
distance0 = 1 (m)
|
|
distance = 10 ^ ((txPower - RSSI - PL0 ) / (10 * n))
|
|
|
|
Read more details:
|
|
https://en.wikipedia.org/wiki/Log-distance_path_loss_model
|
|
"""
|
|
n = 3.5
|
|
return 10**((tx_power - rssi - pl0) / (10 * n))
|
|
|
|
|
|
def smooth(y, box_pts):
|
|
box = np.ones(box_pts)/box_pts
|
|
y_smooth = np.convolve(y, box, mode='same')
|
|
return y_smooth
|
|
|
|
WINDOW_SIZE = 6
|
|
last_values = collections.deque(maxlen=WINDOW_SIZE)
|
|
def filter_distance(dist: float):
|
|
global last_values
|
|
last_values.append(dist)
|
|
return smooth(np.array(last_values), 12)[-1]
|
|
|
|
|
|
async def on_device_found_callback(irks, mqtt_client, room, device, advertising_data):
|
|
decoded_device_id = decode_address(device.address, irks)
|
|
rssi = advertising_data.rssi
|
|
tx_power = advertising_data.tx_power
|
|
if decoded_device_id and tx_power is not None and rssi is not None:
|
|
topic = f"my_btmonitor/devices/{decoded_device_id}/{room}"
|
|
distance = estimate_distance(rssi, tx_power, {{my_btmonitor_pl0 | default('73')}} )
|
|
filtered_distance = filter_distance(distance)
|
|
data = {"id": decoded_device_id,
|
|
"name": decoded_device_id,
|
|
"rssi": rssi,
|
|
"tx_power": tx_power,
|
|
"distance": filtered_distance,
|
|
"unfiltered_distance": distance,
|
|
}
|
|
await mqtt_client.publish(topic, json.dumps(data).encode())
|
|
#print(data)
|
|
|
|
|
|
async def main():
|
|
stop_event = asyncio.Event()
|
|
|
|
mqtt_conf = config['mqtt']
|
|
while True:
|
|
try:
|
|
async with asyncio_mqtt.Client(hostname=mqtt_conf["hostname"],
|
|
username=mqtt_conf["username"],
|
|
password=mqtt_conf['password']) as mqtt_client:
|
|
cb = partial(on_device_found_callback, config['irk_to_devicename'], mqtt_client, mqtt_conf['room'])
|
|
active_scan = True
|
|
if active_scan:
|
|
async with BleakScanner(cb) as scanner:
|
|
await stop_event.wait()
|
|
else:
|
|
# Doesn't work, because of the strange or_patters
|
|
args = BlueZScannerArgs(
|
|
or_patterns=[OrPattern(0, AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA, b"\x00\x4c")]
|
|
)
|
|
async with BleakScanner(cb, bluez=args, scanning_mode="passive") as scanner:
|
|
await stop_event.wait()
|
|
except Exception as e:
|
|
print("Error", e)
|
|
print("Starting again")
|
|
|
|
asyncio.run(main())
|