import DeviceHttpDataSource from './DeviceDataSource';
import { List } from 'immutable';
import { reportDeviceData, resetDeviceData } from '../state/ActionCreators';
import { PeakDetectorSimple } from './PeakDetection';

// todo: put in settings?
const NUM_MEASUREMENTS_PER_SECOND = 10;
const WINDOW_SIZE_SECS = 5;

class DataProcessing {
  constructor(reduxStore) {
    this.store = reduxStore;
    this.store.subscribe(this.onStateChange);
    this.state = this.store.getState();
    this.dataSource = null;
    this.peakDetectorSimple = new PeakDetectorSimple(this.state.settings.peakDetectorSimpleThreshold);
    this.onDataSourceChanged(this.state.settings.deviceURL);
    this.onRunningChanged(this.state.session.running, this.state.settings.deviceURL);
  }

  onStateChange = () => {
    const oldState = this.state;
    const newState = this.store.getState();
    // Store the new state before running the handlers: they read this.state,
    // and onAnalysisParameterChange dispatches, which re-enters this listener.
    // Updating first keeps the re-entrant call from seeing the same change again.
    this.state = newState;

    if (newState.settings.deviceURL !== oldState.settings.deviceURL) {
      this.onDataSourceChanged(newState.settings.deviceURL);
    }
    if (newState.session.running !== oldState.session.running) {
      this.onRunningChanged(newState.session.running, newState.settings.deviceURL);
    }
    if (newState.settings.peakDetectorSimpleThreshold !== oldState.settings.peakDetectorSimpleThreshold) {
      this.onAnalysisParameterChange();
    }
  };

  // Replace the peak detector with a fresh one using the current threshold.
  resetAnalysis = () => {
    this.peakDetectorSimple = new PeakDetectorSimple(this.state.settings.peakDetectorSimpleThreshold);
  };

  // Re-run the analysis over all stored measurements after a parameter change.
  onAnalysisParameterChange = () => {
    this.resetAnalysis();
    const { sessionId, rawData } = this.state.session;
    // Feed the whole recording to the fresh detector exactly once,
    // starting from empty measurement and peak lists.
    const analysis = this.analyzeNewMeasurements(rawData.toArray(), List(), List());
    this.store.dispatch(reportDeviceData(sessionId, rawData.size, rawData, analysis));
  };

  onDataSourceChanged = (newDeviceURL) => {
    if (this.dataSource !== null) {
      this.dataSource.stop();
      this.dataSource = null;
    }
    this.dataSource = new DeviceHttpDataSource(newDeviceURL + "/api/session/data", this.onNewData);
  };

  // Tell the device to start or stop the session and poll accordingly.
  onRunningChanged = (running, deviceURL) => {
    let req = new XMLHttpRequest();
    if (running) {
      //console.log("Starting session", deviceURL + "/api/session/start");
      req.open("GET", deviceURL + "/api/session/start");
      this.dataSource.startIndex = 0;
      this.dataSource.start();
    } else {
      //console.log("Stopping session");
      req.open("GET", deviceURL + "/api/session/stop");
      this.dataSource.stop();
      this.dataSource.startIndex = 0;
    }
    req.addEventListener("error", evt => console.log(evt));
    req.addEventListener("abort", evt => console.log(evt));
    req.send();
  };

  onNewData = (data) => {
    if (data.sessionStartTime == this.state.session.sessionId && data.startIndex == this.state.session.rawData.size) {
      // normal case, add received data to measurement array
      const newData = this.state.session.rawData.concat(List(data.values));
      const analysis = this.analyzeNewMeasurements(data.values, this.state.session.rawData);
      this.store.dispatch(reportDeviceData(data.sessionStartTime, data.startIndex, newData, analysis));
    } else if (data.startIndex === 0) {
      // new session: discard the previous measurements and peaks
      this.resetAnalysis();
      const newData = List(data.values);
      const analysis = this.analyzeNewMeasurements(data.values, List(), List());
      this.store.dispatch(reportDeviceData(data.sessionStartTime, data.startIndex, newData, analysis));
    } else {
      // missed some data -> re-query
      console.log("Requery :(");
      //console.log("Session times", data.sessionStartTime == this.state.session.sessionId, data.sessionStartTime, this.state.session.sessionId);
      //console.log("Index ",data.startIndex == this.state.session.rawData.size, data.startIndex, this.state.session.rawData.size);
      this.resetAnalysis();
      this.dataSource.startIndex = 0;
      this.store.dispatch(resetDeviceData());
    }
  };

  // newData: array of new measurements; oldData: Immutable List of earlier ones;
  // oldPeaks: peaks found so far (defaults to the peaks stored in the session state).
  analyzeNewMeasurements = (newData, oldData, oldPeaks = this.state.session.analysis.peaks) => {
    const newPeaks = this.peakDetectorSimple.addVector(newData);
    const allPeaks = oldPeaks.concat(List(newPeaks));
    const allMeasurements = oldData.concat(List(newData));
    const totalMomentum = allMeasurements.reduce((sum, x) => sum + x, 0);
    const peakMax = allMeasurements.reduce((running, x) => Math.max(x, running), 0);

    // windowed quantities
    const windowSizeMeasurements = WINDOW_SIZE_SECS * NUM_MEASUREMENTS_PER_SECOND;
    const windowedSeq = allMeasurements.slice(-windowSizeMeasurements);
    const peakMaxWindow = windowedSeq.reduce((running, x) => Math.max(x, running), 0);
    const momentumWindow = windowedSeq.reduce((sum, x) => sum + x, 0);

    return {
      peaks: allPeaks,
      // Immutable Lists expose their length as .size, not .length
      totalTime: allMeasurements.size / NUM_MEASUREMENTS_PER_SECOND,
      activeTime: 0,
      totalMomentum: totalMomentum,
      peakFrequency: 0,
      peakMax: peakMax,
      // windowed quantities
      momentumWindow: momentumWindow,
      frequencyWindow: 0,
      peakMaxWindow: peakMaxWindow,
    };
  };
}

export default DataProcessing;