bt_monitor_server/bt_monitor_server.py

296 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import aiomqtt
import json
import asyncio
from time import time
from pathlib import Path
from collections import namedtuple, defaultdict, deque
from typing import Dict, Optional, List
from Crypto.Cipher import AES
import pandas as pd
import numpy as np
import logging
from sklearn import svm
from sklearn.model_selection import cross_val_score
# Root logger shows INFO and above; the logging.debug calls in this file stay hidden.
logging.basicConfig(level=logging.INFO)

# One raw measurement from a tracker, still carrying the bluetooth address.
BtleMeasurement = namedtuple(
    "BtleMeasurement",
    ("time", "tracker", "address", "rssi", "tx_power"),
)

# Same measurement after DeviceDecoder replaced the address by a device name.
BtleDeviceMeasurement = namedtuple(
    "BtleDeviceMeasurement",
    ("time", "device", "tracker", "rssi", "tx_power"),
)

# Connection settings handed to the MQTT client.
MqttInfo = namedtuple("MqttInfo", ("server", "username", "password"))
# ------------------------------------------------------- DECODING -------------------------------------------------------------------------
class DeviceDecoder:
    """Decode bluetooth addresses into device names.

    Two kinds of addresses are handled: simple ones (plain address to name
    lookup) and random changing ones, like those of Apple devices, which are
    matched against known irk keys.
    """

    def __init__(self, irk_to_devicename: Dict[str, str], address_to_name: Dict[str, str]):
        """
        irk_to_devicename: dict with the irk as a hex string, mapping to a device name
        address_to_name: dict from bt address as string separated by ":" to a device name
        """
        # Keys are converted once so decode() can hand raw bytes to the cipher.
        self.irk_to_devicename = {bytes.fromhex(k): v for k, v in irk_to_devicename.items()}
        self.address_to_name = address_to_name

    @staticmethod
    def _resolve_rpa(rpa: bytes, irk: bytes) -> bool:
        """Compare the random address rpa to an irk (secret key); return True if it matches.

        rpa: the 6 address bytes in the order produced by _addr_to_bytes
        irk: the 16 byte key
        Raises ValueError if either argument has the wrong length.
        """
        # Explicit checks instead of assert: asserts vanish when running with -O.
        if len(rpa) != 6:
            raise ValueError(f"expected a 6 byte address, got {len(rpa)} bytes")
        if len(irk) != 16:
            raise ValueError(f"expected a 16 byte irk, got {len(irk)} bytes")
        # 16 byte block: zero padding followed by rpa[5], rpa[4], rpa[3].
        plain_text = b"\x00" * 13 + bytes((rpa[5], rpa[4], rpa[3]))
        cipher_text = AES.new(irk, AES.MODE_ECB).encrypt(plain_text)
        # The last three cipher bytes must equal rpa[2], rpa[1], rpa[0].
        return cipher_text[13:16] == bytes((rpa[2], rpa[1], rpa[0]))

    @staticmethod
    def _addr_to_bytes(addr: str) -> bytes:
        """Convert a bluetooth mac address string with colons to bytes, in reversed byte order."""
        address_bytes = bytearray.fromhex(addr.replace(":", ""))
        address_bytes.reverse()
        return bytes(address_bytes)

    def decode(self, addr: str) -> Optional[str]:
        """Return the device name for addr, or None if the address is unknown.

        addr is a bluetooth address as a string e.g. 4d:24:12:12:34:10.
        The irk keys are tried first, then the plain address table.
        """
        if self.irk_to_devicename:
            # Convert once instead of once per key.
            rpa = self._addr_to_bytes(addr)
            for irk, name in self.irk_to_devicename.items():
                if self._resolve_rpa(rpa, irk):
                    return name
        return self.address_to_name.get(addr)

    def __call__(self, m: "BtleMeasurement") -> Optional["BtleDeviceMeasurement"]:
        """Turn a raw measurement into a device measurement; None if the address is unknown."""
        decoded_device_name = self.decode(m.address)
        if not decoded_device_name:
            return None
        return BtleDeviceMeasurement(m.time, decoded_device_name, m.tracker, m.rssi, m.tx_power)
# ------------------------------------------------------- MACHINE LEARNING ----------------------------------------------------------------
class KnownRoomCsvLogger:
    """Logs known room measurements to be used later as training data for classifier"""

    CSV_HEADER = "#time,device,tracker,rssi,tx_power,known_room"
    # Room values meaning "no usable label"; measurements are not recorded for them.
    IGNORED_ROOMS = ("keins", "?", "none", "unknown")

    def __init__(self, csv_file: Path):
        """Open csv_file for appending; write the header line if the file is new or empty."""
        self.known_room = None
        needs_header = not csv_file.exists() or csv_file.stat().st_size == 0
        # Append mode creates a missing file and never truncates existing data.
        self.csv_file_handle = open(csv_file, "a")
        if needs_header:
            # The header goes to the open handle (the Path object is not writable).
            print(self.CSV_HEADER, file=self.csv_file_handle)
            self.csv_file_handle.flush()

    def update_known_room(self, known_room: str):
        """Set the room label attached to all following measurements."""
        if known_room != self.known_room:
            logging.info(f"Updating known_room {self.known_room} -> {known_room}")
        self.known_room = known_room

    def report_measure(self, m: "BtleDeviceMeasurement"):
        """Append m with the current room label; skipped when no usable room is set."""
        if self.known_room is None or self.known_room in self.IGNORED_ROOMS:
            return
        logging.info(f"Appending to training set: {m}")
        print(
            f"{m.time},{m.device},{m.tracker},{m.rssi},{m.tx_power},{self.known_room}",
            file=self.csv_file_handle,
        )
        # Flush per row so collected training data survives an abrupt stop.
        self.csv_file_handle.flush()
class RunningFeatureVector:
    """Builds a per-device feature vector (one value per tracker) from recent measurements."""

    FAR_AWAY_FEATURE_VALUE = 1
    MIN_TIME_UNTIL_PREDICTION = 40  # wait until every reachable tracker detected the device
    # Measurements older than this are dropped, so a tracker that stopped seeing
    # the device falls back to FAR_AWAY_FEATURE_VALUE.
    TIME_TO_DELETE_IF_NOT_SEEN = 30

    def __init__(self, trackers: List[str]):
        """trackers: tracker names; their order defines the order of the feature vector."""
        self.trackers = trackers
        # Not read anywhere in this class; kept so the attribute stays available.
        self.feature_vecs_per_device = defaultdict(lambda: [self.FAR_AWAY_FEATURE_VALUE] * len(trackers))
        self.last_measurements = deque()
        self.tracker_name_to_idx = {name: i for i, name in enumerate(trackers)}
        self.start_time = None

    @staticmethod
    def _get_feature_value(rssi, tx_power):
        """Transforms rssi and tx power into a value between 0 and 1, where 0 is close and 1 is far away"""
        MIN_RSSI = -90
        MAX_TRANSFORMED_RSSI = 40
        v = max(tx_power - rssi - MAX_TRANSFORMED_RSSI, 0)
        # Clamp the upper end too, so the result really stays within [0, 1].
        return min(v / (-MIN_RSSI), 1.0)

    def add_measurement(self, new_measurement: "BtleDeviceMeasurement"):
        """Record new_measurement and return the feature vector of its device.

        Returns None until MIN_TIME_UNTIL_PREDICTION has passed since the first
        measurement. Measurements from trackers that are not in ``trackers`` do
        not contribute to the vector.
        """
        if self.start_time is None:
            self.start_time = new_measurement.time
        self.last_measurements.append(new_measurement)

        # Drop everything that is too old relative to the newest measurement.
        now = new_measurement.time
        while self.last_measurements and now - self.last_measurements[0].time > self.TIME_TO_DELETE_IF_NOT_SEEN:
            self.last_measurements.popleft()

        feature_vec = [self.FAR_AWAY_FEATURE_VALUE] * len(self.trackers)
        for m in self.last_measurements:
            if m.device != new_measurement.device:
                continue
            tracker_idx = self.tracker_name_to_idx.get(m.tracker)
            if tracker_idx is None:
                # Unknown tracker: there is no slot for it in the vector.
                continue
            # Later measurements overwrite earlier ones of the same tracker.
            feature_vec[tracker_idx] = self._get_feature_value(m.rssi, m.tx_power)

        if now - self.start_time > self.MIN_TIME_UNTIL_PREDICTION:
            return feature_vec
        return None
def training_data_from_df(df: pd.DataFrame, device_to_train: str):
    """Build scikit-learn training data for one device.

    Returns a feature matrix (num_measurement, num_trackers) and a label
    vector, both numeric. Labels are the positions of the rooms within the
    categories of the "known_room" column.
    """
    trackers = list(df["tracker"].cat.categories)
    room_to_idx = {room: idx for idx, room in enumerate(df["known_room"].cat.categories)}

    features, labels = [], []
    previous_room = None
    accumulator = RunningFeatureVector(trackers)

    # Feature vectors - rssi column for each room
    for _, row in df.iterrows():
        timestamp, device, tracker, rssi, tx_power, known_room = row
        if device != device_to_train:
            continue

        if previous_room != known_room:
            # A room change starts a fresh window, so values from the old room do not leak over.
            accumulator = RunningFeatureVector(trackers)
            previous_room = known_room

        measurement = BtleDeviceMeasurement(timestamp, device, tracker, rssi, tx_power)
        feature_vec = accumulator.add_measurement(measurement)
        if feature_vec is None:
            continue
        features.append(feature_vec)
        labels.append(room_to_idx[known_room])

    return np.array(features), np.array(labels)
def load_measurements_from_csv(csv_file: Path) -> pd.DataFrame:
    """Load csv with training data into dataframe.

    Column names lose their "#" and surrounding whitespace, string cells are
    stripped, and the "tracker", "known_room" and "device" columns become
    categoricals.
    """

    def cleanup_column_name(col_name: str):
        return col_name.replace("#", "").strip()

    def strip_if_str(value):
        return value.strip() if isinstance(value, str) else value

    df = pd.read_csv(str(csv_file))
    # String cleanup in column names and room names
    df = df.rename(columns=cleanup_column_name)
    # The stripped values have to be assigned back; mapping alone does not modify df.
    for column in df.columns:
        df[column] = df[column].map(strip_if_str)
    for column in ("tracker", "known_room", "device"):
        df[column] = df[column].astype("category")
    return df
async def send_discovery_messages(mqtt_client, device_names):
    """Publish one retained sensor config message per device.

    Each message goes to the device's "homeassistant/sensor/.../config" topic
    and names "my_btmonitor/ml/<device>" as the state topic.
    """
    for device_name in device_names:
        config_payload = json.dumps(
            {
                "name": device_name,
                "state_topic": f"my_btmonitor/ml/{device_name}",
                "expire_after": 30,
                "unique_id": device_name,
            }
        ).encode()
        await mqtt_client.publish(
            f"homeassistant/sensor/my_btmonitor/{device_name}/config",
            config_payload,
            retain=True,
        )
def _parse_raw_measurement(topic_levels, payload, received_at):
    """Build a BtleMeasurement from a raw_measurements message; None if it is malformed."""
    if len(topic_levels) < 3:
        # No tracker name in the topic.
        return None
    try:
        msg_json = json.loads(payload)
    except (TypeError, ValueError):
        return None
    if not isinstance(msg_json, dict) or "address" not in msg_json or "rssi" not in msg_json:
        return None
    return BtleMeasurement(
        time=received_at,
        tracker=topic_levels[2],
        address=msg_json["address"],
        rssi=msg_json["rssi"],
        tx_power=msg_json.get("tx_power", 0),
    )


async def async_main(
    mqtt_info: MqttInfo,
    trackers: List[str],
    devices: List[str],
    classifier,
    device_decoder: DeviceDecoder,
    training_data_logger: KnownRoomCsvLogger,
):
    """Receive measurements over MQTT, log training data and publish room changes.

    mqtt_info: connection settings for the MQTT client
    trackers: tracker names, defining the order of the feature vector
    devices: device names for which discovery messages are sent
    classifier: callable (device_name, feature_vec) -> room name, or None to only collect data
    device_decoder: turns a BtleMeasurement into a BtleDeviceMeasurement (or None)
    training_data_logger: receives known-room updates and decoded measurements

    Malformed raw measurement messages are logged and skipped instead of
    stopping the receive loop.
    """
    current_rooms = defaultdict(lambda: "unknown")
    feature_accumulator = RunningFeatureVector(trackers)
    async with aiomqtt.Client(
        hostname=mqtt_info.server, username=mqtt_info.username, password=mqtt_info.password
    ) as client:
        await send_discovery_messages(client, devices)
        await client.subscribe("my_btmonitor/#")
        async for message in client.messages:
            current_time = time()
            topic_value = message.topic.value

            if topic_value == "my_btmonitor/known_room":
                training_data_logger.update_known_room(message.payload.decode())
                continue

            topic_levels = topic_value.split("/")
            # Slice comparison is safe for topics with fewer than two levels.
            if topic_levels[:2] != ["my_btmonitor", "raw_measurements"]:
                continue

            measurement = _parse_raw_measurement(topic_levels, message.payload, current_time)
            if measurement is None:
                logging.warning(f"Ignoring malformed measurement on topic {topic_value}")
                continue
            logging.debug(f"Got Measurement {measurement}")

            m = device_decoder(measurement)
            if m is None:
                continue
            logging.debug(f"Decoded Measurement {m}")
            training_data_logger.report_measure(m)

            feature_vec = feature_accumulator.add_measurement(m)
            if feature_vec is None:
                # Still inside the warm-up period of the feature accumulator.
                continue
            feature_str = dict(zip(trackers, feature_vec))
            logging.debug(f"Features: {feature_str}")

            if classifier is None:
                continue
            room = classifier(m.device, feature_vec)
            # Only room changes are published.
            if room != current_rooms[m.device]:
                logging.info(f"{m.device} moved room {current_rooms[m.device]} to {room}")
                current_rooms[m.device] = room
                await client.publish(f"my_btmonitor/ml/{m.device}", room.encode())
def get_classification_func(training_df: pd.DataFrame, log_classifier_scores=True):
    """Train one SVM classifier per device and return a classification function.

    training_df: dataframe as returned by load_measurements_from_csv
    log_classifier_scores: if True, log a 5-fold cross validation score per device

    Returns classify(device_name, feature_vec) -> room name. Devices without a
    trained classifier are reported as "unknown".
    """
    devices_to_track = list(training_df["device"].unique())
    classifiers = {}
    rooms = list(training_df["known_room"].dtype.categories)

    for device_to_track in devices_to_track:
        features, labels = training_data_from_df(training_df, device_to_track)
        # An SVM classifier cannot be fitted without samples or with only one room.
        if len(np.unique(labels)) < 2:
            logging.warning(f"Not enough training data for {device_to_track}, skipping classifier")
            continue

        clf = svm.SVC(kernel="rbf")
        if log_classifier_scores:
            logging.info(f"Computing cross validation score for {device_to_track}")
            try:
                scores = cross_val_score(clf, features, labels, cv=5)
            except ValueError as err:
                # The score is diagnostic only; too little data must not stop training.
                logging.warning(f"Cross validation failed for {device_to_track}: {err}")
            else:
                logging.info(" %0.2f accuracy with a standard deviation of %0.2f" % (scores.mean(), scores.std()))

        logging.info(f"Training SVM classifier for {device_to_track}")
        clf.fit(features, labels)
        classifiers[device_to_track] = clf

    def classify(device_name, feature_vec):
        """Return the predicted room name for feature_vec of device_name."""
        clf = classifiers.get(device_name)
        if clf is None:
            # No classifier exists for this device.
            return "unknown"
        room_idx = clf.predict([feature_vec])[0]
        return rooms[room_idx]

    return classify
if __name__ == "__main__":
    # NOTE(review): the MQTT password and the irk keys below are secrets stored in
    # source. The MQTT settings can be overridden through environment variables;
    # the previous values remain the defaults so existing setups keep working.
    mqtt_info = MqttInfo(
        server=os.environ.get("BT_MONITOR_MQTT_SERVER", "homeassistant.fritz.box"),
        username=os.environ.get("BT_MONITOR_MQTT_USERNAME", "my_btmonitor"),
        password=os.environ.get("BT_MONITOR_MQTT_PASSWORD", "8aBIAC14jaKKbla"),
    )

    # Dict with bt addresses as strings to device name
    address_to_name = {}

    # Devices with random addresses - need irk key
    irk_to_devicename = {
        "aa67542b82c0e05d65c27fb7e313aba5": "martins_apple_watch",
        "840e3892644c1ebd1594a9069c14ce0d": "martins_iphone",
    }

    # The training data lives next to this script.
    script_path = os.path.dirname(os.path.realpath(__file__))
    data_file = Path(script_path) / "training_data.csv"

    # Train from the existing data first; the logger then appends to the same file.
    training_df = load_measurements_from_csv(data_file)
    classification_func = get_classification_func(training_df)
    training_data_logger = KnownRoomCsvLogger(data_file)
    device_decoder = DeviceDecoder(irk_to_devicename, address_to_name)

    # Trackers and devices are those present in the training data.
    trackers = list(training_df["tracker"].cat.categories)
    devices = list(training_df["device"].cat.categories)

    asyncio.run(
        async_main(mqtt_info, trackers, devices, classification_func, device_decoder, training_data_logger)
    )