# swimtracker-firmware/firmware/websocket_test.py
# (37 lines, 1.3 KiB, Python; 2020-06-25 22:01:53 +02:00)
import asyncio
import websockets
import struct
import numpy as np
# Message type codes. hello() compares the first byte of every received
# message against these values to decide how to decode the payload.
(
    INITIAL_INFO,
    SESSION_STARTED,
    SESSION_STOPPED,
    SESSION_NEW_DATA,
) = (1, 2, 3, 4)
async def hello(uri="ws://192.168.178.110:81"):
    """Connect to a websocket server and print every message it sends.

    Each received message is decoded as one leading type byte followed by a
    type-specific payload:

    * INITIAL_INFO: 1-byte running flag, little-endian uint32 session id,
      then the remaining bytes interpreted as uint16 values.
    * SESSION_STARTED: exactly one little-endian uint32 session id.
    * SESSION_STOPPED: no payload.
    * SESSION_NEW_DATA: the whole payload interpreted as uint16 values.

    Messages with any other type byte are reported and skipped. The loop has
    no exit condition of its own; it only ends when an exception propagates
    (for example from ``recv()`` or from decoding a malformed message).

    Args:
        uri: Websocket URI to connect to. Defaults to the address that was
            previously hard-coded, so ``hello()`` behaves as before.

    Raises:
        ValueError: If a SESSION_STOPPED message carries a payload, or if a
            data payload cannot be viewed as uint16 values.
        struct.error: If a fixed-size field has the wrong length.
    """
    async with websockets.connect(uri) as websocket:
        while True:
            res = await websocket.recv()
            # First byte selects the message type; the rest is the payload.
            msg_type = struct.unpack("<B", res[:1])[0]
            payload = res[1:]
            if msg_type == INITIAL_INFO:
                # "<BI" has no padding: 1 flag byte + 4 session-id bytes.
                running, session_id = struct.unpack("<BI", payload[:5])
                data = np.frombuffer(payload[5:], dtype=np.uint16)
                print(f"Initial info: running {running} session_id {session_id} data", data)
            elif msg_type == SESSION_STARTED:
                session_id = struct.unpack("<I", payload)[0]
                print(f"Session with id {session_id} started")
            elif msg_type == SESSION_STOPPED:
                # Explicit check instead of `assert`, which is removed
                # when Python runs with -O.
                if payload:
                    raise ValueError(
                        f"SESSION_STOPPED message has unexpected "
                        f"{len(payload)}-byte payload"
                    )
                print("Session stopped")
            elif msg_type == SESSION_NEW_DATA:
                data = np.frombuffer(payload, dtype=np.uint16)
                print("New data", data)
            else:
                print("Got unexpected packet of type", msg_type)
if __name__ == "__main__":
    # asyncio.run() creates, runs and closes its own event loop; calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    # The guard keeps a plain import of this module from opening a connection.
    asyncio.run(hello())