new device communication & analysis

- switched to websockets
- analysis as pure function with internal hidden cache
- new redux reducers
This commit is contained in:
Martin Bauer 2020-06-28 18:55:58 +02:00
parent 00be9e1db2
commit fa1518546b
12 changed files with 336 additions and 169 deletions

1
.expo-shared/assets.json Normal file
View File

@ -0,0 +1 @@
{}

4
App.js
View File

@ -6,14 +6,14 @@ import * as Font from 'expo-font';
// Redux // Redux
import swimtrackerReducer from './state/Reducer'; import swimtrackerReducer from './state/Reducer';
import { createStore } from 'redux'; import { createStore } from 'redux';
import { DeviceReduxCoupling } from './state/DeviceReduxCoupling';
import { Provider } from 'react-redux'; import { Provider } from 'react-redux';
import ThemedStackNavigation from './components/ThemedStackNavigation'; import ThemedStackNavigation from './components/ThemedStackNavigation';
import DataProcessing from "./data_processing/DataProcessing";
const store = createStore(swimtrackerReducer); const store = createStore(swimtrackerReducer);
const dataProcessing = new DataProcessing(store); const deviceReduxCoupling = new DeviceReduxCoupling(store);
export default class App extends React.Component { export default class App extends React.Component {

View File

@ -1,83 +0,0 @@
import React from 'react';
import PropTypes from 'prop-types';
import * as msgpack from 'msgpack-lite';
class DeviceHttpDataSource extends React.Component {
constructor(props) {
super(props);
this.data = [];
this.dataUrl = this.props.deviceUrl + "/api/session/data";
// msgpack setup
this.msgpackCodec = msgpack.createCodec();
this.msgpackCodec.addExtUnpacker(205, function (byteArr) {
const buffer = byteArr.buffer.slice(byteArr.byteOffset, byteArr.byteLength + byteArr.byteOffset);
const result = new Int16Array(buffer);
return result;
});
this.fetchDataHttp = this.fetchDataHttp.bind(this);
}
getUrl(url) {
return new Promise((accept, reject) => {
var req = new XMLHttpRequest();
req.open("GET", url, true);
req.responseType = "arraybuffer";
req.onload = function (event) {
var resp = req.response;
if (resp) {
accept(resp);
}
};
req.send(null);
});
//todo reject on error
}
async fetchDataHttp() {
try {
const url = this.dataUrl + "?startIdx=" + this.data.length;
const arrayBuffer = await this.getUrl(url);
const decoded = msgpack.decode(new Uint8Array(arrayBuffer), { codec: this.msgpackCodec });
const typedValueArr = decoded['values'];
const newDataStart = this.data.length;
for (let i = 0; i < typedValueArr.length; ++i) {
this.data.push(typedValueArr[i]);
}
this.props.onNewData(this.data, newDataStart);
} catch (err) {
//console.log(err);
}
}
componentDidMount() {
this.timer = setInterval(this.fetchDataHttp, this.props.pollInterval);
}
componentWillUnmount() {
clearInterval(this.timer);
this.timer = null;
}
render() {
return null;
}
}
DeviceHttpDataSource.propTypes = {
deviceUrl: PropTypes.string.isRequired,
onNewData: PropTypes.func.isRequired,
pollInterval: PropTypes.number // poll interval in ms
};
DeviceHttpDataSource.defaultProps = {
pollInterval: 20000
};
export default DeviceHttpDataSource;

View File

@ -2,7 +2,7 @@ import React from 'react';
import { Content, Card, CardItem, Body, Text, Button } from 'native-base'; import { Content, Card, CardItem, Body, Text, Button } from 'native-base';
import { Image, ScrollView } from 'react-native'; import { Image, ScrollView } from 'react-native';
import { connect } from 'react-redux'; import { connect } from 'react-redux';
import { startSession} from '../state/ActionCreators'; import { ConnState, startSession } from '../state/DeviceReduxCoupling';
function HomeView(props) { function HomeView(props) {
const buttonText = props.running ? "View Swim Session" : "Start swimming"; const buttonText = props.running ? "View Swim Session" : "Start swimming";
@ -50,7 +50,7 @@ function HomeView(props) {
} }
const mapStateToProps = (state) => { const mapStateToProps = (state) => {
return { running: state.session.running }; return { running: state.deviceState.connState == ConnState.CONNECTED_STARTING };
}; };
export default connect(mapStateToProps)(HomeView); export default connect(mapStateToProps)(HomeView);

View File

@ -5,13 +5,11 @@ import { LinearGradient } from 'expo-linear-gradient';
import IconCard from './IconCard'; import IconCard from './IconCard';
import Graph from './Graph'; import Graph from './Graph';
import { connect } from 'react-redux'; import { connect } from 'react-redux';
import { stopSession } from '../state/ActionCreators';
import backgroundColors from './Themes'; import backgroundColors from './Themes';
import { useKeepAwake } from 'expo-keep-awake'; import { useKeepAwake } from 'expo-keep-awake';
import { stopSession } from '../state/DeviceReduxCoupling';
function LiveTrainingView(props) {
function LiveTrainingView(props)
{
const analysis = props.session.analysis; const analysis = props.session.analysis;
const onStopClick = () => { const onStopClick = () => {
props.dispatch(stopSession()); props.dispatch(stopSession());
@ -68,11 +66,10 @@ const styles = StyleSheet.create({
const mapStateToProps = (state) => { const mapStateToProps = (state) => {
return { return {
session: state.session, session: state.deviceState,
peaksPerLap: state.settings.peaksPerLap, peaksPerLap: state.settings.analysis.peaksPerLap,
theme: state.settings.theme, theme: state.settings.theme,
}; };
}; };
export default connect(mapStateToProps)(LiveTrainingView); export default connect(mapStateToProps)(LiveTrainingView);

View File

@ -0,0 +1,66 @@
import { PeakDetectorSimple } from './PeakDetection';
import { List } from 'immutable';
export default class DataAnalysis {
constructor() {
this._resetCache(null, 0);
}
analyze(analysisParameters, sessionId, allMeasurements) {
const cacheValid = (
this.sessionId === sessionId &&
this.analyzedUpToIdx <= allMeasurements.size &&
this.analysisParameters === analysisParameters);
let newData = null;
if (cacheValid) {
newData = allMeasurements.slice(this.analyzedUpToIdx);
}
else {
this._resetCache(analysisParameters, sessionId);
newData = allMeasurements;
}
// peaks
const newPeaks = this.peakDetectorSimple.addVector(newData);
this.allPeaks = this.allPeaks.concat(List(newPeaks));
// aggregated sum/max
this.aggregatedMomentum = newData.reduce((sum, x) => sum + x, this.aggregatedMomentum);
this.peakMax = newData.reduce((running, x) => Math.max(x, running), this.peakMax);
// windowed
const windowNumDataPoints = analysisParameters.windowSizeInSecs * analysisParameters.numMeasurementsPerSec;
const windowed = allMeasurements.slice(-windowNumDataPoints);
const peakMaxWindow = windowed.reduce((running, x) => Math.max(x, running), 0);
const momentumWindow = windowed.reduce((sum, x) => sum + x, 0);
this.analyzedUpToIdx = allMeasurements.size;
return {
peaks: this.allPeaks,
totalTime: allMeasurements / analysisParameters.numMeasurementsPerSec,
totalMomentum: this.aggregatedMomentum / allMeasurements.size,
peakMax: this.peakMax,
momentumWindow: momentumWindow,
peakMaxWindow: peakMaxWindow,
};
}
_resetCache(analysisParameters, sessionId) {
this.peakDetectorSimple = analysisParameters ? new PeakDetectorSimple(analysisParameters.peakDetectorSimpleThreshold) : null;
this.allPeaks = List();
this.aggregatedMomentum = 0;
this.peakMax = 0;
this.sessionId = sessionId;
this.analyzedUpToIdx = 0;
this.analysisParameters = analysisParameters;
}
};

View File

@ -0,0 +1,84 @@
import ReconnectingWebSocket from 'reconnecting-websocket';
const OpCodes = {
// from device to frontend
INITIAL_INFO: 1,
SESSION_STARTED: 2,
SESSION_STOPPED: 3,
SESSION_NEW_DATA: 4,
// from frontend to device
START_SESSION: 5,
STOP_SESSION: 6,
TARE: 7
};
export default class SwimTrackerWebsocketConnection {
constructor(swimTrackerHost, onData, onStarted, onStopped, onConnect, onDisconnect) {
this.swimTrackerHost = swimTrackerHost;
this.onData = onData;
this.onStarted = onStarted;
this.onStopped = onStopped;
this.onConnect = onConnect;
this.onDisconnect = onDisconnect;
const wsOptions = {
maxReconnectionDelay: 4000
};
this.ws = new ReconnectingWebSocket(`ws://${swimTrackerHost}:81`, [], wsOptions);
this.ws.onmessage = this._onMessage;
this.ws.onopen = this.onConnect;
this.ws.onclose = this.onDisconnect;
this.ws.onerror = this._onError;
this.ws.binaryType = 'arraybuffer';
}
sendStartCommand() {
const data = new Uint8Array(1);
data[0] = OpCodes.START_SESSION;
this.ws.send(data);
}
sendStopCommand() {
const data = new Uint8Array(1);
data[0] = OpCodes.STOP_SESSION;
this.ws.send(data);
}
sendTareCommand() {
const data = new Uint8Array(1);
data[0] = OpCodes.TARE;
this.ws.send(data);
}
_onMessage = (e) => {
const dv = new DataView(e.data);
const opCode = dv.getInt8(0);
if (opCode === OpCodes.INITIAL_INFO) {
const headerSize = 6;
const running = Boolean(dv.getInt8(1));
const sessionId = dv.getUint32(2);
if (running && e.data.byteLength > headerSize) {
const data = new Uint16Array(e.data.slice(headerSize));
this.onStarted(sessionId);
this.onData(data);
} else
this.onStopped();
} else if (opCode === OpCodes.SESSION_STARTED) {
const sessionId = dv.getUint32(1);
this.onStarted(sessionId);
} else if (opCode === OpCodes.SESSION_STOPPED) {
this.onStopped();
} else if (opCode === OpCodes.SESSION_NEW_DATA) {
const data = new Uint16Array(e.data.slice(1));
this.onData(data);
}
}
_onError = (ev) => {
console.log("Websocket error", ev);
}
};

5
package-lock.json generated
View File

@ -8205,6 +8205,11 @@
"util.promisify": "^1.0.0" "util.promisify": "^1.0.0"
} }
}, },
"reconnecting-websocket": {
"version": "4.4.0",
"resolved": "https://registry.npmjs.org/reconnecting-websocket/-/reconnecting-websocket-4.4.0.tgz",
"integrity": "sha512-D2E33ceRPga0NvTDhJmphEgJ7FUYF0v4lr1ki0csq06OdlxKfugGzN0dSkxM/NfqCxYELK4KcaTOUOjTV6Dcng=="
},
"redux": { "redux": {
"version": "4.0.5", "version": "4.0.5",
"resolved": "https://registry.npmjs.org/redux/-/redux-4.0.5.tgz", "resolved": "https://registry.npmjs.org/redux/-/redux-4.0.5.tgz",

View File

@ -20,6 +20,7 @@
"@react-navigation/stack": "^5.3.9", "@react-navigation/stack": "^5.3.9",
"expo": "^37.0.0", "expo": "^37.0.0",
"expo-blur": "~8.1.0", "expo-blur": "~8.1.0",
"expo-keep-awake": "^8.1.0",
"expo-linear-gradient": "~8.1.0", "expo-linear-gradient": "~8.1.0",
"immutable": "^4.0.0-rc.12", "immutable": "^4.0.0-rc.12",
"msgpack-lite": "^0.1.26", "msgpack-lite": "^0.1.26",
@ -39,6 +40,7 @@
"react-native-unimodules": "~0.8.1", "react-native-unimodules": "~0.8.1",
"react-native-web": "^0.11.7", "react-native-web": "^0.11.7",
"react-redux": "^7.2.0", "react-redux": "^7.2.0",
"reconnecting-websocket": "^4.4.0",
"redux": "^4.0.5" "redux": "^4.0.5"
}, },
"devDependencies": { "devDependencies": {

View File

@ -1,37 +1,30 @@
export const NEW_DEVICE_DATA = "NEW_DEVICE_DATA";
export const CHANGE_USER_NAME = "SET_USERNAME"; export const CHANGE_USER_NAME = "SET_USERNAME";
export const CHANGE_THEME = "CHANGE_THEME"; export const CHANGE_THEME = "CHANGE_THEME";
export const START_SESSION = "START_SESSION";
export const STOP_SESSION = "STOP_SESSION";
export const RESET_DEVICE_DATA = "RESET_DEVICE_DATA"; export const RESET_DEVICE_DATA = "RESET_DEVICE_DATA";
export const reportDeviceData = (sessionId, newDataStart, data, analysis) => ({
type: NEW_DEVICE_DATA,
sessionId: sessionId,
newDataStart: newDataStart,
data: data,
analysis: analysis,
})
export const resetDeviceData = () => ({
type: RESET_DEVICE_DATA,
});
export const changeUsername = newUsername => ({ export const changeUsername = newUsername => ({
type: CHANGE_USER_NAME, type: CHANGE_USER_NAME,
newUserName: newUsername, newUserName: newUsername,
}) });
export const changeTheme = newThemeName => ({ export const changeTheme = newThemeName => ({
type: CHANGE_THEME, type: CHANGE_THEME,
newThemeName: newThemeName newThemeName: newThemeName
}) });
export const startSession = () => ({ export const startSession = () => ({
type: START_SESSION type: START_SESSION
}) });
export const stopSession = () => ({ export const stopSession = () => ({
type: STOP_SESSION type: STOP_SESSION
}) });
// ---------------------

View File

@ -0,0 +1,143 @@
import SwimTrackerWebsocketConnection from "../data_processing/SwimTrackerWebsocketConnection";
import DataAnalysis from "../data_processing/DataAnalysis";
import { List } from "immutable";
export const ConnState = {
DISCONNECTED: 'disconnected',
CONNECTED_STOPPED: 'connected_stopped',
CONNECTED_RUNNING: 'connected_running',
CONNECTED_STARTING: 'connected_starting', // start message sent, but device hasn't ack'ed it yet
CONNECTED_STOPPING: 'connected_stopping' // stop message sent..
}
// -------------------------------------------- Actions ---------------------------------------------
export const DEVICE_DISCONNECT = "DEVICE_DISCONNECT";
export const DEVICE_CONNECT = "DEVICE_CONNECT";
export const SESSION_STARTED = "SESSION_STARTED";
export const SESSION_STOPPED = "SESSION_STOPPED";
export const SESSION_NEW_DATA = "SESSION_NEW_DATA";
export const START_SESSION = "START_SESSION";
export const STOP_SESSION = "STOP_SESSION";
export const reportSessionStarted = (sessionId) => ({
type: SESSION_STARTED,
sessionId: sessionId
});
export const reportSessionStopped = () => ({
type: SESSION_STOPPED
});
export const reportNewSessionData = (allMeasurements, analysis) => ({
type: SESSION_NEW_DATA,
data: allMeasurements,
analysis: analysis
});
export const reportDeviceConnect = () => ({
type: DEVICE_CONNECT
});
export const reportDeviceDisconnect = () => ({
type: DEVICE_DISCONNECT
});
export const startSession = () => ({
type: START_SESSION
});
export const stopSession = () => ({
type: STOP_SESSION
});
// -------------------------------------------- Device coupling -------------------------------------
export class DeviceReduxCoupling {
constructor(reduxStore) {
this.reduxStore = reduxStore;
this.analysis = new DataAnalysis();
this.conn = null;
this.reduxStore.subscribe(this._onStateChange);
this._onStateChange();
}
_onStateChange = () => {
const state = this.reduxStore.getState();
if (this.conn === null || (state.settings.swimTrackerHost != this.conn.swimTrackerHost)) {
this.conn = new SwimTrackerWebsocketConnection(state.settings.swimTrackerHost,
this._onNewData,
(sessionId) => this.reduxStore.dispatch(reportSessionStarted(sessionId)),
() => this.reduxStore.dispatch(reportSessionStopped()),
() => this.reduxStore.dispatch(reportDeviceConnect()),
() => this.reduxStore.dispatch(reportDeviceDisconnect())
);
}
if (state.deviceState.connState === ConnState.CONNECTED_STARTING) {
console.log("sending start command to connection");
this.conn.sendStartCommand();
}
else if (state.deviceState.connState === ConnState.CONNECTED_STOPPING)
this.conn.sendStopCommand();
}
_onNewData = (newData) => {
const state = this.reduxStore.getState();
const allMeasurements = state.deviceState.measurements.concat(List(newData));
const analysisResult = this.analysis.analyze(state.settings.analysis, state.deviceState.sessionId, allMeasurements);
this.reduxStore.dispatch(reportNewSessionData(allMeasurements, analysisResult));
}
};
// -------------------------------------------- Reducer -----------------------------------------------
const INITIAL_ANALYSIS = {
'peaks': List(),
'totalTime': null,
'totalMomentum': null,
'peakMax': null,
'momentumWindow': null,
'peakMaxWindow': null,
};
const INITIAL_DEVICE_STATE = {
connState: ConnState.DISCONNECTED,
sessionId: 0,
measurements: List(),
analysis: INITIAL_ANALYSIS,
};
export const deviceStateReducer = (state = INITIAL_DEVICE_STATE, action) => {
switch (action.type) {
case SESSION_NEW_DATA:
const res = {
...state,
measurements: action.data,
analysis: { ...state.analysis, ...action.analysis }
};
return res;
case DEVICE_CONNECT:
return { ...INITIAL_DEVICE_STATE, connState: ConnState.CONNECTED_STOPPED };
case DEVICE_DISCONNECT:
return { ...INITIAL_DEVICE_STATE, connState: ConnState.DISCONNECTED };
case SESSION_STARTED:
return { ...INITIAL_DEVICE_STATE, connState: ConnState.CONNECTED_RUNNING, sessionId: action.sessionId };
case SESSION_STOPPED:
return { ...INITIAL_DEVICE_STATE, connState: ConnState.CONNECTED_STOPPED };
case START_SESSION:
return { ...INITIAL_DEVICE_STATE, connState: ConnState.CONNECTED_STARTING };
case STOP_SESSION:
return { ...INITIAL_DEVICE_STATE, connState: ConnState.CONNECTED_STOPPING };
default:
console.log("Unhandled state in deviceStateReducer", action.type);
return state
}
};

View File

@ -1,40 +1,27 @@
import { combineReducers } from 'redux'; import { combineReducers } from 'redux';
import { List } from 'immutable';
import { CHANGE_THEME, CHANGE_USER_NAME, NEW_DEVICE_DATA, START_SESSION, STOP_SESSION, RESET_DEVICE_DATA } from './ActionCreators'; import { CHANGE_THEME, CHANGE_USER_NAME, NEW_DEVICE_DATA, START_SESSION, STOP_SESSION, RESET_DEVICE_DATA } from './ActionCreators';
import { deviceStateReducer } from "./DeviceReduxCoupling";
const INITIAL_SETTINGS = { const INITIAL_SETTINGS = {
theme: "hot", theme: "hot",
username: "", username: "",
deviceURL: "http://192.168.178.107", swimTrackerHost: "192.168.178.110",
peaksPerLap: 30,
// advanced
peakDetector: 'SIMPLE', // either 'SIMPLE' or 'ZSCORE'
peakDetectorSimpleThreshold: 2500,
peakDetectorZScoreLag: 8, // peak detector z-score values
peakDetectorZScoreThreshold: 2,
peakDetectorZScoreInfluence: 0.1,
};
const INITIAL_CURRENT_SESSION = {
running: false,
sessionId: 0,
rawData: List(),
analysis: { analysis: {
'peaks': List(), peaksPerLap: 30,
'totalTime': null, windowSizeInSecs: 5,
'activeTime': null, numMeasurementsPerSec: 10,
'totalMomentum': null,
'peakFrequency': null, peakDetector: 'SIMPLE', // either 'SIMPLE' or 'ZSCORE'
'peakMax': null, peakDetectorSimpleThreshold: 2500,
// windowed quantities
'momentumWindow': null, peakDetectorZScoreLag: 8, // peak detector z-score values
'frequencyWindow': null, peakDetectorZScoreThreshold: 2,
'peakMaxWindow': null, peakDetectorZScoreInfluence: 0.1,
} }
}; };
const settingsReducer = (state = INITIAL_SETTINGS, action) => { const settingsReducer = (state = INITIAL_SETTINGS, action) => {
switch (action.type) { switch (action.type) {
case CHANGE_THEME: case CHANGE_THEME:
@ -46,35 +33,7 @@ const settingsReducer = (state = INITIAL_SETTINGS, action) => {
} }
}; };
const currentSessionReducer = (state = INITIAL_CURRENT_SESSION, action) => {
switch (action.type) {
case START_SESSION:
return {
running: true,
rawData: List(),
analysis: INITIAL_CURRENT_SESSION.analysis
};
case STOP_SESSION:
return {
running: false,
rawData: List(),
analysis: INITIAL_CURRENT_SESSION.analysis
};
case NEW_DEVICE_DATA:
return {
running: action.data.size > 0,
sessionId: action.sessionId,
rawData: action.data,
analysis: { ...state.analysis, ...action.analysis },
}
case RESET_DEVICE_DATA:
return INITIAL_CURRENT_SESSION
default:
return state
}
};
export default combineReducers({ export default combineReducers({
settings: settingsReducer, settings: settingsReducer,
session: currentSessionReducer, deviceState: deviceStateReducer,
}); });