"""Helpers for (de)serializing, plotting and trimming recorded sessions."""

import numpy as np
import msgpack

# Extension code written by `_encoding_hook` for int16 arrays.
_INT16_EXT_CODE = 47
# Extension codes that `_decoding_hook` interprets as int16 arrays.
_INT16_EXT_CODES_ACCEPTED = (-47, 47, -51)


def _decoding_hook(code, data):
    """msgpack ``ext_hook`` that decodes int16-array extension payloads.

    Args:
        code: Extension type code found in the stream.
        data: Raw payload bytes of the extension.

    Returns:
        An int16 array built directly on top of ``data`` for the codes
        -47, 47 and -51. For any other code the code is printed and the
        string ``'unknown ext'`` is returned.
    """
    if code in _INT16_EXT_CODES_ACCEPTED:
        return np.frombuffer(data, dtype=np.int16)
    print(code)
    return 'unknown ext'


def _encoding_hook(obj):
    """msgpack ``default`` hook that packs 1-D int16 arrays as extension 47.

    Args:
        obj: Object msgpack could not serialize natively.

    Returns:
        ``msgpack.ExtType(47, <raw bytes of obj>)`` for 1-D int16 arrays.

    Raises:
        TypeError: If ``obj`` is not an int16 ``np.ndarray``, or if it is
            an int16 array that is not one-dimensional.
    """
    if isinstance(obj, np.ndarray) and obj.dtype == np.int16:
        # Only the flat buffer is stored, so the shape of a multi-dimensional
        # array would be lost on decode. Raise instead of relying on an
        # `assert`, which is stripped when Python runs with -O.
        if obj.ndim != 1:
            raise TypeError(
                "Cannot pack int16 array with %d dimensions; "
                "only 1-D arrays are supported" % obj.ndim
            )
        return msgpack.ExtType(_INT16_EXT_CODE, obj.tobytes())
    raise TypeError("Cannot pack: %s of type %s" % (obj, str(type(obj))))


def deserialize(stream):
    """Unpack msgpack bytes, decoding int16-array extensions to numpy arrays."""
    return msgpack.unpackb(stream, ext_hook=_decoding_hook, raw=False)


def serialize(obj):
    """Pack ``obj`` to msgpack bytes, encoding 1-D int16 arrays as extensions."""
    return msgpack.packb(obj, default=_encoding_hook, use_bin_type=True)


def load_session_from_file(file):
    """Read the file at path ``file`` in binary mode and deserialize it."""
    with open(file, 'rb') as f:
        file_contents = f.read()
    return deserialize(file_contents)


def plot_session(session):
    """Plot a session's values with matplotlib and set a descriptive title.

    A session containing a ``'timestamps'`` key is plotted as values over
    the raw timestamps with ``'x-'`` markers ("sparse"). Otherwise the
    values are plotted over a time axis derived from the sample index and
    the session's ``'interval'`` entry, defaulting to 100 ("dense").

    Empty sessions are reported as lasting 0 minutes instead of raising
    ``IndexError``.

    Args:
        session: Mapping with a ``'values'`` entry and optionally
            ``'timestamps'`` and ``'interval'`` entries.
    """
    # Imported here so that importing this module does not require matplotlib.
    import matplotlib.pyplot as plt

    if 'timestamps' in session:
        # NOTE(review): dividing by 10 presumably converts timestamp ticks
        # to seconds - confirm against the recording format.
        interval = 10
        t = np.asarray(session['timestamps']) / interval
        time_range_seconds = t[-1] - t[0] if len(t) else 0.0
        description = f"Sparse session, {len(t)} data points, {time_range_seconds / 60} minutes"
        plt.plot(session['timestamps'], session['values'], 'x-')
    else:
        y = session['values']
        # NOTE(review): the division by 1000 suggests 'interval' is in
        # milliseconds - confirm.
        measurement_interval = session.get('interval', 100)
        t = np.arange(len(y)) * measurement_interval / 1000
        duration_seconds = t[-1] if len(t) else 0.0
        description = f"Dense session, {len(t)} data points, {duration_seconds / 60} minutes"
        plt.plot(t, y)
    plt.title(description)


def prune_overflown_session(session, max_elements=8 * 1024):
    """Return a shallow copy of ``session`` capped at ``max_elements`` samples.

    ``'values'`` and, when present, ``'timestamps'`` are truncated to their
    first ``max_elements`` entries. The input mapping is not modified.
    """
    session = session.copy()
    session['values'] = session['values'][:max_elements]
    if 'timestamps' in session:
        session['timestamps'] = session['timestamps'][:max_elements]
    return session


def prune(session, beginning=10, end=10):
    """Return a shallow copy of ``session`` with samples cut from both ends.

    Args:
        session: Mapping with ``'values'`` and ``'startIndex'`` entries and
            optionally a ``'timestamps'`` entry.
        beginning: Number of leading samples to drop.
        end: Number of trailing samples to drop. ``0`` keeps the tail.

    Returns:
        The copied session with ``'values'`` (and ``'timestamps'`` when
        present) sliced and ``'startIndex'`` advanced by ``beginning``.
    """
    session = session.copy()
    # A stop of `-0` would select nothing, so use an open-ended slice when no
    # trailing samples are to be removed.
    stop = -end if end else None
    session['values'] = session['values'][beginning:stop]
    # Dense sessions carry no timestamps; skip them like the sibling helpers.
    if 'timestamps' in session:
        session['timestamps'] = session['timestamps'][beginning:stop]
    session['startIndex'] += beginning
    return session


# def extend(session, value, beginning, end):
#     session = session.copy()
#     session['values'] = np.concatenate((np.ones([beginning]) * value, session['values'], np.ones([end]) * value))
#     # what to put in time stamps?
# return session def start_at_index(session, index): session = session.copy() to_remove = index - session['startIndex'] if to_remove > 0: session['values'] = session['values'][to_remove:] if 'timestamps' in session: session['timestamps'] = session['timestamps'][to_remove:] return session def test_serialization_deserialization(session_file): with open(session_file, 'rb') as f: raw_file_contents = f.read() session = load_session_from_file(session_file) serialized = serialize(session) deserialized = deserialize(serialized) np.testing.assert_equal(deserialized['values'], session['values']) if __name__ == '__main__': test_serialization_deserialization('../example_sessions/04.st')