#!/usr/bin/env python3
"""Track known BLE devices by resolving their private addresses and publish
distance estimates to MQTT.

Every advertisement seen by the scanner is checked against the configured
identity resolving keys (IRKs). When the address resolves to a known device
and the advertisement carries both RSSI and TX power, a distance estimate is
published as JSON on ``my_btmonitor/devices/<device>/<room>``.

NOTE(review): this file appears to be a Jinja2-style template -- the values
wrapped in double curly braces are presumably substituted at deploy time.
Avoid writing literal double braces in comments, since a template engine
would try to render them.
"""
import asyncio
import collections
import json
from functools import partial
from typing import Dict, Optional

import asyncio_mqtt
import numpy as np
from bleak import BleakScanner
from bleak.assigned_numbers import AdvertisementDataType
from bleak.backends.bluezdbus.advertisement_monitor import OrPattern
from bleak.backends.bluezdbus.scanner import BlueZScannerArgs
from Cryptodome.Cipher import AES

# ------------------- Config ----------------------------------------------------------------
config = {
    "mqtt": {
        "hostname": "homeassistant.fritz.box",
        "username": "{{my_btmonitor_mqtt_username}}",
        "password": "{{my_btmonitor_mqtt_password}}",
        "room": "{{sensor_room_name_ascii}}"
    },
    # Maps an IRK (32 hex characters) to the device name used in MQTT topics.
    "irk_to_devicename": {
        "aa67542b82c0e05d65c27fb7e313aba5": "martins_apple_watch",
        "840e3892644c1ebd1594a9069c14ce0d": "martins_iphone",
    }
}
# --------------------------------------------------------------------------------------------

# Number of recent distance samples averaged per device.
WINDOW_SIZE = 6

# Seconds to wait before reconnecting after a failure, so that an unreachable
# broker does not turn the retry loop into a busy loop.
RECONNECT_DELAY_S = 5

# Active scanning is the only mode known to work; see the note in main().
ACTIVE_SCAN = True


def resolve_rpa(rpa: bytes, irk: bytes) -> bool:
    """Return True if the resolvable private address *rpa* was generated from *irk*.

    Args:
        rpa: The 6 address bytes, least significant byte first (the layout
            produced by ``addr_to_bytes``).
        irk: The 16-byte identity resolving key.

    Raises:
        ValueError: If *rpa* is not 6 bytes or *irk* is not 16 bytes long.
    """
    # Explicit checks instead of assert: asserts vanish under ``python -O``.
    if len(rpa) != 6:
        raise ValueError(f"rpa must be 6 bytes, got {len(rpa)}")
    if len(irk) != 16:
        raise ValueError(f"irk must be 16 bytes, got {len(irk)}")

    # The upper three address bytes (rpa[5], rpa[4], rpa[3]) are placed at the
    # end of an otherwise zero 16-byte block and encrypted with the IRK.
    plain_text = bytes(13) + bytes((rpa[5], rpa[4], rpa[3]))
    cipher_text = AES.new(irk, AES.MODE_ECB).encrypt(plain_text)

    # The address matches when the last three cipher bytes equal the lower
    # three address bytes (rpa[2], rpa[1], rpa[0]).
    return cipher_text[13:16] == bytes((rpa[2], rpa[1], rpa[0]))


def addr_to_bytes(addr: str) -> bytes:
    """Convert a colon-separated Bluetooth address string to bytes.

    The byte order is reversed, so the last pair of the string becomes
    element 0 of the result.
    """
    return bytes.fromhex(addr.replace(":", ""))[::-1]


def decode_address(addr: str, irks: Dict[str, str]) -> Optional[str]:
    """Return the name of the configured device that owns *addr*, or None.

    Args:
        addr: Bluetooth address as a string, e.g. ``4d:24:12:12:34:10``.
        irks: Mapping of IRK (hex string) to device name.
    """
    # The address does not change between keys, so parse it only once.
    rpa = addr_to_bytes(addr)
    for irk, name in irks.items():
        if resolve_rpa(rpa, bytes.fromhex(irk)):
            return name
    return None


def estimate_distance(rssi, tx_power, pl0=73, n=3.5):
    """Estimate the distance in meters using the log-distance path loss model.

    The model is::

        RSSI = tx_power - pl0 - 10 * n * log10(distance / distance0)

    with ``distance0`` = 1 m, which solves to::

        distance = 10 ** ((tx_power - rssi - pl0) / (10 * n))

    Args:
        rssi: Received signal strength in dBm.
        tx_power: Transmit power of the sender in dBm.
        pl0: Path loss at the reference distance, obtained by calibration as
            ``tx_power - rssi`` measured at ``distance0``.
        n: Path loss exponent. It is 2 in free space; the default of 3.5 is
            the value this script has always used.

    Returns:
        The estimated distance in meters.

    See https://en.wikipedia.org/wiki/Log-distance_path_loss_model
    """
    return 10 ** ((tx_power - rssi - pl0) / (10 * n))


def smooth(y, box_pts):
    """Return the moving average of *y* over a box of *box_pts* points.

    Uses ``np.convolve`` with ``mode='same'``, so values near both ends are
    attenuated because the box extends past the data there.
    """
    box = np.ones(box_pts) / box_pts
    return np.convolve(y, box, mode='same')


# One sliding window per device, so readings of different devices never mix.
_distance_windows = collections.defaultdict(
    lambda: collections.deque(maxlen=WINDOW_SIZE)
)


def filter_distance(dist: float, device_id: Optional[str] = None) -> float:
    """Return the mean of the last ``WINDOW_SIZE`` distances of one device.

    Args:
        dist: The newest distance estimate.
        device_id: Key selecting the window to update. Callers that omit it
            share a single window, as before.

    Returns:
        The arithmetic mean of the samples currently in the window.
    """
    # A plain mean replaces the previous last element of a 12-point 'same'
    # convolution over at most 6 samples: that value was a partial sum divided
    # by 12, so it could never exceed half of the window's mean.
    window = _distance_windows[device_id]
    window.append(dist)
    return sum(window) / len(window)


async def on_device_found_callback(irks, mqtt_client, room, device, advertising_data):
    """Publish a distance estimate for an advertisement from a known device.

    ``irks``, ``mqtt_client`` and ``room`` are bound with ``partial`` in
    ``main``; ``device`` and ``advertising_data`` are supplied by the scanner
    for each advertisement. Advertisements without RSSI or TX power, and
    addresses that match no configured IRK, are ignored.
    """
    rssi = advertising_data.rssi
    tx_power = advertising_data.tx_power
    # Check the cheap conditions first; resolving costs one AES call per IRK.
    if tx_power is None or rssi is None:
        return
    decoded_device_id = decode_address(device.address, irks)
    if not decoded_device_id:
        return

    topic = f"my_btmonitor/devices/{decoded_device_id}/{room}"
    distance = estimate_distance(rssi, tx_power, {{my_btmonitor_pl0 | default('73')}} )
    filtered_distance = filter_distance(distance, decoded_device_id)
    data = {
        "id": decoded_device_id,
        "name": decoded_device_id,
        "rssi": rssi,
        "tx_power": tx_power,
        "distance": filtered_distance,
        "unfiltered_distance": distance,
    }
    await mqtt_client.publish(topic, json.dumps(data).encode())


async def main():
    """Scan for BLE advertisements forever, reconnecting after any failure."""
    # Never set: waiting on it simply keeps the scanner running.
    stop_event = asyncio.Event()
    mqtt_conf = config['mqtt']
    while True:
        try:
            async with asyncio_mqtt.Client(hostname=mqtt_conf["hostname"],
                                           username=mqtt_conf["username"],
                                           password=mqtt_conf['password']) as mqtt_client:
                cb = partial(on_device_found_callback,
                             config['irk_to_devicename'],
                             mqtt_client,
                             mqtt_conf['room'])
                if ACTIVE_SCAN:
                    async with BleakScanner(cb):
                        await stop_event.wait()
                else:
                    # Doesn't work, because of the strange or_patterns
                    args = BlueZScannerArgs(
                        or_patterns=[
                            OrPattern(0, AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA, b"\x00\x4c")
                        ]
                    )
                    async with BleakScanner(cb, bluez=args, scanning_mode="passive"):
                        await stop_event.wait()
        except Exception as e:
            print("Error", e)
            print("Starting again")
            # Back off before retrying instead of spinning at full speed.
            await asyncio.sleep(RECONNECT_DELAY_S)


if __name__ == "__main__":
    asyncio.run(main())