93 lines
2.9 KiB
Python
93 lines
2.9 KiB
Python
import asyncio
|
|
from bleak import BleakScanner
|
|
from bleak.assigned_numbers import AdvertisementDataType
|
|
from bleak.backends.bluezdbus.advertisement_monitor import OrPattern
|
|
from bleak.backends.bluezdbus.scanner import BlueZScannerArgs
|
|
from Crypto.Cipher import AES
|
|
from typing import Dict
|
|
|
|
# Known Identity Resolving Keys, written as 32 hex digits (16 bytes), each
# mapped to a human-readable name for the device the key belongs to.
# Entries are tried in insertion order when an address is decoded.
irks: Dict[str, str] = {
    "aa67542b82c0e05d65c27fb7e313aba5": "martins_apple_watch",
    "840e3892644c1ebd1594a9069c14ce0d": "martins_iphone",
}
|
|
|
|
def resolve_rpa(rpa: bytes, irk: bytes) -> bool:
    """Return True if the random address *rpa* was generated from *irk*.

    Args:
        rpa: The 6-byte Bluetooth address, least significant byte first
            (i.e. the reverse of the order used in the "aa:bb:.." string
            form), so ``rpa[0:3]`` is the hash and ``rpa[3:6]`` is prand.
        irk: The 16-byte Identity Resolving Key (secret key) to test.

    Returns:
        True when encrypting prand with the IRK reproduces the hash part
        of the address, False otherwise.

    Raises:
        ValueError: If *rpa* is not 6 bytes or *irk* is not 16 bytes long.
    """
    # Explicit checks instead of ``assert`` so validation survives ``python -O``.
    if len(rpa) != 6:
        raise ValueError(f"rpa must be 6 bytes long, got {len(rpa)}")
    if len(irk) != 16:
        raise ValueError(f"irk must be 16 bytes long, got {len(irk)}")

    # A resolvable private address has 0b01 in its two most significant bits.
    # Anything else (public, static random, non-resolvable) can never be
    # resolved, so skip the AES work and the 1-in-2^24 chance of a false match.
    if (rpa[5] & 0xC0) != 0x40:
        return False

    # Reverse each 3-byte half so it is most significant byte first.
    prand = bytes(rpa[:2:-1])          # rpa[5], rpa[4], rpa[3]
    expected_hash = bytes(rpa[2::-1])  # rpa[2], rpa[1], rpa[0]

    # The plaintext block is prand left-padded with zeros to 16 bytes; the
    # last three bytes of the ciphertext must equal the address hash.
    plain_text = bytes(13) + prand
    cipher_text = AES.new(irk, AES.MODE_ECB).encrypt(plain_text)
    return cipher_text[13:] == expected_hash
|
|
|
|
|
|
def addr_to_bytes(addr: str) -> bytes:
    """Convert a colon-separated Bluetooth address string to reversed bytes.

    The colons are removed, the remaining hex digits are decoded, and the
    byte order is reversed, so the last octet of the string becomes the
    first byte of the result.
    """
    hex_digits = "".join(addr.split(":"))
    return bytes.fromhex(hex_digits)[::-1]
|
|
|
|
|
|
def decode_address(addr: str, irks: Dict[str, str]):
    """Return the name of the known device that owns *addr*, or None.

    addr is a bluetooth address as a string, e.g. 4d:24:12:12:34:10.
    irks is a dict whose keys are IRKs as hex strings and whose values are
    device names. The keys are tried in iteration order and the name for the
    first IRK that resolves the address is returned.
    """
    matching_names = (
        name
        for irk_hex, name in irks.items()
        if resolve_rpa(addr_to_bytes(addr), bytes.fromhex(irk_hex))
    )
    return next(matching_names, None)
|
|
|
|
|
|
def estimate_distance(rssi, tx_power, pl0=54, n=2.7):
    """Estimate the distance to a transmitter with the log-distance path loss model.

    The model relates the received signal strength to distance as

        RSSI = tx_power - PL0 - 10 * n * log10(distance / distance0) - G(t)

    With the fading term G(t) taken as 0 and the reference distance
    distance0 = 1 m, solving for distance gives

        distance = 10 ** ((tx_power - RSSI - PL0) / (10 * n))

    Args:
        rssi: Received signal strength in dBm.
        tx_power: Transmit power of the sender in dBm.
        pl0: Path loss in dB at the 1 m reference distance. Calibrate it as
            ``tx_power - rssi`` measured with the device 1 m away.
        n: Path loss exponent. It is 2 in free space; larger values model
            obstructed environments. Defaults to 2.7.

    Returns:
        The estimated distance in meters.

    Read more details:
    https://en.wikipedia.org/wiki/Log-distance_path_loss_model
    """
    return 10 ** ((tx_power - rssi - pl0) / (10 * n))
|
|
|
|
|
|
def callback(device, advertising_data):
    """Handle one received advertisement.

    Prints the advertiser's address, then, if the address resolves to one of
    the known IRKs, prints the device name, the RSSI and a distance estimate.

    Args:
        device: The advertising device; only its ``address`` is read.
        advertising_data: The advertisement; ``rssi`` and ``tx_power`` are read.
    """
    print(device.address)
    decoded = decode_address(device.address, irks)
    if not decoded:
        return

    rssi = advertising_data.rssi
    tx_power = advertising_data.tx_power
    if tx_power is None:
        # The advertisement carried no TX power level, so a distance cannot be
        # computed; report that instead of raising TypeError in the callback.
        print(f"{decoded} @ {rssi} distance unknown (no TX power advertised)")
        return

    print(f"{decoded} @ {rssi} distance {estimate_distance(rssi, tx_power)}")
|
|
|
|
|
|
async def main():
    """Run a BLE scan that reports every advertisement to ``callback``."""
    # Filter on manufacturer-specific data starting with bytes 0x10 0x05.
    # It is only referenced by the disabled passive-scanning variant below;
    # the scanner that is actually started does not use it.
    passive_args = BlueZScannerArgs(
        or_patterns=[
            OrPattern(
                0,
                AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA,
                b"\x10\x05",
            ),
        ]
    )
    never_set = asyncio.Event()

    # Disabled passive-scanning variant:
    #   BleakScanner(bluez=passive_args, scanning_mode="passive")
    async with BleakScanner(callback):
        # Nothing in this function sets the event, so this keeps the scanner
        # open until the task is cancelled or the process is interrupted.
        await never_set.wait()
|
|
|
|
if __name__ == "__main__":
    # Start scanning only when executed as a script, so that importing this
    # module (e.g. to reuse resolve_rpa) does not block on an endless scan.
    asyncio.run(main())
|