swimtracker-firmware/python-mock/measurement_session.py

96 lines
2.9 KiB
Python
Raw Permalink Normal View History

2020-06-05 21:41:16 +02:00
import numpy as np
import msgpack
def _decoding_hook(code, data):
    """msgpack ``ext_hook`` that decodes known extension payloads.

    Payloads tagged with extension code -47, 47 or -51 are interpreted as a
    buffer of 16-bit integers and returned as a NumPy ``int16`` array.
    For any other code the code is printed and the placeholder string
    ``'unknown ext'`` is returned instead of raising.
    """
    int16_codes = (-47, 47, -51)
    if code not in int16_codes:
        # Unknown extension: report the code, hand back a marker value.
        print(code)
        return 'unknown ext'
    return np.frombuffer(data, dtype=np.int16)
def _encoding_hook(obj):
    """msgpack ``default`` hook that packs int16 NumPy arrays.

    An ``np.ndarray`` with dtype ``int16`` is converted to its raw bytes and
    wrapped in a msgpack extension object with code 47.

    Raises:
        TypeError: if ``obj`` is anything other than an int16 NumPy array.
    """
    is_int16_array = isinstance(obj, np.ndarray) and obj.dtype == np.int16
    if not is_int16_array:
        raise TypeError("Cannot pack: %s of type %s" % (obj, str(type(obj))))

    payload = memoryview(obj).tobytes()
    # Sanity check: every element must occupy exactly two bytes.
    assert len(payload) / 2 == len(obj)
    return msgpack.ExtType(47, payload)
def deserialize(stream):
    """Unpack msgpack-encoded data and return the resulting Python object.

    ``stream`` is passed directly to ``msgpack.unpackb``; extension types are
    decoded through ``_decoding_hook`` and ``raw=False`` is requested.
    """
    unpack_options = {'ext_hook': _decoding_hook, 'raw': False}
    return msgpack.unpackb(stream, **unpack_options)
def serialize(obj):
    """Pack ``obj`` with msgpack and return the packed result.

    Objects msgpack cannot handle natively are passed to ``_encoding_hook``;
    ``use_bin_type=True`` is requested from ``msgpack.packb``.
    """
    pack_options = {'default': _encoding_hook, 'use_bin_type': True}
    return msgpack.packb(obj, **pack_options)
def load_session_from_file(file):
    """Read the file at path ``file`` in binary mode and deserialize it.

    Returns whatever ``deserialize`` produces for the complete file content.
    """
    with open(file, 'rb') as session_file:
        return deserialize(session_file.read())
def plot_session(session):
    """Plot the values of ``session`` with matplotlib and set a title.

    Two layouts are distinguished by the presence of a ``'timestamps'`` key:

    * Sparse session (has ``'timestamps'``): values are plotted against the
      raw timestamps with ``'x-'`` markers. The duration shown in the title
      is computed from the timestamps divided by 10.
    * Dense session (no ``'timestamps'``): the time axis is generated as
      ``sample index * interval / 1000``, where ``interval`` is read from the
      session and defaults to 100 (presumably milliseconds - confirm).

    The title states the session kind, the number of data points and the
    duration in minutes. Nothing is returned and the figure is not shown.
    """
    # Imported here so matplotlib is only required when plotting.
    import matplotlib.pyplot as plt

    y = session['values']
    if 'timestamps' not in session:
        measurement_interval = session.get('interval', 100)
        t = np.arange(len(y)) * measurement_interval / 1000
        description = f"Dense session, {len(t)} data points, {t[-1] / 60} minutes"
        plt.plot(t, y)
    else:
        timestamps = session['timestamps']
        t = timestamps / 10
        time_range_seconds = t[-1] - t[0]
        description = f"Sparse session, {len(t)} data points, {time_range_seconds / 60} minutes"
        # The plot uses the unscaled timestamps on the x axis.
        plt.plot(timestamps, y, 'x-')
    plt.title(description)
def prune_overflown_session(session, max_elements=8 * 1024):
    """Return a shallow copy of ``session`` limited to ``max_elements`` samples.

    ``'values'`` is always truncated to its first ``max_elements`` entries;
    ``'timestamps'`` is truncated the same way when the key is present.
    The passed-in session is not modified.
    """
    keep = slice(None, max_elements)
    pruned = session.copy()
    pruned['values'] = pruned['values'][keep]
    if 'timestamps' in pruned:
        pruned['timestamps'] = pruned['timestamps'][keep]
    return pruned
def prune(session, beginning=10, end=10):
    """Return a shallow copy of ``session`` with samples cut off at both ends.

    Args:
        session: mapping with a ``'values'`` sequence, a ``'startIndex'``
            number and optionally a ``'timestamps'`` sequence.
        beginning: number of leading samples to drop.
        end: number of trailing samples to drop; ``0`` keeps the tail intact.

    Returns:
        The copied session with ``'values'`` (and ``'timestamps'`` when
        present) sliced and ``'startIndex'`` increased by ``beginning``.
        The passed-in session is not modified.

    Raises:
        KeyError: if ``'values'`` or ``'startIndex'`` is missing.
    """
    session = session.copy()
    # `-end or None` makes end == 0 an open-ended slice; a literal
    # `[beginning:-0]` would be `[beginning:0]` and always yield nothing.
    window = slice(beginning, -end or None)
    session['values'] = session['values'][window]
    # Sessions without timestamps are accepted, like in the other helpers.
    if 'timestamps' in session:
        session['timestamps'] = session['timestamps'][window]
    session['startIndex'] += beginning
    return session
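# NOTE: `extend` is disabled (commented out) because of the open question below.
#def extend(session, value, beginning, end):
#    session = session.copy()
#    session['values'] = np.concatenate((np.ones([beginning]) * value, session['values'], np.ones([end]) * value))
#    # TODO: decide what to put into the timestamps of the padded samples
#    return session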
def start_at_index(session, index):
    """Return a shallow copy of ``session`` that begins at sample ``index``.

    Args:
        session: mapping with a ``'values'`` sequence, a ``'startIndex'``
            number and optionally a ``'timestamps'`` sequence.
        index: index the returned session should start at, counted in the
            same way as ``session['startIndex']``.

    Returns:
        The copied session. When ``index`` lies beyond the current
        ``'startIndex'``, the samples in between are dropped from
        ``'values'`` (and ``'timestamps'`` when present) and ``'startIndex'``
        is set to ``index``. Otherwise the copy is returned unchanged.
        The passed-in session is not modified.

    Raises:
        KeyError: if ``'startIndex'`` is missing, or ``'values'`` is missing
            when samples have to be removed.
    """
    session = session.copy()
    to_remove = index - session['startIndex']
    if to_remove > 0:
        session['values'] = session['values'][to_remove:]
        if 'timestamps' in session:
            session['timestamps'] = session['timestamps'][to_remove:]
        # Keep the bookkeeping in sync with the trimmed data (as `prune`
        # does); otherwise a repeated call would drop samples again.
        session['startIndex'] = index
    return session
def test_serialization_deserialization(session_file):
    """Check that a session survives a serialize/deserialize round trip.

    The session stored at path ``session_file`` is loaded, serialized again
    and deserialized; the ``'values'`` of the result must equal those of the
    loaded session.

    Raises:
        AssertionError: if the round-tripped values differ.
    """
    # The file is read once via load_session_from_file; a separate raw read
    # of the same file was never used and has been dropped.
    session = load_session_from_file(session_file)
    serialized = serialize(session)
    deserialized = deserialize(serialized)
    np.testing.assert_equal(deserialized['values'], session['values'])
if __name__ == '__main__':
    # When run as a script, round-trip one of the bundled example sessions.
    example_session_path = '../example_sessions/04.st'
    test_serialization_deserialization(example_session_path)