#!/usr/bin/env python3
"""Bluetooth LE room presence detection.

Raw BLE measurements (address, rssi, tx_power) are received per tracker via MQTT,
decoded to device names, logged as training data while a "known room" is set, and
classified into rooms with one SVM per device. Detected rooms are published back to MQTT.
"""
import asyncio
import csv
import json
import logging
import os
from collections import defaultdict, namedtuple
from copy import copy
from pathlib import Path
from time import time
from typing import Callable, Dict, List, Optional

import aiomqtt
import numpy as np
import pandas as pd
from Crypto.Cipher import AES
from sklearn import svm
from sklearn.model_selection import cross_val_score

logging.basicConfig(level=logging.INFO)

BtleMeasurement = namedtuple("BtleMeasurement", ["time", "tracker", "address", "rssi", "tx_power"])
BtleDeviceMeasurement = namedtuple("BtleDeviceMeasurement", ["time", "device", "tracker", "rssi", "tx_power"])
MqttInfo = namedtuple("MqttInfo", ["server", "username", "password"])


# ------------------------------------------------------- DECODING -------------------------------------------------------------------------


class DeviceDecoder:
    """Decode bluetooth addresses to device names.

    Two kinds of devices are supported:
    * devices with a fixed address, looked up in ``address_to_name``
    * devices using resolvable private addresses (e.g. Apple devices), whose address changes
      over time and is matched against identity resolving keys (IRKs)
    """

    def __init__(self, irk_to_devicename: Dict[str, str], address_to_name: Dict[str, str]):
        """
        irk_to_devicename: dict from IRK as a hex string (16 bytes) to device name
        address_to_name: dict from bt address as string separated by ":" to a device name
                         (matched case-insensitively)

        Raises ValueError if an IRK is not valid hex or not 16 bytes long.
        """
        self.irk_to_devicename = {}
        for irk_hex, name in irk_to_devicename.items():
            irk = bytes.fromhex(irk_hex)
            if len(irk) != 16:
                raise ValueError(f"IRK for {name} must be 16 bytes, got {len(irk)}")
            self.irk_to_devicename[irk] = name
        # Hex addresses are case-insensitive; normalize so "4D:.." and "4d:.." both match
        self.address_to_name = {k.lower(): v for k, v in address_to_name.items()}

    @staticmethod
    def _resolve_rpa(rpa: bytes, irk: bytes) -> bool:
        """Return True if the random address ``rpa`` was generated from the secret key ``irk``.

        rpa: 6 address bytes, least significant byte first (as produced by _addr_to_bytes)
        irk: 16 byte identity resolving key
        Raises ValueError on wrong input lengths.
        """
        if len(rpa) != 6:
            raise ValueError(f"rpa must be 6 bytes, got {len(rpa)}")
        if len(irk) != 16:
            raise ValueError(f"irk must be 16 bytes, got {len(irk)}")
        # Zero padded plain text with prand (upper 3 address bytes) in the last 3 bytes. The AES
        # block is most-significant-byte first while rpa is least-significant-byte first,
        # hence the reversed byte order.
        plain_text = bytes(13) + bytes((rpa[5], rpa[4], rpa[3]))
        cipher_text = AES.new(irk, AES.MODE_ECB).encrypt(plain_text)
        # The last 3 cipher bytes must equal the hash (lower 3 address bytes)
        return cipher_text[13:16] == bytes((rpa[2], rpa[1], rpa[0]))

    @staticmethod
    def _addr_to_bytes(addr: str) -> bytes:
        """Convert a colon separated bluetooth address string to bytes, least significant byte first.

        Raises ValueError if ``addr`` is not valid hex.
        """
        return bytes(reversed(bytes.fromhex(addr.replace(":", ""))))

    def decode(self, addr: str) -> Optional[str]:
        """Return the device name for ``addr`` (e.g. "4d:24:12:12:34:10"), or None if unknown."""
        try:
            addr_bytes = DeviceDecoder._addr_to_bytes(addr)
        except ValueError:
            logging.debug(f"Cannot parse bluetooth address {addr!r}")
            addr_bytes = None
        # Only resolvable private addresses (two most significant bits == 0b01) can match an IRK
        if addr_bytes is not None and len(addr_bytes) == 6 and addr_bytes[5] >> 6 == 0b01:
            for irk, name in self.irk_to_devicename.items():
                if DeviceDecoder._resolve_rpa(addr_bytes, irk):
                    return name
        return self.address_to_name.get(addr.lower(), None)

    def __call__(self, m: BtleMeasurement) -> Optional[BtleDeviceMeasurement]:
        """Translate a raw measurement into a device measurement; None if the device is unknown."""
        decoded_device_name = self.decode(m.address)
        if not decoded_device_name:
            return None
        return BtleDeviceMeasurement(m.time, decoded_device_name, m.tracker, m.rssi, m.tx_power)


# ------------------------------------------------------- MACHINE LEARNING ----------------------------------------------------------------


class KnownRoomCsvLogger:
    """Logs measurements together with the currently known room, to be used later as training data."""

    HEADER = ("#time", "device", "tracker", "rssi", "tx_power", "known_room")
    # Room values (compared case-insensitively) meaning "no room known" -> not logged
    IGNORE_ROOMS = ("keins", "?", "none", "unknown")

    def __init__(self, csv_file: Path):
        """Open ``csv_file`` for appending, writing the header if the file is new or empty."""
        self.known_room = None
        needs_header = not csv_file.exists() or csv_file.stat().st_size == 0
        # Line buffered: every row is flushed so collected training data survives abrupt termination
        self.csv_file_handle = open(csv_file, "a", buffering=1, newline="", encoding="utf-8")
        # csv.writer quotes names containing commas/quotes so rows stay parseable
        self._writer = csv.writer(self.csv_file_handle, lineterminator="\n")
        if needs_header:
            self._writer.writerow(self.HEADER)

    def update_known_room(self, known_room: str):
        if known_room != self.known_room:
            logging.info(f"Updating known_room {self.known_room} -> {known_room}")
            self.known_room = known_room

    def report_measure(self, m: BtleDeviceMeasurement):
        """Append ``m`` with the current known room, unless no room is currently known."""
        if self.known_room is None or self.known_room.lower() in self.IGNORE_ROOMS:
            return
        logging.info(f"Appending to training set: {m}")
        self._writer.writerow((m.time, m.device, m.tracker, m.rssi, m.tx_power, self.known_room))

    def close(self):
        """Close the underlying csv file."""
        self.csv_file_handle.close()


FAR_AWAY_FEATURE_VALUE = 1
# Seconds to wait after a room change before using measurements, so that all trackers have reported
TRAINING_SETTLE_TIME_S = 20


def get_feature_value(rssi, tx_power):
    """Transforms rssi and tx power into a value between 0 and 1, where 0 is close and 1 is far away"""
    MIN_RSSI = -90
    MAX_TRANSFORMED_RSSI = 40
    v = max(tx_power - rssi - MAX_TRANSFORMED_RSSI, 0)
    # Clamp so very weak signals don't exceed FAR_AWAY_FEATURE_VALUE
    return min(v / (-MIN_RSSI), 1.0)


def training_data_from_df(df: pd.DataFrame, device: str, settle_time: float = TRAINING_SETTLE_TIME_S):
    """Returns a feature matrix (num_measurement, num_trackers) and a label vector (both numeric)
    for ``device`` to be used in scikit learn.

    Labels are indices into ``df["known_room"].cat.categories``; feature columns follow
    ``df["tracker"].cat.categories``. Only samples taken more than ``settle_time`` seconds after
    a room change are emitted, so that every tracker had a chance to report.
    """
    tracker_to_idx = {name: idx for idx, name in enumerate(df["tracker"].cat.categories)}
    room_to_idx = {name: idx for idx, name in enumerate(df["known_room"].cat.categories)}

    current_feature = [FAR_AWAY_FEATURE_VALUE] * len(tracker_to_idx)
    features = []
    labels = []
    last_known_room = None
    start_time = None

    device_rows = df.loc[df["device"] == device, ["time", "tracker", "rssi", "tx_power", "known_room"]]
    for timestamp, tracker, rssi, tx_power, known_room in device_rows.itertuples(index=False, name=None):
        if last_known_room != known_room:
            start_time = timestamp
            last_known_room = known_room
        current_feature[tracker_to_idx[tracker]] = get_feature_value(rssi, tx_power)
        if timestamp - start_time > settle_time:
            features.append(copy(current_feature))
            labels.append(room_to_idx[known_room])
    return np.array(features), np.array(labels)


def load_measurements_from_csv(csv_file: Path) -> pd.DataFrame:
    """Load csv with training data into dataframe; tracker, known_room and device become categoricals."""

    def cleanup_column_name(col_name: str):
        return col_name.replace("#", "").strip()

    df = pd.read_csv(str(csv_file))
    # String cleanup in column names and cell values (DataFrame.map returns a new frame)
    df = df.rename(columns=cleanup_column_name)
    df = df.map(lambda x: x.strip() if isinstance(x, str) else x)
    for column in ("tracker", "known_room", "device"):
        df[column] = df[column].astype("category")
    return df


async def send_discovery_messages(mqtt_client, device_names):
    """Publish retained Home Assistant MQTT discovery configs, one sensor per device."""
    for device_name in device_names:
        topic = f"homeassistant/sensor/my_btmonitor/{device_name}/config"
        msg = {
            "name": device_name,
            "state_topic": f"my_btmonitor/ml/{device_name}",
            "expire_after": 30,
            "unique_id": device_name,
        }
        await mqtt_client.publish(topic, json.dumps(msg).encode(), retain=True)


def _parse_raw_measurement(topic_value: str, payload, current_time: float) -> Optional[BtleMeasurement]:
    """Parse a "my_btmonitor/raw_measurements/<tracker>" message into a BtleMeasurement.

    Returns None if the topic is not a raw measurement topic or the payload is malformed.
    """
    parts = topic_value.split("/")
    if len(parts) < 3 or parts[0] != "my_btmonitor" or parts[1] != "raw_measurements":
        return None
    try:
        msg_json = json.loads(payload)
        return BtleMeasurement(
            time=current_time,
            tracker=parts[2],
            address=msg_json["address"],
            rssi=msg_json["rssi"],
            tx_power=msg_json.get("tx_power", 0),
        )
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        # One bad message must not stop the whole monitor
        logging.warning(f"Ignoring malformed measurement on {topic_value}: {e}")
        return None


async def async_main(
    mqtt_info: MqttInfo,
    trackers: List[str],
    devices: List[str],
    classifier: Optional[Callable[[str, List[float]], str]],
    device_decoder: DeviceDecoder,
    training_data_logger: KnownRoomCsvLogger,
):
    """Consume MQTT measurements, log training data and publish room changes.

    trackers: tracker names in feature-vector order (as used for training)
    devices: device names announced to Home Assistant via discovery messages
    classifier: callable (device, feature_vec) -> room name, or None to only collect training data
    """
    feature_vecs_per_device = defaultdict(lambda: [FAR_AWAY_FEATURE_VALUE] * len(trackers))
    current_rooms = defaultdict(lambda: "unknown")
    tracker_name_to_idx = {name: i for i, name in enumerate(trackers)}

    async with aiomqtt.Client(
        hostname=mqtt_info.server, username=mqtt_info.username, password=mqtt_info.password
    ) as client:
        await send_discovery_messages(client, devices)
        await client.subscribe("my_btmonitor/#")
        async for message in client.messages:
            current_time = time()
            topic_value = message.topic.value
            if topic_value == "my_btmonitor/known_room":
                training_data_logger.update_known_room(message.payload.decode())
                continue

            measurement = _parse_raw_measurement(topic_value, message.payload, current_time)
            if measurement is None:
                continue
            logging.debug(f"Got Measurement {measurement}")

            m = device_decoder(measurement)
            if m is None:
                continue
            logging.debug(f"Decoded Measurement {m}")
            training_data_logger.report_measure(m)

            tracker_idx = tracker_name_to_idx.get(m.tracker)
            if tracker_idx is None:
                # Tracker unknown to the trained model -> it has no slot in the feature vector
                logging.debug(f"Tracker {m.tracker} not in training data, not used for classification")
                continue
            feature_vec = feature_vecs_per_device[m.device]
            feature_vec[tracker_idx] = get_feature_value(m.rssi, m.tx_power)

            if classifier is None:
                continue
            room = classifier(m.device, feature_vec)
            if room != current_rooms[m.device]:
                logging.info(f"{m.device} moved room {current_rooms[m.device]} to {room}")
                current_rooms[m.device] = room
                await client.publish(f"my_btmonitor/ml/{m.device}", str(room).encode())


def get_classification_func(training_df: pd.DataFrame, log_classifier_scores=False):
    """Train one SVM per device and return ``classify(device_name, feature_vec) -> room name``.

    Devices without training data from at least two rooms get no classifier; ``classify``
    returns "unknown" for them (and for any device not in the training data).
    """
    rooms = list(training_df["known_room"].dtype.categories)
    classifiers = {}
    for device_to_track in training_df["device"].unique():
        features, labels = training_data_from_df(training_df, device_to_track)
        if len(np.unique(labels)) < 2:
            # SVC needs at least two classes to be fitted
            logging.warning(f"Not enough training data for {device_to_track} (need at least two rooms), skipping")
            continue
        clf = svm.SVC(kernel="rbf")
        if log_classifier_scores:
            logging.info(f"Computing cross validation score for {device_to_track}")
            scores = cross_val_score(clf, features, labels, cv=5)
            logging.info(" %0.2f accuracy with a standard deviation of %0.2f" % (scores.mean(), scores.std()))
        logging.info(f"Training SVM classifier for {device_to_track}")
        clf.fit(features, labels)
        classifiers[device_to_track] = clf

    def classify(device_name, feature_vec):
        clf = classifiers.get(device_name)
        if clf is None:
            return "unknown"
        room_idx = clf.predict([feature_vec])[0]
        return rooms[room_idx]

    return classify


if __name__ == "__main__":
    # NOTE: credentials and IRKs are secrets - prefer providing them via the environment
    # instead of keeping them in source control.
    mqtt_info = MqttInfo(
        server=os.environ.get("MY_BTMONITOR_MQTT_SERVER", "homeassistant.fritz.box"),
        username=os.environ.get("MY_BTMONITOR_MQTT_USER", "my_btmonitor"),
        password=os.environ.get("MY_BTMONITOR_MQTT_PASSWORD", "8aBIAC14jaKKbla"),
    )

    # Dict with bt addresses as strings to device name
    address_to_name = {}

    # Devices with random addresses - need irk key
    irk_to_devicename = {
        "aa67542b82c0e05d65c27fb7e313aba5": "martins_apple_watch",
        "840e3892644c1ebd1594a9069c14ce0d": "martins_iphone",
    }

    data_file = Path("training_data.csv")
    device_decoder = DeviceDecoder(irk_to_devicename, address_to_name)

    if data_file.exists() and data_file.stat().st_size > 0:
        training_df = load_measurements_from_csv(data_file)
        classification_func = get_classification_func(training_df)
        trackers = list(training_df["tracker"].cat.categories)
        devices = list(training_df["device"].cat.categories)
    else:
        # First run: no model yet, only collect training data for the known rooms
        logging.warning(f"No training data in {data_file}, only collecting training data")
        classification_func = None
        trackers = []
        devices = []

    # Created after loading so the header it writes for a new file cannot interfere with reading
    training_data_logger = KnownRoomCsvLogger(data_file)
    try:
        asyncio.run(
            async_main(mqtt_info, trackers, devices, classification_func, device_decoder, training_data_logger)
        )
    finally:
        training_data_logger.close()