196 lines
6.0 KiB
Python
196 lines
6.0 KiB
Python
import numpy as np
|
|
|
|
__all__ = ['PeakDetectorZScore', 'PeakDetectorSimple', 'detect_peaks']
|
|
|
|
|
|
def detect_peaks(values, detector):
|
|
for v in values:
|
|
detector.add(v)
|
|
return np.array(detector.peaks, dtype=int)
|
|
|
|
|
|
class PeakDetectorSimple:
|
|
def __init__(self, threshold):
|
|
self.peaks = []
|
|
self._queue = []
|
|
self.threshold = threshold
|
|
self._counter = 0
|
|
self._last_min = 0
|
|
|
|
def add(self, value):
|
|
self._queue.append(value)
|
|
if len(self._queue) > 3:
|
|
self._queue.pop(0)
|
|
if len(self._queue) != 3:
|
|
return
|
|
last, current, following = self._queue
|
|
is_maximum = current > following and current > last
|
|
if is_maximum and (current - self._last_min) > self.threshold:
|
|
self.peaks.append(self._counter + 1)
|
|
self._last_min = current
|
|
self._last_min = min(self._last_min, current)
|
|
self._counter += 1
|
|
|
|
|
|
class PeakDetectorZScore:
|
|
def __init__(self, lag, threshold, influence):
|
|
self._filter = ZScoreFilter(lag, threshold, influence)
|
|
self.peaks = []
|
|
self._counter = 0
|
|
self._previous_signal = 0
|
|
self._max = None
|
|
self._max_index = None
|
|
|
|
# debug
|
|
self.up_down_signal = []
|
|
|
|
def add(self, value):
|
|
signal = self._filter.add(value)
|
|
if signal is not None:
|
|
self.up_down_signal.append(signal)
|
|
|
|
rising_flank = self._previous_signal != 1 and signal == 1
|
|
falling_flank = self._previous_signal == 1 and signal != 1
|
|
if rising_flank:
|
|
self._max = -1
|
|
if signal == 1 and self._max is not None and value > self._max:
|
|
self._max = value
|
|
self._max_index = self._counter
|
|
if falling_flank:
|
|
self.peaks.append(self._max_index)
|
|
|
|
self._previous_signal = signal
|
|
self._counter += 1
|
|
|
|
|
|
class StatisticsQueue:
|
|
|
|
def __init__(self, size):
|
|
self._queue = []
|
|
self._queue_sum = 0
|
|
self._queue_sum_sq = 0
|
|
self._size = size
|
|
|
|
def add(self, value):
|
|
self._queue.append(value)
|
|
|
|
self._queue_sum += value
|
|
self._queue_sum_sq += value ** 2
|
|
if len(self._queue) > self._size:
|
|
removed = self._queue.pop(0)
|
|
self._queue_sum -= removed
|
|
self._queue_sum_sq -= removed ** 2
|
|
|
|
@property
|
|
def avg(self):
|
|
return self._queue_sum / len(self._queue)
|
|
|
|
@property
|
|
def variance(self):
|
|
exp_sq = self._queue_sum_sq / len(self._queue)
|
|
return exp_sq - self.avg ** 2
|
|
|
|
@property
|
|
def std_deviation(self):
|
|
return np.sqrt(self.variance)
|
|
|
|
@property
|
|
def filled(self):
|
|
return len(self._queue) == self._size
|
|
|
|
|
|
class ZScoreFilter:
|
|
def __init__(self, lag, threshold, influence):
|
|
self._threshold = threshold
|
|
self._influence = influence
|
|
self._stat_queue = StatisticsQueue(lag)
|
|
self._last_value = None
|
|
|
|
# debug
|
|
self.filtered = []
|
|
self.means = []
|
|
self.upper_bounds = []
|
|
|
|
def add(self, value):
|
|
sq = self._stat_queue
|
|
if not sq.filled:
|
|
sq.add(value)
|
|
self._last_value = value
|
|
return None
|
|
else:
|
|
avg = sq.avg
|
|
self.means.append(avg)
|
|
self.upper_bounds.append(avg + self._threshold * sq.std_deviation)
|
|
if abs(value - avg) > self._threshold * sq.std_deviation:
|
|
signal = 1 if value > avg else -1
|
|
filtered = self._influence * value + (1 - self._influence) * self._last_value
|
|
sq.add(filtered)
|
|
self._last_value = filtered
|
|
self.filtered.append(filtered)
|
|
return signal
|
|
else:
|
|
sq.add(value)
|
|
self._last_value = value
|
|
self.filtered.append(value)
|
|
return 0
|
|
|
|
|
|
def peak_detection_z_score(y, lag, threshold, influence):
|
|
signals = np.zeros(len(y))
|
|
filtered_y = np.array(y)
|
|
avg_filter = [0] * len(y)
|
|
std_filter = [0] * len(y)
|
|
avg_filter[lag - 1] = np.mean(y[0:lag])
|
|
std_filter[lag - 1] = np.std(y[0:lag])
|
|
for i in range(lag, len(y)):
|
|
if abs(y[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
|
|
if y[i] > avg_filter[i - 1]:
|
|
signals[i] = 1
|
|
else:
|
|
signals[i] = -1
|
|
|
|
filtered_y[i] = influence * y[i] + (1 - influence) * filtered_y[i - 1]
|
|
else:
|
|
signals[i] = 0
|
|
filtered_y[i] = y[i]
|
|
avg_filter[i] = np.mean(filtered_y[(i - lag + 1):i + 1])
|
|
std_filter[i] = np.std(filtered_y[(i - lag + 1):i + 1])
|
|
|
|
return dict(signals=np.asarray(signals),
|
|
avgFilter=np.asarray(avg_filter),
|
|
stdFilter=np.asarray(std_filter))
|
|
|
|
|
|
def test_zscore():
|
|
from measurement_session import load_session_from_file, prune_overflown_session, prune
|
|
import matplotlib.pyplot as plt
|
|
|
|
session_file = '../example_sessions/06.st'
|
|
session = load_session_from_file(session_file)
|
|
session = prune_overflown_session(session)
|
|
session = prune(session, 10, 50)
|
|
|
|
lag = 8
|
|
peak_detector_zscore = PeakDetectorZScore(lag=lag, threshold=2, influence=0)
|
|
peaks = detect_peaks(session['values'], peak_detector_zscore)
|
|
up_down = np.array([0] * lag + peak_detector_zscore.up_down_signal)
|
|
up_down[up_down < 0] = -10
|
|
up_down[up_down > 0] = 10000
|
|
avgs = np.array([0] * lag + peak_detector_zscore._filter.means)
|
|
filtered = np.array([0] * lag + peak_detector_zscore._filter.filtered)
|
|
|
|
plt.figure()
|
|
plt.plot(session['timestamps'], session['values'], 'x-', label='data')
|
|
plt.plot(session['timestamps'], filtered, 'x', label='filtered')
|
|
plt.plot(session['timestamps'], up_down, '-', label='up_down')
|
|
plt.plot(session['timestamps'], avgs, '-', label='avg')
|
|
|
|
# plt.plot(session['timestamps'][peaks+8], session['values'][peaks+8], 'o',
|
|
# label=f"Simple {peak_detector_simple.threshold}")
|
|
plt.title("Peak detection")
|
|
plt.show()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
test_zscore()
|