app changes

This commit is contained in:
Martin Bauer
2020-06-02 17:19:09 +02:00
parent d0b81bb7fb
commit 2584d2249f
13 changed files with 606 additions and 46 deletions

View File

@@ -0,0 +1,129 @@
import DeviceHttpDataSource from './DeviceDataSource';
import { List } from 'immutable';
import { reportDeviceData } from '../state/ActionCreators';
import { PeakDetectorSimple} from './PeakDetection';
// todo: put in settings?
const NUM_MEASUREMENTS_PER_SECOND = 10;
const WINDOW_SIZE_SECS = 5;
// This is the main data processing entry point, coupling between device and redux store
// - periodically fetch data
// - feeds them to analysis, (manages analysis classes)
// - adds them to redux store
class DataProcessing {
constructor(reduxStore) {
this.store = reduxStore;
this.store.subscribe(this.onStateChange);
this.state = this.store.getState();
this.dataSource = null;
//console.log("state", this.state);
console.assert(this.state.session.running === false, "Created DataProcessing with running=True");
this.onDataSourceChanged(this.state.settings.deviceURL);
this.rawMeasurements = List();
this.sessionStartTime = 0;
this.peakDetectorSimple = new PeakDetectorSimple(this.state.settings.peakDetectorSimpleThreshold);
this.peaks = List();
}
onStateChange = () => {
const newState = this.store.getState();
//console.log("DataProcessing state change", this.state, newState);
if (newState.settings.deviceURL !== this.state.settings.deviceURL)
this.onDataSourceChanged(newState.settings.deviceURL);
if (newState.session.running && !this.state.session.running) {
this.onRunningChanged(newState.session.running);
};
if(newState.settings.peakDetectorSimpleThreshold !== this.state.settings.peakDetectorSimpleThreshold) {
this.peakDetectorSimple = new PeakDetectorSimple(newState.settings.peakDetectorSimpleThreshold, this.onNewPeak);
this.peaks = List(this.peakDetectorSimple.addVector(this.rawMeasurements));
};
this.state = newState;
}
onDataSourceChanged = (newDeviceURL) => {
if (this.dataSource !== null) {
this.dataSource.stop();
this.dataSource = null;
}
this.dataSource = new DeviceHttpDataSource(this.newDeviceURL + "/api/session/data", this.onNewData);
}
onRunningChanged = (running, deviceURL) => {
if (running) {
//console.log("Starting session");
let req = new XMLHttpRequest();
req.open("GET", deviceURL + "/api/session/start");
this.dataSource.startIndex = 0;
this.dataSource.start();
} else {
//console.log("Stopping session");
req.open("GET", deviceURL + "/api/session/stop");
this.dataSource.stop();
this.dataSource.startIndex = 0;
}
}
onNewData = (data) => {
data.values;
data.sessionStartTime;
data.startIndex;
let success = false;
if (data.sessionStartTime === this.sessionStartTime && data.startIndex === this.rawMeasurements.length) {
// normal case, add received data to measurement array
this.rawMeasurements.concat(List(data.values));
this.analyzeNewMeasurements(data.startIndex);
success = true;
}
else if (data.startIndex === 0) {
// new start
this.sessionStartTime = data.sessionStartTime;
this.rawMeasurements = List(data.values);
success = true;
} else {
// missed some data -> re-query
this.dataSource.startIndex = 0;
this.sessionStartTime = 0;
//console.log("Problem: got non-consequtive data. Received:", data,
// "Own state ", { startTime: this.sessionStartTime, values: this.rawMeasurements });
}
if (success) {
const analysis = this.analyzeNewMeasurements(data.startIndex);
this.store.dispatch(reportDeviceData(this.sessionStartTime, data.startIndex, this.rawMeasurements, analysis));
}
}
analyzeNewMeasurements = (newDataStartIdx) => {
const newPeaks = this.peakDetectorSimple.addVector(this.rawMeasurements.slice(newDataStartIdx));
this.peaks = this.peaks.concat(List(newPeaks));
const totalMomentum = this.rawMeasurements.reduce((sum, x) => sum + x, 0);
const peakMax = this.rawMeasurements.reduce((running, x) => max(x, running), 0);
// windowed quantities
const windowSizeMeasurements = WINDOW_SIZE_SECS * NUM_MEASUREMENTS_PER_SECOND;
const windowedSeq = this.rawMeasurements.slice(-windowSizeMeasurements);
const peakMaxWindow = windowedSeq.reduce((running, x) => max(x, running), 0);
const momentumWindow = windowedSeq.reduce((sum, x) => sum + x, 0);
return {
peaks: this.peaks,
totalTime: this.rawMeasurements.length / NUM_MEASUREMENTS_PER_SECOND,
activeTime: 0,
totalMomentum: totalMomentum,
peakFrequency: 0,
peakMax: peakMax,
// windowed quantities
momentumWindow: momentumWindow,
frequencyWindow: 0,
peakMaxWindow: peakMaxWindow,
}
};
};
export default DataProcessing;

View File

@@ -0,0 +1,74 @@
import * as msgpack from 'msgpack-lite';
class DeviceHttpDataSource {
constructor(dataUrl, onNewData, pollInterval=1000, startIndex = 0) {
this.dataUrl = dataUrl;
this.onNewData = onNewData;
this.pollInterval = pollInterval;
this.startIndex = startIndex;
// msgpack setup
this.msgpackCodec = msgpack.createCodec();
this.msgpackCodec.addExtUnpacker(205, function (byteArr) {
const buffer = byteArr.buffer.slice(byteArr.byteOffset, byteArr.byteLength + byteArr.byteOffset);
const result = new Int16Array(buffer);
return result;
});
this.fetchDataHttp = this.fetchDataHttp.bind(this);
}
getUrl(url) {
return new Promise((accept, reject) => {
var req = new XMLHttpRequest();
req.open("GET", url, true);
req.responseType = "arraybuffer";
req.onload = function (event) {
var resp = req.response;
if (resp) {
accept(resp);
}
};
req.addEventListener("error", evt => reject(evt));
req.addEventListener("abort", evt => reject(evt));
req.send(null);
});
}
async fetchDataHttp() {
try {
const url = this.dataUrl + "?startIdx=" + this.startIndex;
const arrayBuffer = await this.getUrl(url);
const decoded = msgpack.decode(new Uint8Array(arrayBuffer), { codec: this.msgpackCodec });
this.startIndex += decoded["values"].length;
//"values", "sessionStartTime", "startIndex"
this.onNewData(decoded);
} catch (err) {
//console.log(err);
}
}
start() {
if (this.timer === null) {
this.timer = setInterval(this.fetchDataHttp, this.pollInterval);
return true;
} else {
return false;
}
}
stop() {
if (this.timer !== null) {
clearInterval(this.timer);
this.timer = null;
return true;
} else {
return false;
}
}
};
export default DeviceHttpDataSource;

View File

@@ -0,0 +1,187 @@
/**
* A simple peak detector
*
* Usage: Successively add values via add() and query the indices of the peaks with peaks
*
* A peak is detected if the current point is local maximum (no filtering!) and the current
* value is larger than (threshold + minimum_since_last_peak)
*/
class PeakDetectorSimple {
constructor(threshold, handleNewPeaks) {
this._threshold = threshold;
this._queue = [];
this._last_min = 0;
this._counter = 0;
this._handleNewPeaks = handleNewPeaks;
}
getThreshold() {
return this._threshold;
}
addVector(vec) {
let result = [];
const callbackBackup = this._handleNewPeaks;
this._handleNewPeaks = null;
for (let i = 0; i < vec.length; ++i) {
const res = this.add(vec[i]);
if(res !== null)
result.push(res);
}
this._handleNewPeaks = callbackBackup;
return result;
}
add(value) {
let result = null;
this._queue.push(value);
if (this._queue.length > 3) {
this._queue.shift();
}
if (this._queue.length !== 3) {
return;
}
const [last, current, next] = this._queue;
const is_maximum = current > next && current > last;
if (is_maximum && (current - this._last_min) > this._threshold) {
result = this._counter;
}
this._last_min = Math.min(this._last_min, current);
this._counter += 1;
return result;
}
}
/**
* Implementation of Z-Score peak detection according to
* https://stackoverflow.com/questions/22583391/peak-signal-detection-in-realtime-timeseries-data
*/
class PeakDetectorZScore {
constructor(lag, threshold, influence, handleNewPeaks) {
this.peaks = [];
this._filter = ZScoreFilter(lag, threshold, influence);
this._counter = 0;
this._previous_signal = 0;
this._max = null;
this._max_index = null;
this._handleNewPeaks = handleNewPeaks;
}
addVector(vec) {
const callbackBackup = this._handleNewPeaks;
const numPeaksBefore = this.peaks.length;
this._handleNewPeaks = null;
for (let i = 0; i < vec.length; ++i) {
this.add(vec[i]);
}
this._handleNewPeaks = callbackBackup;
if (numPeaksBefore != this.peaks.length) {
this._handleNewPeaks(this.peaks);
}
}
add(value) {
const signal = this._filter.add(value);
if (signal != null) {
const rising_flank = this._previous_signal !== 1 && signal === 1;
const falling_flank = this._previous_signal === 1 && signal !== 1;
if (rising_flank)
this._max = -1;
if (signal === 1 && this._max != null && value > this._max) {
this._max = value;
this._max_index = this._counter;
}
if (falling_flank) {
this.peaks.push(this._max_index);
if (this._handleNewPeaks) {
this._handleNewPeaks(this.peaks);
}
}
this._previous_signal = signal;
}
this._counter += 1;
}
}
export { PeakDetectorSimple, PeakDetectorZScore };
// --------------------------------------- Helper classes -------------------------------------------------------------
class StatisticsQueue {
constructor(size) {
this._size = size;
this._queue = [];
this._queue_sum = 0; // running sum over all elements currently in _queue
this._queue_sum_sq = 0; // sum of squared elements in _queue
}
add(value) {
this._queue.push(value);
this._queue_sum += value;
this._queue_sum_sq += value * value;
if (this._queue.length > this._size) {
const removed = this._queue[0];
this._queue.shift();
this._queue_sum -= removed;
this._queue_sum_sq -= removed * removed;
}
}
get avg() {
return this._queue_sum / self._queue.length;
}
get variance() {
const exp_sq = this._queue_sum_sq / self._queue.length;
const my_avg = self.avg;
return exp_sq - (my_avg * my_avg);
}
get std_deviation() {
return Math.sqrt(this.variance);
}
get filled() {
return this._queue.length === this._size;
}
}
class ZScoreFilter {
constructor(lag, threshold, influence) {
this._threshold = threshold;
this._influence = influence;
this._last_value = null;
this._stat_queue = StatisticsQueue(lag);
}
add(value) {
let sq = this._stat_queue;
if (!sq.filled) {
sq.add(value);
this._last_value = value;
return null;
} else {
const avg = sq.avg;
if (Math.abs(value - avg) > this._threshold * sq.std_deviation) {
const signal = value > avg ? 1 : -1;
const filtered = this._influence * value + (1 - this._influence) * this._last_value;
sq.add(filtered);
this._last_value = filtered;
return signal;
} else {
sq.add(value);
this._last_value = value;
}
}
}
}