#!/usr/bin/env python3 import os import aiomqtt import json import asyncio from time import time from pathlib import Path from collections import namedtuple, defaultdict, deque from typing import Dict, Optional, List from Crypto.Cipher import AES import pandas as pd import numpy as np import logging from sklearn import svm from sklearn.model_selection import cross_val_score logging.basicConfig(level=logging.INFO) BtleMeasurement = namedtuple("BtleMeasurement", ["time", "tracker", "address", "rssi", "tx_power"]) BtleDeviceMeasurement = namedtuple("BtleDeviceMeasurement", ["time", "device", "tracker", "rssi", "tx_power"]) MqttInfo = namedtuple("MqttInfo", ["server", "username", "password"]) # ------------------------------------------------------- DECODING ------------------------------------------------------------------------- class DeviceDecoder: """Decode bluetooth addresses - either simple ones (just address to name) or random changing ones like Apple devices using irk keys""" def __init__(self, irk_to_devicename: Dict[str, str], address_to_name: Dict[str, str]): """ address_to_name: dictionary from bt address as string separated by ":" to a device name irk_to_devicename is dict with irk as a hex string, mapping to device name """ self.irk_to_devicename = {bytes.fromhex(k): v for k, v in irk_to_devicename.items()} self.address_to_name = address_to_name def _resolve_rpa(rpa: bytes, irk: bytes) -> bool: """Compares the random address rpa to an irk (secret key) and return True if it matches""" assert len(rpa) == 6 assert len(irk) == 16 key = irk plain_text = b"\x00" * 16 plain_text = bytearray(plain_text) plain_text[15] = rpa[3] plain_text[14] = rpa[4] plain_text[13] = rpa[5] plain_text = bytes(plain_text) cipher = AES.new(key, AES.MODE_ECB) cipher_text = cipher.encrypt(plain_text) return cipher_text[15] == rpa[0] and cipher_text[14] == rpa[1] and cipher_text[13] == rpa[2] def _addr_to_bytes(addr: str) -> bytes: """Converts a bluetooth mac address string with semicolons to bytes""" str_without_colons = addr.replace(":", "") bytearr = bytearray.fromhex(str_without_colons) bytearr.reverse() return bytes(bytearr) def decode(self, addr: str) -> Optional[str]: """addr is a bluetooth address as a string e.g. 4d:24:12:12:34:10""" for irk, name in self.irk_to_devicename.items(): if DeviceDecoder._resolve_rpa(DeviceDecoder._addr_to_bytes(addr), irk): return name return self.address_to_name.get(addr, None) def __call__(self, m: BtleMeasurement) -> Optional[BtleDeviceMeasurement]: decoded_device_name = self.decode(m.address) if not decoded_device_name: return None return BtleDeviceMeasurement(m.time, decoded_device_name, m.tracker, m.rssi, m.tx_power) # ------------------------------------------------------- MACHINE LEARNING ---------------------------------------------------------------- class KnownRoomCsvLogger: """Logs known room measurements to be used later as training data for classifier""" def __init__(self, csv_file: Path): self.known_room = None if csv_file.exists(): self.csv_file_handle = open(csv_file, "a") else: self.csv_file_handle = open(csv_file, "w") print(f"#time,device,tracker,rssi,tx_power,known_room", file=csv_file) def update_known_room(self, known_room: str): if known_room != self.known_room: logging.info(f"Updating known_room {self.known_room} -> {known_room}") self.known_room = known_room def report_measure(self, m: BtleDeviceMeasurement): ignore_rooms = ("keins", "?", "none", "unknown") if self.known_room is None or self.known_room in ignore_rooms: return logging.info(f"Appending to training set: {m}") print( f"{m.time},{m.device},{m.tracker},{m.rssi},{m.tx_power},{self.known_room}", file=self.csv_file_handle,) class RunningFeatureVector: FAR_AWAY_FEATURE_VALUE = 1 MIN_TIME_UNTIL_PREDICTION = 40 # wait until every reachable tracker detected the device TIME_TO_DELETE_IF_NOT_SEEN = 30 # if device wasn't spotted for this time period, the measure is set to inf def __init__(self, trackers: List[str]): self.trackers = trackers self.feature_vecs_per_device = defaultdict(lambda: [self.FAR_AWAY_FEATURE_VALUE] * len(trackers)) self.last_measurements = deque() self.tracker_name_to_idx = {name: i for i, name in enumerate(trackers)} self.start_time = None @staticmethod def _get_feature_value(rssi, tx_power): """Transforms rssi and tx power into a value between 0 and 1, where 0 is close and 1 is far away""" MIN_RSSI = -90 MAX_TRANSFORMED_RSSI = 40 v = tx_power - rssi - MAX_TRANSFORMED_RSSI if v < 0: v = 0 return v / (-MIN_RSSI) def add_measurement(self, new_measurement: BtleDeviceMeasurement): if self.start_time is None: self.start_time = new_measurement.time self.last_measurements.append(new_measurement) while len(self.last_measurements) > 0 and new_measurement.time - self.last_measurements[0].time > self.TIME_TO_DELETE_IF_NOT_SEEN: self.last_measurements.popleft() feature_vec = [self.FAR_AWAY_FEATURE_VALUE] * len(self.trackers) for m in self.last_measurements: if m.device == new_measurement.device: tracker_idx = self.tracker_name_to_idx[m.tracker] feature_vec[tracker_idx] = self._get_feature_value(m.rssi, m.tx_power) return feature_vec if new_measurement.time - self.start_time > self.MIN_TIME_UNTIL_PREDICTION else None def training_data_from_df(df: pd.DataFrame, device_to_train: str): """Returns a feature matrix (num_measurement, num_trackers) and a label vector (both numeric) to be used in scikit learn""" trackers = list(df["tracker"].cat.categories) idx_to_room = dict(enumerate(df["known_room"].cat.categories)) room_to_idx = {v: k for k, v in idx_to_room.items()} last_known_room = None features = [] labels = [] feature_accumulator = RunningFeatureVector(trackers) # Feature vectors - rssi column for each room for i, row in df.iterrows(): time, device, tracker, rssi, tx_power, known_room = row m = BtleDeviceMeasurement(time, device, tracker, rssi, tx_power) if device != device_to_train: continue if last_known_room != known_room: feature_accumulator = RunningFeatureVector(trackers) # reset for new room last_known_room = known_room feature_vec = feature_accumulator.add_measurement(m) if feature_vec is not None: features.append(feature_vec) labels.append(room_to_idx[known_room]) return np.array(features), np.array(labels) def load_measurements_from_csv(csv_file: Path) -> pd.DataFrame: """Load csv with training data into dataframe""" def cleanup_column_name(col_name: str): return col_name.replace("#", "").strip() df = pd.read_csv(str(csv_file)) # String cleanup in column names and room names df = df.rename(columns=cleanup_column_name) df.map(lambda x: x.strip() if isinstance(x, str) else x) df["tracker"] = df["tracker"].astype("category") df["known_room"] = df["known_room"].astype("category") df['device'] = df['device'].astype("category") return df async def send_discovery_messages(mqtt_client, device_names): for device_name in device_names: topic = f"homeassistant/sensor/my_btmonitor/{device_name}/config" msg = { "name": device_name, "state_topic": f"my_btmonitor/ml/{device_name}", "expire_after": 30, "unique_id": device_name, } await mqtt_client.publish(topic, json.dumps(msg).encode(), retain=True) async def async_main( mqtt_info: MqttInfo, trackers: List[str], devices: List[str], classifier, device_decoder: DeviceDecoder, training_data_logger: KnownRoomCsvLogger, ): current_rooms = defaultdict(lambda: "unknown") feature_accumulator = RunningFeatureVector(trackers) async with aiomqtt.Client( hostname=mqtt_info.server, username=mqtt_info.username, password=mqtt_info.password ) as client: await send_discovery_messages(client, devices) await client.subscribe("my_btmonitor/#") async for message in client.messages: current_time = time() topic = message.topic if topic.value == "my_btmonitor/known_room": training_data_logger.update_known_room(message.payload.decode()) else: splitted_topic = message.topic.value.split("/") if splitted_topic[0] == "my_btmonitor" and splitted_topic[1] == "raw_measurements": msg_json = json.loads(message.payload) measurement = BtleMeasurement( time=current_time, tracker=splitted_topic[2], address=msg_json["address"], rssi=msg_json["rssi"], tx_power=msg_json.get("tx_power", 0), ) logging.debug(f"Got Measurement {measurement}") m = device_decoder(measurement) if m is not None: logging.info(f"Decoded Measurement {m}") training_data_logger.report_measure(m) feature_vec =feature_accumulator.add_measurement(m) if feature_vec: feature_str={tracker : value for tracker, value in zip(trackers, feature_vec)} logging.info(f"Features: {feature_str}") if feature_vec is not None and classifier is not None: room = classifier(m.device, feature_vec) if room != current_rooms[m.device]: logging.info(f"{m.device} moved room {current_rooms[m.device]} to {room}") current_rooms[m.device] = room await client.publish(f"my_btmonitor/ml/{m.device}", room.encode()) async def async_main_with_restart( mqtt_info: MqttInfo, trackers: List[str], devices: List[str], classifier, device_decoder: DeviceDecoder, training_data_logger: KnownRoomCsvLogger, ): while True: try: await async_main(mqtt_info, trackers, devices, classifier, device_decoder, training_data_logger) except Exception as e: print(e) print("restarting...") def get_classification_func(training_df: pd.DataFrame, log_classifier_scores=True): devices_to_track = list(training_df["device"].unique()) classifiers = {} rooms = list(training_df["known_room"].dtype.categories) for device_to_track in devices_to_track: features, labels = training_data_from_df(training_df, device_to_track) clf = svm.SVC(kernel="rbf") logging.info(f"Computing cross validation score for {device_to_track}") if log_classifier_scores: scores = cross_val_score(clf, features, labels, cv=5) logging.info(" %0.2f accuracy with a standard deviation of %0.2f" % (scores.mean(), scores.std())) logging.info(f"Training SVM classifier for {device_to_track}") clf.fit(features, labels) classifiers[device_to_track] = clf def classify(device_name, feature_vec): room_idx = classifiers[device_name].predict([feature_vec])[0] return rooms[room_idx] return classify if __name__ == "__main__": mqtt_info = MqttInfo(server="homeassistant.fritz.box", username="my_btmonitor", password="8aBIAC14jaKKbla") # Dict with bt addresses as strings to device name address_to_name = {} # Devices with random addresses - need irk key irk_to_devicename = { "aa67542b82c0e05d65c27fb7e313aba5": "martins_apple_watch", "840e3892644c1ebd1594a9069c14ce0d": "martins_iphone", } script_path = os.path.dirname(os.path.realpath(__file__)) data_file = Path(script_path) / Path("training_data.csv") training_df = load_measurements_from_csv(data_file) classification_func = get_classification_func(training_df) training_data_logger = KnownRoomCsvLogger(data_file) device_decoder = DeviceDecoder(irk_to_devicename, address_to_name) trackers = list(training_df["tracker"].cat.categories) devices = list(training_df['device'].cat.categories) asyncio.run(async_main_with_restart(mqtt_info, trackers, devices, classification_func, device_decoder, training_data_logger))