swimtracker-firmware/python-mock/peak_detection.py

196 lines
6.0 KiB
Python

import numpy as np
__all__ = ['PeakDetectorZScore', 'PeakDetectorSimple', 'detect_peaks']
def detect_peaks(values, detector):
for v in values:
detector.add(v)
return np.array(detector.peaks, dtype=int)
class PeakDetectorSimple:
def __init__(self, threshold):
self.peaks = []
self._queue = []
self.threshold = threshold
self._counter = 0
self._last_min = 0
def add(self, value):
self._queue.append(value)
if len(self._queue) > 3:
self._queue.pop(0)
if len(self._queue) != 3:
return
last, current, following = self._queue
is_maximum = current > following and current > last
if is_maximum and (current - self._last_min) > self.threshold:
self.peaks.append(self._counter + 1)
self._last_min = current
self._last_min = min(self._last_min, current)
self._counter += 1
class PeakDetectorZScore:
def __init__(self, lag, threshold, influence):
self._filter = ZScoreFilter(lag, threshold, influence)
self.peaks = []
self._counter = 0
self._previous_signal = 0
self._max = None
self._max_index = None
# debug
self.up_down_signal = []
def add(self, value):
signal = self._filter.add(value)
if signal is not None:
self.up_down_signal.append(signal)
rising_flank = self._previous_signal != 1 and signal == 1
falling_flank = self._previous_signal == 1 and signal != 1
if rising_flank:
self._max = -1
if signal == 1 and self._max is not None and value > self._max:
self._max = value
self._max_index = self._counter
if falling_flank:
self.peaks.append(self._max_index)
self._previous_signal = signal
self._counter += 1
class StatisticsQueue:
def __init__(self, size):
self._queue = []
self._queue_sum = 0
self._queue_sum_sq = 0
self._size = size
def add(self, value):
self._queue.append(value)
self._queue_sum += value
self._queue_sum_sq += value ** 2
if len(self._queue) > self._size:
removed = self._queue.pop(0)
self._queue_sum -= removed
self._queue_sum_sq -= removed ** 2
@property
def avg(self):
return self._queue_sum / len(self._queue)
@property
def variance(self):
exp_sq = self._queue_sum_sq / len(self._queue)
return exp_sq - self.avg ** 2
@property
def std_deviation(self):
return np.sqrt(self.variance)
@property
def filled(self):
return len(self._queue) == self._size
class ZScoreFilter:
def __init__(self, lag, threshold, influence):
self._threshold = threshold
self._influence = influence
self._stat_queue = StatisticsQueue(lag)
self._last_value = None
# debug
self.filtered = []
self.means = []
self.upper_bounds = []
def add(self, value):
sq = self._stat_queue
if not sq.filled:
sq.add(value)
self._last_value = value
return None
else:
avg = sq.avg
self.means.append(avg)
self.upper_bounds.append(avg + self._threshold * sq.std_deviation)
if abs(value - avg) > self._threshold * sq.std_deviation:
signal = 1 if value > avg else -1
filtered = self._influence * value + (1 - self._influence) * self._last_value
sq.add(filtered)
self._last_value = filtered
self.filtered.append(filtered)
return signal
else:
sq.add(value)
self._last_value = value
self.filtered.append(value)
return 0
def peak_detection_z_score(y, lag, threshold, influence):
signals = np.zeros(len(y))
filtered_y = np.array(y)
avg_filter = [0] * len(y)
std_filter = [0] * len(y)
avg_filter[lag - 1] = np.mean(y[0:lag])
std_filter[lag - 1] = np.std(y[0:lag])
for i in range(lag, len(y)):
if abs(y[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
if y[i] > avg_filter[i - 1]:
signals[i] = 1
else:
signals[i] = -1
filtered_y[i] = influence * y[i] + (1 - influence) * filtered_y[i - 1]
else:
signals[i] = 0
filtered_y[i] = y[i]
avg_filter[i] = np.mean(filtered_y[(i - lag + 1):i + 1])
std_filter[i] = np.std(filtered_y[(i - lag + 1):i + 1])
return dict(signals=np.asarray(signals),
avgFilter=np.asarray(avg_filter),
stdFilter=np.asarray(std_filter))
def test_zscore():
from measurement_session import load_session_from_file, prune_overflown_session, prune
import matplotlib.pyplot as plt
session_file = '../example_sessions/06.st'
session = load_session_from_file(session_file)
session = prune_overflown_session(session)
session = prune(session, 10, 50)
lag = 8
peak_detector_zscore = PeakDetectorZScore(lag=lag, threshold=2, influence=0)
peaks = detect_peaks(session['values'], peak_detector_zscore)
up_down = np.array([0] * lag + peak_detector_zscore.up_down_signal)
up_down[up_down < 0] = -10
up_down[up_down > 0] = 10000
avgs = np.array([0] * lag + peak_detector_zscore._filter.means)
filtered = np.array([0] * lag + peak_detector_zscore._filter.filtered)
plt.figure()
plt.plot(session['timestamps'], session['values'], 'x-', label='data')
plt.plot(session['timestamps'], filtered, 'x', label='filtered')
plt.plot(session['timestamps'], up_down, '-', label='up_down')
plt.plot(session['timestamps'], avgs, '-', label='avg')
# plt.plot(session['timestamps'][peaks+8], session['values'][peaks+8], 'o',
# label=f"Simple {peak_detector_simple.threshold}")
plt.title("Peak detection")
plt.show()
if __name__ == '__main__':
test_zscore()