swimtracker-firmware/firmware/websocket_test.py

159 lines
4.1 KiB
Python

import asyncio
import websockets
import struct
import numpy as np
from pprint import pprint
import datetime
import msgpack
import aiomonitor
class MsgManager:
    """Collects every parsed message so it can be inspected later.

    Messages are kept, in arrival order, in the ``msg_history`` list.
    """

    def __init__(self):
        # Start with an empty history; add_msg() grows it.
        self.msg_history = list()

    def add_msg(self, msg):
        """Pretty-print ``msg`` and then store it at the end of the history."""
        pprint(msg)
        history = self.msg_history
        history.append(msg)
# Registry of send-coroutine functions; entries are appended further down.
send_functions = list()
class MsgCode:
    """Numeric message type codes used by the websocket protocol.

    Codes below 128 travel from the device to the frontend, codes from
    128 (0x80) upwards travel from the frontend to the device.
    """

    ERROR = 1

    # device to frontend
    INITIAL_INFO = 2
    SESSION_STARTED = 3
    SESSION_STOPPED = 4
    SESSION_NEW_DATA = 5
    ANSWER_USER_LIST = 6
    ANSWER_SESSION_LIST = 7
    WIFI_STATE_RESPONSE = 8
    WIFI_SCAN_RESPONSE = 9

    # from frontend to device (written in hex: 0x80 == 128)
    START_SESSION = 0x80
    STOP_SESSION = 0x81
    TARE = 0x82
    QUERY_USER_LIST = 0x83
    QUERY_SESSION_LIST = 0x84
    WIFI_STATE_SET = 0x85
    WIFI_STATE_GET = 0x86
    WIFI_TRIGGER_SCAN = 0x87
async def send_message(websocket, msg_type, payload=None):
    """Send one protocol frame over ``websocket``.

    The frame is a single unsigned byte holding ``msg_type``, optionally
    followed by ``payload`` encoded with msgpack.

    Args:
        websocket: object with an awaitable ``send(data)`` method.
        msg_type: integer message code in the range 0..255.
        payload: msgpack-serialisable object, or ``None`` to send only
            the type byte.
    """
    # Build the frame in its own variable: reusing the name ``payload``
    # would discard the caller's argument and msgpack-encode the header.
    frame = struct.pack("<B", msg_type)
    if payload is not None:
        frame += msgpack.packb(payload, use_bin_type=True)
    await websocket.send(frame)
# --------------------------------- Session API --------------------------------
def parse_session_initial_info(payload):
    """Parse the payload of an INITIAL_INFO message.

    Layout read from ``payload``:
        byte 0      unsigned byte, reported as ``running``
        bytes 1-4   little-endian unsigned 32-bit ``session_id``
        bytes 5..   remaining bytes viewed as ``np.uint16`` values

    The session id is also interpreted as a Unix timestamp and converted
    to a naive local ``datetime`` for ``start_time``.

    Returns:
        dict with keys ``type``, ``running``, ``session_id``,
        ``start_time`` and ``data``.

    Raises:
        struct.error: if ``payload`` is shorter than 5 bytes.
    """
    session_id = struct.unpack("<I", payload[1:5])[0]
    return {
        'type': "session_initial_info",
        'running': struct.unpack("<B", payload[0:1])[0],
        'session_id': session_id,
        # ``datetime`` is imported as a module, so the class must be
        # addressed as ``datetime.datetime``.
        'start_time': datetime.datetime.fromtimestamp(session_id),
        'data': np.frombuffer(payload[5:], dtype=np.uint16),
    }
def parse_session_started(payload):
    """Parse the payload of a SESSION_STARTED message.

    ``payload`` must be exactly four bytes: a little-endian unsigned
    32-bit session id. The id is also interpreted as a Unix timestamp
    and converted to a naive local ``datetime`` for ``start_time``.

    Returns:
        dict with keys ``type``, ``session_id`` and ``start_time``.

    Raises:
        struct.error: if ``payload`` is not exactly 4 bytes long.
    """
    session_id = struct.unpack("<I", payload)[0]
    return {
        'type': "session_started",
        'session_id': session_id,
        # ``datetime`` is imported as a module, so the class must be
        # addressed as ``datetime.datetime``.
        'start_time': datetime.datetime.fromtimestamp(session_id),
    }
def parse_session_stopped(payload):
    """Parse a SESSION_STOPPED message, which is expected to be empty.

    Asserts that ``payload`` has length zero and returns a dict that
    contains only the ``type`` key.
    """
    payload_size = len(payload)
    assert payload_size == 0
    return dict(type="session_stopped")
def parse_session_new_data(payload):
    """Parse a SESSION_NEW_DATA message.

    The whole payload is viewed as an array of ``np.uint16`` values and
    returned under the ``data`` key.
    """
    samples = np.frombuffer(payload, dtype=np.uint16)
    return dict(type="session_new_data", data=samples)
async def send_session_start(websocket):
    """Send a START_SESSION message over ``websocket`` without a payload argument."""
    code = MsgCode.START_SESSION
    await send_message(websocket, code)
async def send_session_stop(websocket):
    """Send a STOP_SESSION message over ``websocket`` without a payload argument."""
    code = MsgCode.STOP_SESSION
    await send_message(websocket, code)
# Register the session senders in the module-level send_functions list.
send_functions.extend([send_session_start, send_session_stop])
# ------------------------------- WiFi API --------------------------------------
def parse_wifi_state(payload):
    """Parse a WIFI_STATE_RESPONSE message.

    The payload is decoded with msgpack (``raw=True``) and returned
    under the ``data`` key.
    """
    decoded = msgpack.unpackb(payload, raw=True)
    return {'type': "wifi_state", 'data': decoded}
def parse_wifi_scan(payload):
    """Parse a WIFI_SCAN_RESPONSE message.

    The payload is decoded with msgpack (``raw=True``) and returned
    under the ``data`` key.
    """
    decoded = msgpack.unpackb(payload, raw=True)
    return {'type': "wifi_scan_response", 'data': decoded}
async def send_wifi_state_get(websocket):
    """Send a WIFI_STATE_GET message over ``websocket`` without a payload argument."""
    code = MsgCode.WIFI_STATE_GET
    await send_message(websocket, code)
async def send_wifi_trigger_scan(websocket):
    """Send a WIFI_TRIGGER_SCAN message over ``websocket`` without a payload argument."""
    code = MsgCode.WIFI_TRIGGER_SCAN
    await send_message(websocket, code)
#send_functions += [send_wifi_state_get, send_wifi_trigger_scan]
# -------------------------------------------------------------------------------
def parse_message(data):
    """Decode one frame received from the device.

    The first byte of ``data`` is the message type code; the remaining
    bytes are handed to the parser registered for that code.

    Args:
        data: bytes-like frame as received from the websocket.

    Returns:
        The dict produced by the matching ``parse_*`` function.

    Raises:
        KeyError: if no parser is registered for the message type.
        struct.error: if ``data`` is empty.
    """
    parse_funcs = {
        MsgCode.INITIAL_INFO: parse_session_initial_info,
        MsgCode.SESSION_STARTED: parse_session_started,
        MsgCode.SESSION_STOPPED: parse_session_stopped,
        MsgCode.SESSION_NEW_DATA: parse_session_new_data,
        MsgCode.WIFI_STATE_RESPONSE: parse_wifi_state,
        MsgCode.WIFI_SCAN_RESPONSE: parse_wifi_scan,
    }
    msg_type = struct.unpack("<B", data[:1])[0]
    payload = data[1:]
    # Only the table lookup is guarded: a KeyError raised inside a parser
    # must not be misreported as an unknown message type.
    try:
        parser = parse_funcs[msg_type]
    except KeyError:
        raise KeyError(f"Unknown message type {msg_type}") from None
    return parser(payload)
# Module-level manager instance; main() attaches send helpers to it and stores
# every parsed message in it, and the monitor console receives it as ``m``.
msg_manager = MsgManager()
async def main():
    """Connect to the device and record every message it sends.

    After connecting, each function in ``send_functions`` is attached to
    ``msg_manager`` under its own name, pre-bound to the open websocket.
    The coroutine then loops forever: receive a frame, parse it, and
    hand the result to ``msg_manager.add_msg``.
    """
    uri = "ws://192.168.42.1:81"

    async with websockets.connect(uri) as websocket:

        def bind_to_socket(func):
            # A factory gives each wrapper its own ``func``. Defining the
            # wrapper directly in the loop would late-bind the loop
            # variable, so every wrapper would call the last function.
            async def bound_func(*args, **kwargs):
                await func(websocket, *args, **kwargs)
            return bound_func

        for send_func in send_functions:
            setattr(msg_manager, send_func.__name__, bind_to_socket(send_func))

        while True:
            res = await websocket.recv()
            msg = parse_message(res)
            msg_manager.add_msg(msg)
if __name__ == "__main__":
    # Create and install the event loop explicitly: asyncio.get_event_loop()
    # without a running loop is deprecated and fails on newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # The monitor console gets the message manager under the name ``m``.
        with aiomonitor.start_monitor(loop=loop, locals={'m': msg_manager}):
            loop.run_until_complete(main())
    finally:
        # Release the loop's resources however main() ended.
        loop.close()