Restructured full repo
This commit is contained in:
195
python-mock/peak_detection.py
Normal file
195
python-mock/peak_detection.py
Normal file
@@ -0,0 +1,195 @@
|
||||
import numpy as np
|
||||
|
||||
__all__ = ['PeakDetectorZScore', 'PeakDetectorSimple', 'detect_peaks']
|
||||
|
||||
|
||||
def detect_peaks(values, detector):
|
||||
for v in values:
|
||||
detector.add(v)
|
||||
return np.array(detector.peaks, dtype=int)
|
||||
|
||||
|
||||
class PeakDetectorSimple:
|
||||
def __init__(self, threshold):
|
||||
self.peaks = []
|
||||
self._queue = []
|
||||
self.threshold = threshold
|
||||
self._counter = 0
|
||||
self._last_min = 0
|
||||
|
||||
def add(self, value):
|
||||
self._queue.append(value)
|
||||
if len(self._queue) > 3:
|
||||
self._queue.pop(0)
|
||||
if len(self._queue) != 3:
|
||||
return
|
||||
last, current, following = self._queue
|
||||
is_maximum = current > following and current > last
|
||||
if is_maximum and (current - self._last_min) > self.threshold:
|
||||
self.peaks.append(self._counter + 1)
|
||||
self._last_min = current
|
||||
self._last_min = min(self._last_min, current)
|
||||
self._counter += 1
|
||||
|
||||
|
||||
class PeakDetectorZScore:
|
||||
def __init__(self, lag, threshold, influence):
|
||||
self._filter = ZScoreFilter(lag, threshold, influence)
|
||||
self.peaks = []
|
||||
self._counter = 0
|
||||
self._previous_signal = 0
|
||||
self._max = None
|
||||
self._max_index = None
|
||||
|
||||
# debug
|
||||
self.up_down_signal = []
|
||||
|
||||
def add(self, value):
|
||||
signal = self._filter.add(value)
|
||||
if signal is not None:
|
||||
self.up_down_signal.append(signal)
|
||||
|
||||
rising_flank = self._previous_signal != 1 and signal == 1
|
||||
falling_flank = self._previous_signal == 1 and signal != 1
|
||||
if rising_flank:
|
||||
self._max = -1
|
||||
if signal == 1 and self._max is not None and value > self._max:
|
||||
self._max = value
|
||||
self._max_index = self._counter
|
||||
if falling_flank:
|
||||
self.peaks.append(self._max_index)
|
||||
|
||||
self._previous_signal = signal
|
||||
self._counter += 1
|
||||
|
||||
|
||||
class StatisticsQueue:
|
||||
|
||||
def __init__(self, size):
|
||||
self._queue = []
|
||||
self._queue_sum = 0
|
||||
self._queue_sum_sq = 0
|
||||
self._size = size
|
||||
|
||||
def add(self, value):
|
||||
self._queue.append(value)
|
||||
|
||||
self._queue_sum += value
|
||||
self._queue_sum_sq += value ** 2
|
||||
if len(self._queue) > self._size:
|
||||
removed = self._queue.pop(0)
|
||||
self._queue_sum -= removed
|
||||
self._queue_sum_sq -= removed ** 2
|
||||
|
||||
@property
|
||||
def avg(self):
|
||||
return self._queue_sum / len(self._queue)
|
||||
|
||||
@property
|
||||
def variance(self):
|
||||
exp_sq = self._queue_sum_sq / len(self._queue)
|
||||
return exp_sq - self.avg ** 2
|
||||
|
||||
@property
|
||||
def std_deviation(self):
|
||||
return np.sqrt(self.variance)
|
||||
|
||||
@property
|
||||
def filled(self):
|
||||
return len(self._queue) == self._size
|
||||
|
||||
|
||||
class ZScoreFilter:
|
||||
def __init__(self, lag, threshold, influence):
|
||||
self._threshold = threshold
|
||||
self._influence = influence
|
||||
self._stat_queue = StatisticsQueue(lag)
|
||||
self._last_value = None
|
||||
|
||||
# debug
|
||||
self.filtered = []
|
||||
self.means = []
|
||||
self.upper_bounds = []
|
||||
|
||||
def add(self, value):
|
||||
sq = self._stat_queue
|
||||
if not sq.filled:
|
||||
sq.add(value)
|
||||
self._last_value = value
|
||||
return None
|
||||
else:
|
||||
avg = sq.avg
|
||||
self.means.append(avg)
|
||||
self.upper_bounds.append(avg + self._threshold * sq.std_deviation)
|
||||
if abs(value - avg) > self._threshold * sq.std_deviation:
|
||||
signal = 1 if value > avg else -1
|
||||
filtered = self._influence * value + (1 - self._influence) * self._last_value
|
||||
sq.add(filtered)
|
||||
self._last_value = filtered
|
||||
self.filtered.append(filtered)
|
||||
return signal
|
||||
else:
|
||||
sq.add(value)
|
||||
self._last_value = value
|
||||
self.filtered.append(value)
|
||||
return 0
|
||||
|
||||
|
||||
def peak_detection_z_score(y, lag, threshold, influence):
|
||||
signals = np.zeros(len(y))
|
||||
filtered_y = np.array(y)
|
||||
avg_filter = [0] * len(y)
|
||||
std_filter = [0] * len(y)
|
||||
avg_filter[lag - 1] = np.mean(y[0:lag])
|
||||
std_filter[lag - 1] = np.std(y[0:lag])
|
||||
for i in range(lag, len(y)):
|
||||
if abs(y[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
|
||||
if y[i] > avg_filter[i - 1]:
|
||||
signals[i] = 1
|
||||
else:
|
||||
signals[i] = -1
|
||||
|
||||
filtered_y[i] = influence * y[i] + (1 - influence) * filtered_y[i - 1]
|
||||
else:
|
||||
signals[i] = 0
|
||||
filtered_y[i] = y[i]
|
||||
avg_filter[i] = np.mean(filtered_y[(i - lag + 1):i + 1])
|
||||
std_filter[i] = np.std(filtered_y[(i - lag + 1):i + 1])
|
||||
|
||||
return dict(signals=np.asarray(signals),
|
||||
avgFilter=np.asarray(avg_filter),
|
||||
stdFilter=np.asarray(std_filter))
|
||||
|
||||
|
||||
def test_zscore():
|
||||
from measurement_session import load_session_from_file, prune_overflown_session, prune
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
session_file = '../example_sessions/06.st'
|
||||
session = load_session_from_file(session_file)
|
||||
session = prune_overflown_session(session)
|
||||
session = prune(session, 10, 50)
|
||||
|
||||
lag = 8
|
||||
peak_detector_zscore = PeakDetectorZScore(lag=lag, threshold=2, influence=0)
|
||||
peaks = detect_peaks(session['values'], peak_detector_zscore)
|
||||
up_down = np.array([0] * lag + peak_detector_zscore.up_down_signal)
|
||||
up_down[up_down < 0] = -10
|
||||
up_down[up_down > 0] = 10000
|
||||
avgs = np.array([0] * lag + peak_detector_zscore._filter.means)
|
||||
filtered = np.array([0] * lag + peak_detector_zscore._filter.filtered)
|
||||
|
||||
plt.figure()
|
||||
plt.plot(session['timestamps'], session['values'], 'x-', label='data')
|
||||
plt.plot(session['timestamps'], filtered, 'x', label='filtered')
|
||||
plt.plot(session['timestamps'], up_down, '-', label='up_down')
|
||||
plt.plot(session['timestamps'], avgs, '-', label='avg')
|
||||
|
||||
# plt.plot(session['timestamps'][peaks+8], session['values'][peaks+8], 'o',
|
||||
# label=f"Simple {peak_detector_simple.threshold}")
|
||||
plt.title("Peak detection")
|
||||
plt.show()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_zscore()
|
||||
Reference in New Issue
Block a user