import numpy as np __all__ = ['PeakDetectorZScore', 'PeakDetectorSimple', 'detect_peaks'] def detect_peaks(values, detector): for v in values: detector.add(v) return np.array(detector.peaks, dtype=int) class PeakDetectorSimple: def __init__(self, threshold): self.peaks = [] self._queue = [] self.threshold = threshold self._counter = 0 self._last_min = 0 def add(self, value): self._queue.append(value) if len(self._queue) > 3: self._queue.pop(0) if len(self._queue) != 3: return last, current, following = self._queue is_maximum = current > following and current > last if is_maximum and (current - self._last_min) > self.threshold: self.peaks.append(self._counter + 1) self._last_min = current self._last_min = min(self._last_min, current) self._counter += 1 class PeakDetectorZScore: def __init__(self, lag, threshold, influence): self._filter = ZScoreFilter(lag, threshold, influence) self.peaks = [] self._counter = 0 self._previous_signal = 0 self._max = None self._max_index = None # debug self.up_down_signal = [] def add(self, value): signal = self._filter.add(value) if signal is not None: self.up_down_signal.append(signal) rising_flank = self._previous_signal != 1 and signal == 1 falling_flank = self._previous_signal == 1 and signal != 1 if rising_flank: self._max = -1 if signal == 1 and self._max is not None and value > self._max: self._max = value self._max_index = self._counter if falling_flank: self.peaks.append(self._max_index) self._previous_signal = signal self._counter += 1 class StatisticsQueue: def __init__(self, size): self._queue = [] self._queue_sum = 0 self._queue_sum_sq = 0 self._size = size def add(self, value): self._queue.append(value) self._queue_sum += value self._queue_sum_sq += value ** 2 if len(self._queue) > self._size: removed = self._queue.pop(0) self._queue_sum -= removed self._queue_sum_sq -= removed ** 2 @property def avg(self): return self._queue_sum / len(self._queue) @property def variance(self): exp_sq = self._queue_sum_sq / len(self._queue) return exp_sq - self.avg ** 2 @property def std_deviation(self): return np.sqrt(self.variance) @property def filled(self): return len(self._queue) == self._size class ZScoreFilter: def __init__(self, lag, threshold, influence): self._threshold = threshold self._influence = influence self._stat_queue = StatisticsQueue(lag) self._last_value = None # debug self.filtered = [] self.means = [] self.upper_bounds = [] def add(self, value): sq = self._stat_queue if not sq.filled: sq.add(value) self._last_value = value return None else: avg = sq.avg self.means.append(avg) self.upper_bounds.append(avg + self._threshold * sq.std_deviation) if abs(value - avg) > self._threshold * sq.std_deviation: signal = 1 if value > avg else -1 filtered = self._influence * value + (1 - self._influence) * self._last_value sq.add(filtered) self._last_value = filtered self.filtered.append(filtered) return signal else: sq.add(value) self._last_value = value self.filtered.append(value) return 0 def peak_detection_z_score(y, lag, threshold, influence): signals = np.zeros(len(y)) filtered_y = np.array(y) avg_filter = [0] * len(y) std_filter = [0] * len(y) avg_filter[lag - 1] = np.mean(y[0:lag]) std_filter[lag - 1] = np.std(y[0:lag]) for i in range(lag, len(y)): if abs(y[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]: if y[i] > avg_filter[i - 1]: signals[i] = 1 else: signals[i] = -1 filtered_y[i] = influence * y[i] + (1 - influence) * filtered_y[i - 1] else: signals[i] = 0 filtered_y[i] = y[i] avg_filter[i] = np.mean(filtered_y[(i - lag + 1):i + 1]) std_filter[i] = np.std(filtered_y[(i - lag + 1):i + 1]) return dict(signals=np.asarray(signals), avgFilter=np.asarray(avg_filter), stdFilter=np.asarray(std_filter)) def test_zscore(): from measurement_session import load_session_from_file, prune_overflown_session, prune import matplotlib.pyplot as plt session_file = '../example_sessions/06.st' session = load_session_from_file(session_file) session = prune_overflown_session(session) session = prune(session, 10, 50) lag = 8 peak_detector_zscore = PeakDetectorZScore(lag=lag, threshold=2, influence=0) peaks = detect_peaks(session['values'], peak_detector_zscore) up_down = np.array([0] * lag + peak_detector_zscore.up_down_signal) up_down[up_down < 0] = -10 up_down[up_down > 0] = 10000 avgs = np.array([0] * lag + peak_detector_zscore._filter.means) filtered = np.array([0] * lag + peak_detector_zscore._filter.filtered) plt.figure() plt.plot(session['timestamps'], session['values'], 'x-', label='data') plt.plot(session['timestamps'], filtered, 'x', label='filtered') plt.plot(session['timestamps'], up_down, '-', label='up_down') plt.plot(session['timestamps'], avgs, '-', label='avg') # plt.plot(session['timestamps'][peaks+8], session['values'][peaks+8], 'o', # label=f"Simple {peak_detector_simple.threshold}") plt.title("Peak detection") plt.show() if __name__ == '__main__': test_zscore()