96 lines
2.9 KiB
Python
96 lines
2.9 KiB
Python
import numpy as np
|
|
import msgpack
|
|
|
|
|
|
def _decoding_hook(code, data):
|
|
if code == -47 or code == 47 or code== -51:
|
|
return np.frombuffer(data, dtype=np.int16)
|
|
print(code)
|
|
return 'unknown ext'
|
|
|
|
|
|
def _encoding_hook(obj):
|
|
if isinstance(obj, np.ndarray) and obj.dtype == np.int16:
|
|
buffer = memoryview(obj).tobytes()
|
|
assert len(buffer) / 2 == len(obj)
|
|
return msgpack.ExtType(47, buffer)
|
|
raise TypeError("Cannot pack: %s of type %s" % (obj, str(type(obj))))
|
|
|
|
|
|
def deserialize(stream):
    """Unpack msgpack-encoded data and return the resulting object.

    ``stream`` is handed to ``msgpack.unpackb`` with ``raw=False``; extension
    types found in the data are passed through ``_decoding_hook``.
    """
    unpack_options = {'ext_hook': _decoding_hook, 'raw': False}
    return msgpack.unpackb(stream, **unpack_options)
|
|
|
|
|
|
def serialize(obj):
    """Pack ``obj`` with msgpack and return the packed result.

    Objects msgpack cannot handle natively are passed to ``_encoding_hook``;
    ``use_bin_type=True`` is forwarded to ``msgpack.packb``.
    """
    pack_options = {'default': _encoding_hook, 'use_bin_type': True}
    return msgpack.packb(obj, **pack_options)
|
|
|
|
|
|
def load_session_from_file(file):
    """Read the file at path ``file`` in binary mode and deserialize it.

    Returns whatever ``deserialize`` produces for the file's full contents.
    """
    with open(file, 'rb') as handle:
        packed = handle.read()
    # The handle is closed at this point; decoding works on the in-memory bytes.
    return deserialize(packed)
|
|
|
|
|
|
def plot_session(session):
    """Plot a session's 'values' with matplotlib and set a descriptive title.

    A session containing a 'timestamps' entry is treated as sparse: values
    are drawn against the raw timestamps with 'x-' markers. Otherwise the
    session is treated as dense and a time axis is generated from the sample
    index and the session's 'interval' entry (default 100).

    matplotlib is imported lazily so the module can be used without it.
    """
    import matplotlib.pyplot as plt

    y = session['values']
    if 'timestamps' not in session:
        # Dense session: evenly spaced samples.
        # presumably 'interval' is in milliseconds, making t seconds -- TODO confirm
        measurement_interval = session.get('interval', 100)
        t = np.arange(len(y)) * measurement_interval / 1000
        description = f"Dense session, {len(t)} data points, {t[-1] / 60} minutes"
        plt.plot(t, y)
    else:
        # Sparse session: explicit timestamp per sample.
        # presumably timestamps count ticks of 1/10 s -- TODO confirm
        interval = 10
        stamps = session['timestamps']
        t = stamps / interval
        time_range_seconds = t[-1] - t[0]
        description = f"Sparse session, {len(t)} data points, {time_range_seconds / 60} minutes"
        # NOTE(review): the x axis uses the raw timestamps, not the scaled
        # ``t`` used for the title -- confirm this is intended.
        plt.plot(stamps, session['values'], 'x-')
    plt.title(description)
|
|
|
|
|
|
def prune_overflown_session(session, max_elements=8 * 1024):
    """Return a shallow copy of ``session`` capped at ``max_elements`` samples.

    'values' is always truncated to its first ``max_elements`` items;
    'timestamps' is truncated the same way when the session has that entry.
    The ``session`` mapping passed in is not modified.
    """
    capped = session.copy()
    head = slice(None, max_elements)
    capped['values'] = capped['values'][head]
    if 'timestamps' in capped:
        capped['timestamps'] = capped['timestamps'][head]
    return capped
|
|
|
|
|
|
def prune(session, beginning=10, end=10):
    """Return a shallow copy of ``session`` with samples cut from both ends.

    Args:
        session: Mapping with a 'values' sequence, a 'startIndex' number and
            optionally a 'timestamps' sequence.
        beginning: Number of leading samples to drop.
        end: Number of trailing samples to drop; 0 keeps the tail intact.

    Returns:
        The copy, with 'values' (and 'timestamps', when present) sliced and
        'startIndex' increased by ``beginning``. The input is not modified.
    """
    pruned = session.copy()
    # ``seq[beginning:-0]`` would be ``seq[beginning:0]`` (empty), so an
    # ``end`` of 0 has to become an open-ended slice.
    window = slice(beginning, -end if end else None)
    pruned['values'] = pruned['values'][window]
    # Sessions without timestamps are accepted, as in the sibling helpers.
    if 'timestamps' in pruned:
        pruned['timestamps'] = pruned['timestamps'][window]
    pruned['startIndex'] += beginning
    return pruned
|
|
|
|
|
|
#def extend(session, value, beginning, end):
|
|
# session = session.copy()
|
|
# session['values'] = np.concatenate((np.ones([beginning]) * value, session['values'], np.ones([end]) * value))
|
|
# TODO: decide what to put in 'timestamps' for the padded samples
|
|
# return session
|
|
|
|
|
|
def start_at_index(session, index):
    """Return a shallow copy of ``session`` that begins at ``index``.

    Args:
        session: Mapping with a 'values' sequence, a 'startIndex' number and
            optionally a 'timestamps' sequence.
        index: Desired start index, on the same scale as 'startIndex'.

    Returns:
        The copy. When ``index`` lies beyond the session's 'startIndex', the
        ``index - startIndex`` leading entries of 'values' (and 'timestamps',
        when present) are dropped and 'startIndex' is advanced by the same
        amount. Otherwise the copy is returned unchanged. The input is not
        modified.
    """
    trimmed = session.copy()
    to_remove = index - trimmed['startIndex']
    if to_remove <= 0:
        # Session already starts at or after ``index``; nothing to drop.
        return trimmed

    trimmed['values'] = trimmed['values'][to_remove:]
    if 'timestamps' in trimmed:
        trimmed['timestamps'] = trimmed['timestamps'][to_remove:]
    # Keep 'startIndex' in step with the dropped samples (as ``prune`` does),
    # so calling this again with the same index is a no-op.
    trimmed['startIndex'] += to_remove
    return trimmed
|
|
|
|
|
|
def test_serialization_deserialization(session_file):
    """Check that a session's 'values' survive a serialize/deserialize cycle.

    The session is loaded from ``session_file``, packed with ``serialize``,
    unpacked again with ``deserialize``, and the 'values' entries of the
    loaded and round-tripped sessions are compared. Only 'values' is checked.

    Raises:
        AssertionError: If the two 'values' entries differ.
    """
    # The file is read once via load_session_from_file; the raw bytes are not
    # needed for the comparison.
    session = load_session_from_file(session_file)
    round_tripped = deserialize(serialize(session))
    np.testing.assert_equal(round_tripped['values'], session['values'])
|
|
|
|
|
|
if __name__ == '__main__':
    # When run as a script, exercise the round-trip check on an example
    # session file (path is relative to the current working directory).
    example_session = '../example_sessions/04.st'
    test_serialization_deserialization(example_session)
|