"""Train and evaluate a room classifier from collected RSSI measurements.

The script loads a CSV of measurements, turns the readings of one device
into per-tracker feature vectors labelled with the room the device was really
in, and then cross-validates and plots the confusion matrix of an RBF SVM.
"""
from copy import copy
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn import svm
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier

# Feature value used for a tracker that has not reported a reading yet.
FAR_AWAY_FEATURE_VALUE = 1


def _strip_if_str(value):
    """Return *value* with surrounding whitespace removed if it is a string."""
    return value.strip() if isinstance(value, str) else value


def load_measurements(csv_file: Path) -> pd.DataFrame:
    """Load the measurement CSV and normalise its column names and strings.

    Column names have ``#`` removed and whitespace stripped; a column called
    ``room`` is renamed to ``tracker``. String cells are stripped of
    surrounding whitespace, and the ``tracker`` and ``real_room`` columns are
    converted to pandas categoricals.

    Args:
        csv_file: Path of the CSV file to read.

    Returns:
        The cleaned data frame.

    Raises:
        KeyError: If the file has no ``tracker``/``room`` or ``real_room``
            column after the names are cleaned.
    """

    def cleanup_column_name(col_name: str) -> str:
        clean_name = col_name.replace('#', '').strip()
        if clean_name == 'room':
            return 'tracker'
        return clean_name

    df = pd.read_csv(str(csv_file))

    # String cleanup in column names and room names.
    df = df.rename(columns=cleanup_column_name)
    # The stripped values must be assigned back: the element-wise transform
    # returns new data and never modifies the frame in place. Numeric columns
    # cannot hold strings, so they are skipped.
    for col in df.select_dtypes(exclude="number").columns:
        df[col] = df[col].map(_strip_if_str)

    df['tracker'] = df['tracker'].astype("category")
    df['real_room'] = df['real_room'].astype("category")
    return df


def get_feature_value(rssi, tx_power):
    """Map one RSSI reading to a non-negative feature value.

    Computes ``tx_power - rssi - 40``, clamps negative results to 0 and
    divides by 90. The result has no upper clamp.

    Args:
        rssi: Received signal strength of the reading.
        tx_power: Transmit power reported with the reading.

    Returns:
        The scaled feature value (``>= 0``).
    """
    MIN_RSSI = -90
    MAX_TRANSFORMED_RSSI = 40
    v = tx_power - rssi - MAX_TRANSFORMED_RSSI
    if v < 0:
        v = 0
    return v / (-MIN_RSSI)


def make_training_data(df: pd.DataFrame, device_to_map, settle_time=20):
    """Build feature vectors and room labels for a single device.

    Rows are processed in frame order. Each row of the selected device
    updates the slot of its tracker in a running feature vector (one slot per
    tracker category, initialised to ``FAR_AWAY_FEATURE_VALUE``). A snapshot
    of that vector is emitted as a sample once more than ``settle_time`` has
    elapsed since the device's ``real_room`` last changed. The running vector
    is not reset on a room change, so slots keep their last reading until the
    corresponding tracker reports again.

    Args:
        df: Frame from ``load_measurements``. Rows are unpacked positionally
            as ``time, device, tracker, rssi, tx_power, real_room``, so the
            frame must have exactly these six columns in this order.
        device_to_map: Device identifier whose rows are used; all other rows
            are ignored.
        settle_time: Minimum time (in the units of the time column --
            presumably seconds, confirm against the collector) that must have
            passed since a room change before samples are recorded.

    Returns:
        A ``(features, labels)`` tuple of NumPy arrays. ``labels`` holds the
        index of the room within ``df['real_room'].cat.categories``.
    """
    tracker_to_idx = {
        name: idx for idx, name in enumerate(df['tracker'].cat.categories)
    }
    room_to_idx = {
        name: idx for idx, name in enumerate(df['real_room'].cat.categories)
    }

    last_real_room = None
    start_time = None
    current_feature = [FAR_AWAY_FEATURE_VALUE] * len(tracker_to_idx)

    features = []
    labels = []

    # Feature vectors - rssi column for each room.
    # itertuples yields plain tuples and avoids building a Series per row.
    rows = df.itertuples(index=False, name=None)
    for time, device, tracker, rssi, tx_power, real_room in rows:
        if device != device_to_map:
            continue

        if last_real_room != real_room:
            # The device moved: restart the settling timer.
            start_time = time
            last_real_room = real_room

        tracker_idx = tracker_to_idx[tracker]
        current_feature[tracker_idx] = get_feature_value(rssi, tx_power)

        if time - start_time > settle_time:
            # Snapshot the vector; it keeps being mutated on later rows.
            features.append(copy(current_feature))
            labels.append(room_to_idx[real_room])

    return np.array(features), np.array(labels)


def train(features, labels, classes):
    """Cross-validate an RBF SVM and show its confusion matrix.

    Prints the 5-fold cross-validation scores and their mean and standard
    deviation, then fits the classifier on a train/test split
    (``random_state=0``), prints the confusion matrix of the held-out part
    and displays it with matplotlib (blocks until the window is closed).

    Args:
        features: 2-D array of feature vectors.
        labels: 1-D array of integer class indices, one per feature vector.
        classes: Class names; ``classes[i]`` is the name of label ``i``.
    """
    clf = svm.SVC(kernel='rbf')
    print("Training")
    scores = cross_val_score(clf, features, labels, cv=5)
    print(scores)
    print(
        f"{scores.mean():0.2f} accuracy with a standard deviation of "
        f"{scores.std():0.2f}"
    )

    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, random_state=0
    )
    clf.fit(X_train, y_train)

    # confusion_matrix expects (y_true, y_pred): rows are the true rooms and
    # columns the predicted ones, matching the axis labels of the plot.
    # Passing every class index keeps the matrix aligned with `classes` even
    # when a room has no sample in the test split.
    cm = confusion_matrix(
        y_test, clf.predict(X_test), labels=np.arange(len(classes))
    )
    print(cm)
    print(classes)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classes)
    disp.plot()
    plt.show()


def main():
    """Load the collected measurements and train on one device's readings."""
    csv_path = Path("/home/martin/code/ansible/roles/bluetooth-monitor/other/collected.csv")
    df = load_measurements(csv_path)
    features, labels = make_training_data(df, "martins_apple_watch")
    print(np.unique(labels))
    print(features.shape, labels.shape)
    train(features, labels, list(df['real_room'].dtype.categories))


if __name__ == "__main__":
    main()