Wifi scanning

This commit is contained in:
Martin Bauer 2021-06-06 15:56:30 +02:00
parent 26d3469c83
commit fdfd26307e
10 changed files with 846 additions and 380 deletions

View File

@ -78,12 +78,13 @@ void WifiManager::resetToApProvisioning()
prefs_.remove("staPassword"); prefs_.remove("staPassword");
} }
void WifiManager::wifiWatchdog() void WifiManager::iteration()
{ {
if (state_ == STA && WiFi.status() != WL_CONNECTED) { if (state_ == STA && WiFi.status() != WL_CONNECTED) {
startWifi(); startWifi();
Serial.println("Connection lost - Restarting WIFI"); Serial.println("Connection lost - Restarting WIFI");
} }
} }

View File

@ -14,7 +14,7 @@
* *
* When operating in access point mode, the device IP is 192.168.42.1 * When operating in access point mode, the device IP is 192.168.42.1
* *
* call wifiWatchdog regularly to reconnect in station mode if connection was lost * call iteration() regularly, it has a wifiWatchdog to reconnect in station mode if connection was lost
*/ */
class WifiManager class WifiManager
{ {
@ -34,7 +34,7 @@ public:
void setApCredentials(const char *password); void setApCredentials(const char *password);
void resetToApProvisioning(); void resetToApProvisioning();
void wifiWatchdog(); void iteration();
bool inProvisioningMode() const { return state_ == INVALID || state_ == AP_PROVISIONING; } bool inProvisioningMode() const { return state_ == INVALID || state_ == AP_PROVISIONING; }

View File

@ -12,6 +12,8 @@ enum class MessageCode : uint8_t
SESSION_NEW_DATA = 5, SESSION_NEW_DATA = 5,
ANSWER_USER_LIST = 6, ANSWER_USER_LIST = 6,
ANSWER_SESSION_LIST = 7, ANSWER_SESSION_LIST = 7,
WIFI_STATE_RESPONSE = 8,
WIFI_SCAN_RESPONSE = 9,
// from frontend to device // from frontend to device
START_SESSION = 128, START_SESSION = 128,
@ -21,4 +23,5 @@ enum class MessageCode : uint8_t
QUERY_SESSION_LIST = 132, QUERY_SESSION_LIST = 132,
WIFI_STATE_SET = 133, WIFI_STATE_SET = 133,
WIFI_STATE_GET = 134, WIFI_STATE_GET = 134,
WIFI_TRIGGER_SCAN = 135,
}; };

View File

@ -2,13 +2,11 @@
#include <cstdint> #include <cstdint>
// Uncomment for Version 2.0 where load cell is connected differently // Uncomment for Version 2.0 where load cell is connected differently
//#define _HW_V_20 #define _HW_V_20
#define NEW_HEAVY_LOAD_CELL #define NEW_HEAVY_LOAD_CELL
constexpr const char *CONFIG_HOSTNAME = "swimtracker";
const char *CONFIG_HOSTNAME = "swimtracker";
// ------------------------------------- Hardware & Measurement Settings ------------------------------------------------------------ // ------------------------------------- Hardware & Measurement Settings ------------------------------------------------------------
@ -16,7 +14,7 @@ const uint8_t CONFIG_MEASUREMENT_AVG_COUNT = 1; // number of measurements in
const uint8_t CONFIG_TARE_AVG_COUNT = 20; // number of measurements in tare-phase (to find 0 ) const uint8_t CONFIG_TARE_AVG_COUNT = 20; // number of measurements in tare-phase (to find 0 )
const int CONFIG_MEASURE_DELAY = 100; // interval in ms between measurements const int CONFIG_MEASURE_DELAY = 100; // interval in ms between measurements
const uint32_t CONFIG_SESSION_MAX_LENGTH_HOURS = 3; // maximum length of one session const uint32_t CONFIG_SESSION_MAX_LENGTH_HOURS = 3; // maximum length of one session
const char *CONFIG_DATA_PATH = "/dat"; // folder in SPIFFS file system to store measurement data constexpr const char *CONFIG_DATA_PATH = "/dat"; // folder in SPIFFS file system to store measurement data
using MeasurementT = uint16_t; // data type for one measurement using MeasurementT = uint16_t; // data type for one measurement
#ifdef NEW_HEAVY_LOAD_CELL #ifdef NEW_HEAVY_LOAD_CELL
const int CONFIG_VALUE_RIGHT_SHIFT = 3; // uint32 measurements are divided by this power, before stored in uint16_t const int CONFIG_VALUE_RIGHT_SHIFT = 3; // uint32 measurements are divided by this power, before stored in uint16_t
@ -25,23 +23,22 @@ const int CONFIG_VALUE_RIGHT_SHIFT = 7;
#endif #endif
const MeasurementT CONFIG_KG_FACTOR_INV = 701; // after shifting - how many "measurement units" are one kg const MeasurementT CONFIG_KG_FACTOR_INV = 701; // after shifting - how many "measurement units" are one kg
static constexpr int MAX_WEBSOCKET_CONNECTIONS = 3; // maximal number of websocket connections maintained at the same time
const char * UPDATE_URL = "https://swimtracker-update.bauer.tech/firmware.bin"; constexpr const char *UPDATE_URL = "https://swimtracker-update.bauer.tech/firmware.bin";
// auto start/stop // auto start/stop
MeasurementT CONFIG_AUTO_START_MIN_THRESHOLD = CONFIG_KG_FACTOR_INV * 1; constexpr MeasurementT CONFIG_AUTO_START_MIN_THRESHOLD = CONFIG_KG_FACTOR_INV * 1;
MeasurementT CONFIG_AUTO_START_MAX_THRESHOLD = CONFIG_KG_FACTOR_INV * 3; constexpr MeasurementT CONFIG_AUTO_START_MAX_THRESHOLD = CONFIG_KG_FACTOR_INV * 3;
uint32_t CONFIG_AUTO_START_MAX_MEASUREMENTS_BETWEEN_PEAKS = (1000 / CONFIG_MEASURE_DELAY) * 6; constexpr uint32_t CONFIG_AUTO_START_MAX_MEASUREMENTS_BETWEEN_PEAKS = (1000 / CONFIG_MEASURE_DELAY) * 6;
MeasurementT CONFIG_AUTO_STOP_THRESHOLD = CONFIG_KG_FACTOR_INV * 1; constexpr MeasurementT CONFIG_AUTO_STOP_THRESHOLD = CONFIG_KG_FACTOR_INV * 1;
//uint32_t CONFIG_AUTO_STOP_NUM_MEASUREMENTS = (1000 / CONFIG_MEASURE_DELAY) * 60 * 15; //uint32_t CONFIG_AUTO_STOP_NUM_MEASUREMENTS = (1000 / CONFIG_MEASURE_DELAY) * 60 * 15;
uint32_t CONFIG_AUTO_STOP_NUM_MEASUREMENTS = (1000 / CONFIG_MEASURE_DELAY) * 30; constexpr uint32_t CONFIG_AUTO_STOP_NUM_MEASUREMENTS = (1000 / CONFIG_MEASURE_DELAY) * 30;
// ------------------------------------- Derived Settings ----------------------------------------------------------------------------- // ------------------------------------- Derived Settings -----------------------------------------------------------------------------
const uint32_t CONFIG_SESSION_MAX_SIZE = CONFIG_SESSION_MAX_LENGTH_HOURS * 3600 * (1000 / CONFIG_MEASURE_DELAY) * sizeof(uint16_t); constexpr uint32_t CONFIG_SESSION_MAX_SIZE = CONFIG_SESSION_MAX_LENGTH_HOURS * 3600 * (1000 / CONFIG_MEASURE_DELAY) * sizeof(uint16_t);
static_assert(CONFIG_SESSION_MAX_SIZE < 1024 * 1024, "Measurement data takes more than 1MiB space"); static_assert(CONFIG_SESSION_MAX_SIZE < 1024 * 1024, "Measurement data takes more than 1MiB space");
#ifdef _HW_V_20 #ifdef _HW_V_20

View File

@ -2,21 +2,23 @@
#pragma once #pragma once
#include "Dtypes.h" #include "Dtypes.h"
#include "UserDB.h" #include "UserDB.h"
#include "MessageCodes.h"
#include <ArduinoWebsockets.h> #include <ArduinoWebsockets.h>
#include <type_traits>
template <typename T> template <typename T>
class SessionManager; class SessionManager;
static constexpr int MAX_CONNECTIONS = 3;
static constexpr int NUM_DATA_CHUNK_SIZE = 1; static constexpr int NUM_DATA_CHUNK_SIZE = 1;
template <typename SessionT> template <typename ApiManagerTuple>
class WebsocketServer class WebsocketServer
{ {
public: public:
WebsocketServer(SessionManager<SessionT> &sessionManager, UserStorage &userStorage, int port) WebsocketServer(int port, ApiManagerTuple &tuple)
: sessionManager_(sessionManager), userStorage_(userStorage), nextFreeClient_(0), port_(port), : port_(port), nextFreeClient_(0), apiManagers_(tuple)
running_(false)
{ {
} }
@ -25,328 +27,145 @@ public:
server_.listen(port_); server_.listen(port_);
} }
void iteration(); void iteration()
private:
void reportSessionUpdate();
void sendMessageOnConnection(websockets::WebsocketsClient &client);
void sendSessionStartMessages();
void sendSessionStopMessages();
void sendNewDataMessages();
void sendUserList(websockets::WebsocketsClient &client);
void sendSessionList(websockets::WebsocketsClient &client, const String &userId);
SessionManager<SessionT> &sessionManager_;
UserStorage &userStorage_;
int nextFreeClient_;
int port_;
size_t sentMessageCount_;
websockets::WebsocketsServer server_;
websockets::WebsocketsClient clients_[MAX_CONNECTIONS];
// previous session state
size_t numSentMeasurements_[MAX_CONNECTIONS];
bool running_;
};
using websockets::WebsocketsClient;
// ------------------------------------- Message types & classes ---------------------------
enum MessageType
{
// from swim tracker device to frontend
INITIAL_INFO = 1,
SESSION_STARTED = 2,
SESSION_STOPPED = 3,
SESSION_NEW_DATA = 4,
ANSWER_USER_LIST = 5,
ANSWER_SESSION_LIST = 6,
// from frontend to device
START_SESSION = 7,
STOP_SESSION = 8,
TARE = 9,
QUERY_USER_LIST = 10,
QUERY_SESSION_LIST = 11,
};
#pragma pack(push, 1)
class SessionStartedMessage
{
public:
SessionStartedMessage(uint32_t id) : messageType_(SESSION_STARTED), sessionId_(id) {}
void send(WebsocketsClient &c) const
{ {
c.sendBinary((const char *)(this), sizeof(*this));
}
private:
uint8_t messageType_;
uint32_t sessionId_;
};
class SessionStoppedMessage
{
public:
SessionStoppedMessage() : messageType_(SESSION_STOPPED) {}
void send(WebsocketsClient &c) const
{
c.sendBinary((const char *)(this), sizeof(*this));
}
private:
uint8_t messageType_;
};
template <typename MeasurementT>
class SessionNewDataMessage
{
public:
// typically a message contains NUM_DATA_CHUNK_SIZE measurements
// if some measurements are skipped, because loop() takes too long
// there might actually be more measurements, to be safe there is an
// additional factor here
static constexpr size_t MAX_MEASUREMENTS = 4 * NUM_DATA_CHUNK_SIZE;
SessionNewDataMessage(MeasurementT *ptr, size_t numMeasurements)
: messageType_(SESSION_NEW_DATA), numMeasurements_(min(numMeasurements, MAX_MEASUREMENTS))
{
memcpy(measurements_, ptr, sizeof(MeasurementT) * numMeasurements_);
}
void send(WebsocketsClient &c) const
{
c.sendBinary((const char *)(this), numBytes());
}
size_t numMeasurements() const
{
return numMeasurements_;
}
private:
size_t numBytes() const { return sizeof(uint8_t) + numMeasurements() * sizeof(MeasurementT); }
// data to be sent
uint8_t messageType_;
MeasurementT measurements_[MAX_MEASUREMENTS];
// book-keeping
size_t numMeasurements_;
};
#pragma pack(pop)
// ------------------------------------- WebsocketServer members ---------------------------
template <typename SessionT>
void WebsocketServer<SessionT>::iteration()
{
using namespace websockets; using namespace websockets;
const auto onMessage = [this](WebsocketsClient &client, WebsocketsMessage message)
auto onMessage = [this](WebsocketsClient &client, WebsocketsMessage message) { {
if (message.isPing()) if (message.isPing())
client.pong(); client.pong();
else if (message.isBinary()) else if (message.isBinary())
{ {
const char *data = message.c_str(); const char *data = message.c_str();
const size_t length = message.length(); const size_t length = message.length();
if (length < 1)
{
client.close(CloseReason_UnsupportedData);
return;
}
uint8_t opCode = uint8_t(data[0]); MessageCode msgCode = MessageCode((uint8_t)(data[0]));
switch (opCode) this->handlMessageImpl(client, msgCode, data + 1, length - 1);
{
case START_SESSION:
this->sessionManager_.startMeasurements();
break;
case STOP_SESSION:
this->sessionManager_.stopMeasurements();
break;
case TARE:
this->sessionManager_.tare();
break;
case QUERY_USER_LIST:
this->sendUserList(client);
break;
case QUERY_SESSION_LIST:
{
StaticJsonDocument<USER_STRING_ID_MAX_LEN + 16> doc;
deserializeMsgPack(doc, data, length);
String userId = doc.as<String>();
if (userId.length() > 0)
this->sendSessionList(client, userId);
} }
break; else
default:
client.close(CloseReason_UnsupportedData); client.close(CloseReason_UnsupportedData);
return;
}
}
}; };
if (server_.poll()) if (server_.poll())
{ {
Serial.println("new websocket connection");
clients_[nextFreeClient_] = server_.accept(); clients_[nextFreeClient_] = server_.accept();
clients_[nextFreeClient_].onMessage(onMessage); clients_[nextFreeClient_].onMessage(onMessage);
Serial.println("new websocket connection"); this->onClientConnectImpl(clients_[nextFreeClient_]);
sendMessageOnConnection(clients_[nextFreeClient_]); nextFreeClient_ = (nextFreeClient_ + 1) % MAX_WEBSOCKET_CONNECTIONS;
numSentMeasurements_[nextFreeClient_] = sessionManager_.session().numMeasurements();
nextFreeClient_ = (nextFreeClient_ + 1) % MAX_CONNECTIONS;
} }
for (int i = 0; i < MAX_CONNECTIONS; ++i) for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
clients_[i].poll(); clients_[i].poll();
reportSessionUpdate(); this->iterationImpl<>();
}
template <typename SessionT>
void WebsocketServer<SessionT>::reportSessionUpdate()
{
if (!running_ && sessionManager_.isMeasuring())
{
sendSessionStartMessages();
for (int i = 0; i < MAX_CONNECTIONS; ++i)
numSentMeasurements_[i] = 0;
} }
else if (running_ && !sessionManager_.isMeasuring())
template <size_t bufferSize>
void sendToAll(MessageCode msgCode, const JsonDocument &content)
{ {
sendSessionStopMessages(); char buffer[bufferSize];
for (int i = 0; i < MAX_CONNECTIONS; ++i) buffer[0] = (char)(msgCode);
numSentMeasurements_[i] = 0; size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), bufferSize - sizeof(msgCode));
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
if (clients_[i].available())
clients_[i].sendBinary(buffer, bytesWritten);
} }
sendNewDataMessages();
}
template <typename SessionT> void sendToAll(MessageCode msgCode, const JsonDocument &content)
void WebsocketServer<SessionT>::sendUserList(websockets::WebsocketsClient &client)
{
const auto numUsers = userStorage_.numUsers();
constexpr size_t constantSlack = 64;
DynamicJsonDocument result(JSON_ARRAY_SIZE(numUsers) + numUsers * (USER_STRING_ID_MAX_LEN + 2) + constantSlack);
JsonArray arr = result.to<JsonArray>();
for (auto userIt = userStorage_.beginWithoutUnassigned(); userIt != userStorage_.end(); ++userIt)
arr.add(userIt->stringId());
char buffer[MAX_USERS * (USER_STRING_ID_MAX_LEN + 1) + constantSlack];
size_t bytesWritten = serializeMsgPack(result, buffer, sizeof(buffer));
client.sendBinary(buffer, bytesWritten);
}
template <typename SessionT>
void WebsocketServer<SessionT>::sendSessionList(websockets::WebsocketsClient &client, const String &userId)
{
User *user = userStorage_.getUserInfo(userId);
if (user != nullptr)
{ {
DynamicJsonDocument result(JSON_ARRAY_SIZE(user->numSessions()) + user->numSessions() * (sizeof(SessionIdType) + 8)); size_t expectedSize = measureMsgPack(content);
JsonArray arr = result.to<JsonArray>(); char *buffer = (char *)malloc(expectedSize + sizeof(msgCode));
for (SessionIdType *sIt = user->sessionBegin(); sIt != user->sessionEnd(); ++sIt) buffer[0] = (char)(msgCode);
arr.add(*sIt); size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), expectedSize);
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
if (clients_[i].available())
clients_[i].sendBinary(buffer, bytesWritten + sizeof(msgCode));
size_t bytesToWrite = measureMsgPack(result);
char *buffer = (char *)malloc(bytesToWrite);
size_t bytesWritten = serializeMsgPack(result, buffer, bytesToWrite);
assert(bytesWritten <= bytesToWrite);
client.sendBinary(buffer, bytesWritten);
free(buffer); free(buffer);
} }
void sendToAll(MessageCode msgCode)
{
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
if (clients_[i].available())
clients_[i].sendBinary((const char*)&msgCode, sizeof(MessageCode));
}
websockets::WebsocketsClient &client(size_t i) { return clients_[i]; }
private:
// -- Tuple calls
template <size_t managerIdx = std::tuple_size<ApiManagerTuple>::value - 1, typename std::enable_if<managerIdx != 0, bool>::type = true>
void iterationImpl()
{
std::get<managerIdx>(apiManagers_).iteration(*this);
iterationImpl<managerIdx - 1>();
}
template <size_t managerIdx, typename std::enable_if<managerIdx == 0, bool>::type = true>
void iterationImpl() {}
template <size_t managerIdx = std::tuple_size<ApiManagerTuple>::value - 1, typename std::enable_if<managerIdx != 0, bool>::type = true>
bool handlMessageImpl(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size)
{
bool handled = std::get<managerIdx>(apiManagers_).handleMessage(client, code, payload, size);
if (handled)
return true;
else else
{ return handlMessageImpl<managerIdx - 1>(client, code, payload, size);
DynamicJsonDocument result(JSON_ARRAY_SIZE(1) + 8);
result.to<JsonArray>();
char buffer[32];
size_t bytesWritten = serializeMsgPack(result, buffer, sizeof(buffer));
client.sendBinary(buffer, bytesWritten);
} }
} template <size_t managerIdx, typename std::enable_if<managerIdx == 0, bool>::type = true>
bool handlMessageImpl(websockets::WebsocketsClient &, MessageCode, const char *, size_t) { return false; }
template <typename SessionT> template <size_t managerIdx = std::tuple_size<ApiManagerTuple>::value - 1, typename std::enable_if<managerIdx != 0, bool>::type = true>
void WebsocketServer<SessionT>::sendSessionStartMessages() void onClientConnectImpl(websockets::WebsocketsClient &client)
{
SessionStartedMessage msg(sessionManager_.session().getStartTime());
for (auto &c : clients_)
if (c.available())
msg.send(c);
running_ = sessionManager_.isMeasuring();
}
template <typename SessionT>
void WebsocketServer<SessionT>::sendSessionStopMessages()
{
SessionStoppedMessage msg;
for (auto &c : clients_)
if (c.available())
msg.send(c);
running_ = sessionManager_.isMeasuring();
}
template <typename SessionT>
void WebsocketServer<SessionT>::sendNewDataMessages()
{
using MeasurementT = typename SessionT::MeasurementType;
auto &session = sessionManager_.session();
for (int i = 0; i < MAX_CONNECTIONS; ++i)
{ {
auto &c = clients_[i]; std::get<managerIdx>(apiManagers_).onClientConnect(client);
if (c.available()) onClientConnectImpl<managerIdx - 1>(client);
{
MeasurementT *dataToSend = session.getDataPointer() + numSentMeasurements_[i];
int32_t numMeasurementsToSend = int32_t(session.numMeasurements()) - int32_t(numSentMeasurements_[i]);
if (numMeasurementsToSend > 0)
{
SessionNewDataMessage<MeasurementT> msg(dataToSend, numMeasurementsToSend);
msg.send(c);
numSentMeasurements_[i] += msg.numMeasurements();
} }
} template <size_t managerIdx, typename std::enable_if<managerIdx == 0, bool>::type = true>
} void onClientConnectImpl(websockets::WebsocketsClient &client) {}
}
template <typename SessionT> // -- Members
void WebsocketServer<SessionT>::sendMessageOnConnection(WebsocketsClient &client)
int port_;
int nextFreeClient_;
ApiManagerTuple apiManagers_;
websockets::WebsocketsServer server_;
websockets::WebsocketsClient clients_[MAX_WEBSOCKET_CONNECTIONS];
};
template<typename... ApiManagers>
inline WebsocketServer<std::tuple<ApiManagers...>> makeWebsocketServer(int port, ApiManagers... managers)
{ {
using MeasurementT = typename SessionT::MeasurementType; auto tuple = std::make_tuple(managers...);
return WebsocketServer<decltype(tuple)>(port, tuple);
// Message format: }
// - uint8_t messageType
// - uint8_t running template <size_t bufferSize>
// - uint32_t sessionId inline void sendToClient(websockets::WebsocketsClient &client, MessageCode msgCode, const JsonDocument &content)
// - MeasurementT [] measurements (if running) {
char buffer[bufferSize];
auto &session = sessionManager_.session(); buffer[0] = (char)(msgCode);
const auto numMeasurements = session.numMeasurements(); size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), bufferSize - sizeof(msgCode));
const auto sessionId = session.getStartTime(); client.sendBinary(buffer, bytesWritten + 1);
}
const size_t msgSize = sizeof(uint8_t) + sizeof(uint8_t) + sizeof(sessionId) + sizeof(MeasurementT) * numMeasurements;
char *msg = (char *)heap_caps_malloc(msgSize, MALLOC_CAP_SPIRAM); inline void sendToClient(websockets::WebsocketsClient &client, MessageCode msgCode, const JsonDocument &content)
{
char *writeHead = msg; size_t expectedSize = measureMsgPack(content);
char *buffer = (char *)malloc(expectedSize + sizeof(msgCode));
*writeHead = INITIAL_INFO; buffer[0] = static_cast<char>(msgCode);
writeHead += sizeof(uint8_t); size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), expectedSize);
client.sendBinary(buffer, bytesWritten + sizeof(msgCode));
*writeHead = sessionManager_.isMeasuring(); free(buffer);
writeHead += sizeof(uint8_t); }
*((uint32_t *)writeHead) = sessionManager_.isMeasuring() ? sessionId : 0; inline void sendErrorToClient(websockets::WebsocketsClient &client, const char *msg)
writeHead += sizeof(uint32_t); {
DynamicJsonDocument doc(strlen(msg) + 64);
assert(writeHead - msg == msgSize - sizeof(MeasurementT) * numMeasurements); doc["msg"] = msg;
sendToClient(client, MessageCode::ERROR, doc);
memcpy(writeHead, session.getDataPointer(), sizeof(MeasurementT) * numMeasurements);
client.sendBinary(msg, msgSize);
free(msg);
} }

View File

@ -0,0 +1,436 @@
#pragma once
#include "Dtypes.h"
#include "UserDB.h"
#include "MessageCodes.h"
#include <ArduinoWebsockets.h>
template <typename T>
class SessionManager;
static constexpr int NUM_DATA_CHUNK_SIZE = 1;
template <typename ApiManagerTuple>
class WebsocketInterface
{
public:
WebsocketInterface(int port) : port_(port), nextFreeClient_(0)
{
}
void begin()
{
server_.listen(port_);
}
void iteration();
template <size_t bufferSize>
void sendToAll(MessageCode msgCode, const JsonDocument &content)
{
char buffer[bufferSize];
buffer[0] = msgCode;
size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), bufferSize - sizeof(msgCode));
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
if (clients_[i].available())
clients_[i].sendBinary(buffer, bytesWritten);
}
void sendToAll(MessageCode msgCode, const JsonDocument &content)
{
size_t expectedSize = measureMsgPack(content);
char *buffer = (char *)malloc(expectedSize + sizeof(msgCode));
buffer[0] = msgCode;
size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), expectedSize);
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
if (clients_[i].available())
clients_[i].sendBinary(buffer, bytesWritten + sizeof(msgCode));
free(buffer);
}
void sendToAll(MessageCode msgCode)
{
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
if (clients_[i].available())
clients_[i].sendBinary(&msgCode, sizeof(MessageCode));
}
websockets::WebsocketsClient &client(size_t i) { return clients_[i]; }
private:
int port_;
int nextFreeClient_;
websockets::WebsocketsServer server_;
websockets::WebsocketsClient clients_[MAX_WEBSOCKET_CONNECTIONS];
};
template <size_t bufferSize>
inline void sendToClient(websockets::WebsocketsClient &client, MessageCode msgCode, const JsonDocument &content)
{
char buffer[bufferSize];
buffer[0] = msgCode;
size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), bufferSize - sizeof(msgCode));
client.sendBinary(buffer, bytesWritten);
}
inline void sendToClient(websockets::WebsocketsClient &client, MessageCode msgCode, const JsonDocument &content)
{
size_t expectedSize = measureMsgPack(content);
char *buffer = (char *)malloc(expectedSize + sizeof(msgCode));
buffer[0] = static_cast<char>(msgCode);
size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), expectedSize);
client.sendBinary(buffer, bytesWritten + sizeof(msgCode));
free(buffer);
}
inline void sendErrorToClient(websockets::WebsocketsClient &client, const char *msg)
{
DynamicJsonDocument doc(strlen(msg) + 64);
doc["msg"] = msg;
sendToClient(client, MessageCode::ERROR, doc);
}
template <typename SessionT>
class WebsocketServer
{
public:
WebsocketServer(SessionManager<SessionT> &sessionManager, UserStorage &userStorage, int port)
: sessionManager_(sessionManager), userStorage_(userStorage), nextFreeClient_(0), port_(port),
running_(false)
{
}
void begin()
{
server_.listen(port_);
}
void iteration();
private:
void reportSessionUpdate();
void sendMessageOnConnection(websockets::WebsocketsClient &client);
void sendSessionStartMessages();
void sendSessionStopMessages();
void sendNewDataMessages();
void sendUserList(websockets::WebsocketsClient &client);
void sendSessionList(websockets::WebsocketsClient &client, const String &userId);
SessionManager<SessionT> &sessionManager_;
UserStorage &userStorage_;
int nextFreeClient_;
int port_;
size_t sentMessageCount_;
websockets::WebsocketsServer server_;
websockets::WebsocketsClient clients_[MAX_WEBSOCKET_CONNECTIONS];
// previous session state
size_t numSentMeasurements_[MAX_WEBSOCKET_CONNECTIONS];
bool running_;
};
using websockets::WebsocketsClient;
// ------------------------------------- Message types & classes ---------------------------
enum MessageType
{
// from swim tracker device to frontend
INITIAL_INFO = 1,
SESSION_STARTED = 2,
SESSION_STOPPED = 3,
SESSION_NEW_DATA = 4,
ANSWER_USER_LIST = 5,
ANSWER_SESSION_LIST = 6,
// from frontend to device
START_SESSION = 7,
STOP_SESSION = 8,
TARE = 9,
QUERY_USER_LIST = 10,
QUERY_SESSION_LIST = 11,
};
#pragma pack(push, 1)
class SessionStartedMessage
{
public:
SessionStartedMessage(uint32_t id) : messageType_(SESSION_STARTED), sessionId_(id) {}
void send(WebsocketsClient &c) const
{
c.sendBinary((const char *)(this), sizeof(*this));
}
private:
uint8_t messageType_;
uint32_t sessionId_;
};
class SessionStoppedMessage
{
public:
SessionStoppedMessage() : messageType_(SESSION_STOPPED) {}
void send(WebsocketsClient &c) const
{
c.sendBinary((const char *)(this), sizeof(*this));
}
private:
uint8_t messageType_;
};
template <typename MeasurementT>
class SessionNewDataMessage
{
public:
// typically a message contains NUM_DATA_CHUNK_SIZE measurements
// if some measurements are skipped, because loop() takes too long
// there might actually be more measurements, to be safe there is an
// additional factor here
static constexpr size_t MAX_MEASUREMENTS = 4 * NUM_DATA_CHUNK_SIZE;
SessionNewDataMessage(MeasurementT *ptr, size_t numMeasurements)
: messageType_(SESSION_NEW_DATA), numMeasurements_(min(numMeasurements, MAX_MEASUREMENTS))
{
memcpy(measurements_, ptr, sizeof(MeasurementT) * numMeasurements_);
}
void send(WebsocketsClient &c) const
{
c.sendBinary((const char *)(this), numBytes());
}
size_t numMeasurements() const
{
return numMeasurements_;
}
private:
size_t numBytes() const { return sizeof(uint8_t) + numMeasurements() * sizeof(MeasurementT); }
// data to be sent
uint8_t messageType_;
MeasurementT measurements_[MAX_MEASUREMENTS];
// book-keeping
size_t numMeasurements_;
};
#pragma pack(pop)
// ------------------------------------- WebsocketServer members ---------------------------
template <typename SessionT>
void WebsocketServer<SessionT>::iteration()
{
using namespace websockets;
auto onMessage = [this](WebsocketsClient &client, WebsocketsMessage message)
{
if (message.isPing())
client.pong();
else if (message.isBinary())
{
const char *data = message.c_str();
const size_t length = message.length();
if (length < 1)
{
client.close(CloseReason_UnsupportedData);
return;
}
uint8_t opCode = uint8_t(data[0]);
switch (opCode)
{
case START_SESSION:
this->sessionManager_.startMeasurements();
break;
case STOP_SESSION:
this->sessionManager_.stopMeasurements();
break;
case TARE:
this->sessionManager_.tare();
break;
case QUERY_USER_LIST:
this->sendUserList(client);
break;
case QUERY_SESSION_LIST:
{
StaticJsonDocument<USER_STRING_ID_MAX_LEN + 16> doc;
deserializeMsgPack(doc, data, length);
String userId = doc.as<String>();
if (userId.length() > 0)
this->sendSessionList(client, userId);
}
break;
default:
client.close(CloseReason_UnsupportedData);
return;
}
}
};
if (server_.poll())
{
clients_[nextFreeClient_] = server_.accept();
clients_[nextFreeClient_].onMessage(onMessage);
Serial.println("new websocket connection");
sendMessageOnConnection(clients_[nextFreeClient_]);
numSentMeasurements_[nextFreeClient_] = sessionManager_.session().numMeasurements();
nextFreeClient_ = (nextFreeClient_ + 1) % MAX_WEBSOCKET_CONNECTIONS;
}
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
clients_[i].poll();
reportSessionUpdate();
}
template <typename SessionT>
void WebsocketServer<SessionT>::reportSessionUpdate()
{
if (!running_ && sessionManager_.isMeasuring())
{
sendSessionStartMessages();
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
numSentMeasurements_[i] = 0;
}
else if (running_ && !sessionManager_.isMeasuring())
{
sendSessionStopMessages();
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
numSentMeasurements_[i] = 0;
}
sendNewDataMessages();
}
template <typename SessionT>
void WebsocketServer<SessionT>::sendUserList(websockets::WebsocketsClient &client)
{
const auto numUsers = userStorage_.numUsers();
constexpr size_t constantSlack = 64;
DynamicJsonDocument result(JSON_ARRAY_SIZE(numUsers) + numUsers * (USER_STRING_ID_MAX_LEN + 2) + constantSlack);
JsonArray arr = result.to<JsonArray>();
for (auto userIt = userStorage_.beginWithoutUnassigned(); userIt != userStorage_.end(); ++userIt)
arr.add(userIt->stringId());
char buffer[MAX_USERS * (USER_STRING_ID_MAX_LEN + 1) + constantSlack];
size_t bytesWritten = serializeMsgPack(result, buffer, sizeof(buffer));
client.sendBinary(buffer, bytesWritten);
}
template <typename SessionT>
void WebsocketServer<SessionT>::sendSessionList(websockets::WebsocketsClient &client, const String &userId)
{
User *user = userStorage_.getUserInfo(userId);
if (user != nullptr)
{
DynamicJsonDocument result(JSON_ARRAY_SIZE(user->numSessions()) + user->numSessions() * (sizeof(SessionIdType) + 8));
JsonArray arr = result.to<JsonArray>();
for (SessionIdType *sIt = user->sessionBegin(); sIt != user->sessionEnd(); ++sIt)
arr.add(*sIt);
size_t bytesToWrite = measureMsgPack(result);
char *buffer = (char *)malloc(bytesToWrite);
size_t bytesWritten = serializeMsgPack(result, buffer, bytesToWrite);
assert(bytesWritten <= bytesToWrite);
client.sendBinary(buffer, bytesWritten);
free(buffer);
}
else
{
DynamicJsonDocument result(JSON_ARRAY_SIZE(1) + 8);
result.to<JsonArray>();
char buffer[32];
size_t bytesWritten = serializeMsgPack(result, buffer, sizeof(buffer));
client.sendBinary(buffer, bytesWritten);
}
}
template <typename SessionT>
void WebsocketServer<SessionT>::sendSessionStartMessages()
{
SessionStartedMessage msg(sessionManager_.session().getStartTime());
for (auto &c : clients_)
if (c.available())
msg.send(c);
running_ = sessionManager_.isMeasuring();
}
template <typename SessionT>
void WebsocketServer<SessionT>::sendSessionStopMessages()
{
SessionStoppedMessage msg;
for (auto &c : clients_)
if (c.available())
msg.send(c);
running_ = sessionManager_.isMeasuring();
}
template <typename SessionT>
void WebsocketServer<SessionT>::sendNewDataMessages()
{
using MeasurementT = typename SessionT::MeasurementType;
auto &session = sessionManager_.session();
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
{
auto &c = clients_[i];
if (c.available())
{
MeasurementT *dataToSend = session.getDataPointer() + numSentMeasurements_[i];
int32_t numMeasurementsToSend = int32_t(session.numMeasurements()) - int32_t(numSentMeasurements_[i]);
if (numMeasurementsToSend > 0)
{
SessionNewDataMessage<MeasurementT> msg(dataToSend, numMeasurementsToSend);
msg.send(c);
numSentMeasurements_[i] += msg.numMeasurements();
}
}
}
}
template <typename SessionT>
void WebsocketServer<SessionT>::sendMessageOnConnection(WebsocketsClient &client)
{
using MeasurementT = typename SessionT::MeasurementType;
// Message format:
// - uint8_t messageType
// - uint8_t running
// - uint32_t sessionId
// - MeasurementT [] measurements (if running)
auto &session = sessionManager_.session();
const auto numMeasurements = session.numMeasurements();
const auto sessionId = session.getStartTime();
const size_t msgSize = sizeof(uint8_t) + sizeof(uint8_t) + sizeof(sessionId) + sizeof(MeasurementT) * numMeasurements;
char *msg = (char *)heap_caps_malloc(msgSize, MALLOC_CAP_SPIRAM);
char *writeHead = msg;
*writeHead = INITIAL_INFO;
writeHead += sizeof(uint8_t);
*writeHead = sessionManager_.isMeasuring();
writeHead += sizeof(uint8_t);
*((uint32_t *)writeHead) = sessionManager_.isMeasuring() ? sessionId : 0;
writeHead += sizeof(uint32_t);
assert(writeHead - msg == msgSize - sizeof(MeasurementT) * numMeasurements);
memcpy(writeHead, session.getDataPointer(), sizeof(MeasurementT) * numMeasurements);
client.sendBinary(msg, msgSize);
free(msg);
}

View File

@ -3,13 +3,23 @@
#include "WebsocketServer.h" #include "WebsocketServer.h"
void WifiAPI::sendWifiState(websockets::WebsocketsClient &client)
{
StaticJsonDocument<128> data;
data["state"] = wifiManager_.stateStr();
sendToClient<64>(client, MessageCode::WIFI_STATE_RESPONSE, data);
}
void WifiAPI::onClientConnect(websockets::WebsocketsClient &client)
{
sendWifiState(client);
}
bool WifiAPI::handleMessage(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size) bool WifiAPI::handleMessage(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size)
{ {
if (code == MessageCode::WIFI_STATE_GET) if (code == MessageCode::WIFI_STATE_GET)
{ {
StaticJsonDocument<128> data; sendWifiState(client);
data["state"] = wifiManager_.stateStr();
sendToClient<64>(client, MessageCode::WIFI_STATE_GET, data);
return true; return true;
} }
else if (code == MessageCode::WIFI_STATE_SET) else if (code == MessageCode::WIFI_STATE_SET)
@ -42,5 +52,10 @@ bool WifiAPI::handleMessage(websockets::WebsocketsClient &client, MessageCode co
return true; return true;
} }
} }
else if (code == MessageCode::WIFI_TRIGGER_SCAN)
{
WiFi.scanNetworks(true);
return true;
}
return false; return false;
} }

View File

@ -15,20 +15,73 @@ public:
{ {
} }
void onClientConnect(websockets::WebsocketsClient &client) {} void onClientConnect(websockets::WebsocketsClient &client);
bool handleMessage(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size); bool handleMessage(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size);
template <typename TServer> template <typename TServer>
void iteration(TServer &server) void iteration(TServer &server);
{
private:
void sendWifiState(websockets::WebsocketsClient &client);
template <typename TServer>
void reportScanResultIfAvailable(TServer &server);
WifiManager &wifiManager_;
bool restartScheduled_;
};
template <typename TServer>
void WifiAPI::iteration(TServer &server)
{
if (restartScheduled_) if (restartScheduled_)
{ {
Serial.print("Restart triggered by WifiAPI"); Serial.print("Restart triggered by WifiAPI");
ESP.restart(); ESP.restart();
} }
} reportScanResultIfAvailable(server);
}
private: template <typename TServer>
WifiManager &wifiManager_; void WifiAPI::reportScanResultIfAvailable(TServer &server)
bool restartScheduled_; {
}; auto numNetworks = WiFi.scanComplete();
if (numNetworks >= 0)
{
DynamicJsonDocument response(192 * numNetworks);
for (uint16_t i = 0; i < numNetworks; ++i)
{
JsonObject wifiObj = response.createNestedObject();
wifiObj["ssid"] = WiFi.SSID(i);
wifiObj["rssi"] = WiFi.RSSI(i);
wifiObj["channel"] = WiFi.channel(i);
switch (WiFi.encryptionType(i))
{
case WIFI_AUTH_OPEN:
wifiObj["sec"] = "open";
break;
case WIFI_AUTH_WEP:
wifiObj["sec"] = "WEP";
break;
case WIFI_AUTH_WPA_PSK:
wifiObj["sec"] = "WPA_PSK";
break;
case WIFI_AUTH_WPA2_PSK:
wifiObj["sec"] = "WPA2_PSK";
break;
case WIFI_AUTH_WPA_WPA2_PSK:
wifiObj["sec"] = "WPA_WPA2_PSK";
break;
case WIFI_AUTH_WPA2_ENTERPRISE:
wifiObj["sec"] = "WPA2_ENTP";
break;
default:
wifiObj["sec"] = "?";
}
}
server.sendToAll(MessageCode::WIFI_SCAN_RESPONSE, response);
WiFi.scanDelete();
}
}

View File

@ -20,19 +20,28 @@
#include "SimpleMeasurementSession.h" #include "SimpleMeasurementSession.h"
#include "EspHttp.h" #include "EspHttp.h"
#include "WebDAV.h" #include "WebDAV.h"
#include "WebsocketServer.h"
#include "UserDB.h" #include "UserDB.h"
// Api
#include "WebsocketServer.h"
#include "SessionAPI.h"
#include "WifiAPI.h"
using Session_T = SimpleMeasurementSession<MeasurementT, CONFIG_SESSION_MAX_SIZE>; using Session_T = SimpleMeasurementSession<MeasurementT, CONFIG_SESSION_MAX_SIZE>;
SessionManager<Session_T> sessionManager; SessionManager<Session_T> sessionManager;
UserStorage userStorage; UserStorage userStorage;
EspHttp espHttpServer; EspHttp espHttpServer;
WebsocketServer<Session_T> webSocketServer(sessionManager, userStorage, 81);
WifiManager wifiManager; WifiManager wifiManager;
auto apiTuple = std::make_tuple(SessionAPI<Session_T>(sessionManager), WifiAPI(wifiManager));
WebsocketServer<decltype(apiTuple)> websocketServer(81, apiTuple);
//WebsocketServer<Session_T> webSocketServer(sessionManager, userStorage, 81);
extern const uint8_t certificate_pem[] asm("_binary_certificate_pem_start"); extern const uint8_t certificate_pem[] asm("_binary_certificate_pem_start");
bool firmwareUpdate() bool firmwareUpdate()
@ -86,32 +95,38 @@ void sessionManagerSetup()
template <typename SessionT> template <typename SessionT>
void httpSetup(SessionManager<SessionT> *sessionManager, WifiManager *wifiManager) void httpSetup(SessionManager<SessionT> *sessionManager, WifiManager *wifiManager)
{ {
auto cbStartSession = [sessionManager](httpd_req_t *req) { auto cbStartSession = [sessionManager](httpd_req_t *req)
{
httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*");
httpd_resp_send(req, "Session started", -1); httpd_resp_send(req, "Session started", -1);
sessionManager->startMeasurements(); sessionManager->startMeasurements();
Serial.println("Started session"); Serial.println("Started session");
}; };
auto cbStopSession = [sessionManager](httpd_req_t *req) { auto cbStopSession = [sessionManager](httpd_req_t *req)
{
httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*");
httpd_resp_send(req, "Session stopped", -1); httpd_resp_send(req, "Session stopped", -1);
sessionManager->stopMeasurements(); sessionManager->stopMeasurements();
Serial.println("Stopped session"); Serial.println("Stopped session");
}; };
auto cbRestart = [](httpd_req_t *req) { auto cbRestart = [](httpd_req_t *req)
{
Serial.println("Restarted requested"); Serial.println("Restarted requested");
ESP.restart(); ESP.restart();
}; };
auto cbTare = [sessionManager](httpd_req_t *req) { auto cbTare = [sessionManager](httpd_req_t *req)
{
Serial.println("Tare"); Serial.println("Tare");
sessionManager->tare(); sessionManager->tare();
}; };
auto cbFirmwareUpdate = [](httpd_req_t *req) { auto cbFirmwareUpdate = [](httpd_req_t *req)
{
httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*");
httpd_resp_send(req, "OK", -1); httpd_resp_send(req, "OK", -1);
firmwareUpdate(); firmwareUpdate();
}; };
auto cbStatus = [sessionManager](httpd_req_t *req) { auto cbStatus = [sessionManager](httpd_req_t *req)
{
httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*");
httpd_resp_set_hdr(req, "Content-Type", "application/json"); httpd_resp_set_hdr(req, "Content-Type", "application/json");
@ -167,7 +182,8 @@ void httpSetup(SessionManager<SessionT> *sessionManager, WifiManager *wifiManage
auto bytesWritten = serializeJson(json, jsonText); auto bytesWritten = serializeJson(json, jsonText);
httpd_resp_send(req, jsonText, bytesWritten); httpd_resp_send(req, jsonText, bytesWritten);
}; };
auto cbGetData = [sessionManager](httpd_req_t *req) { auto cbGetData = [sessionManager](httpd_req_t *req)
{
auto sessionId = sessionManager->session().getStartTime(); auto sessionId = sessionManager->session().getStartTime();
uint32_t startIdx = getUrlQueryParameter(req, "startIdx", 0); uint32_t startIdx = getUrlQueryParameter(req, "startIdx", 0);
//Serial.printf("Data request, start index: %d\n", startIdx); //Serial.printf("Data request, start index: %d\n", startIdx);
@ -196,7 +212,8 @@ void httpSetup(SessionManager<SessionT> *sessionManager, WifiManager *wifiManage
httpd_resp_send(req, buf, totalSize); httpd_resp_send(req, buf, totalSize);
free(buf); free(buf);
}; };
auto cbWifiGet = [wifiManager](httpd_req_t *req) { auto cbWifiGet = [wifiManager](httpd_req_t *req)
{
httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*");
StaticJsonDocument<128> json; StaticJsonDocument<128> json;
json["state"] = wifiManager->stateStr(); json["state"] = wifiManager->stateStr();
@ -204,7 +221,8 @@ void httpSetup(SessionManager<SessionT> *sessionManager, WifiManager *wifiManage
auto bytesWritten = serializeJson(json, jsonText); auto bytesWritten = serializeJson(json, jsonText);
httpd_resp_send(req, jsonText, bytesWritten); httpd_resp_send(req, jsonText, bytesWritten);
}; };
auto cbWifiPost = [wifiManager](httpd_req_t *req) { auto cbWifiPost = [wifiManager](httpd_req_t *req)
{
httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*");
StaticJsonDocument<1024> json; StaticJsonDocument<1024> json;
char content[512]; char content[512];
@ -248,7 +266,8 @@ void httpSetup(SessionManager<SessionT> *sessionManager, WifiManager *wifiManage
httpd_resp_set_status(req, "400 Bad Request"); httpd_resp_set_status(req, "400 Bad Request");
httpd_resp_send(req, "Invalid keys in JSON", -1); httpd_resp_send(req, "Invalid keys in JSON", -1);
}; };
auto cbSettingsGet = [](httpd_req_t *req) { auto cbSettingsGet = [](httpd_req_t *req)
{
httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*");
httpd_resp_set_hdr(req, "Content-Type", "application/json"); httpd_resp_set_hdr(req, "Content-Type", "application/json");
@ -269,7 +288,8 @@ void httpSetup(SessionManager<SessionT> *sessionManager, WifiManager *wifiManage
auto bytesWritten = serializeJson(json, jsonText); auto bytesWritten = serializeJson(json, jsonText);
httpd_resp_send(req, jsonText, bytesWritten); httpd_resp_send(req, jsonText, bytesWritten);
}; };
auto cbSettingsPost = [](httpd_req_t *req) { auto cbSettingsPost = [](httpd_req_t *req)
{
httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*");
StaticJsonDocument<1024> json; StaticJsonDocument<1024> json;
char content[512]; char content[512];
@ -313,7 +333,8 @@ void httpSetup(SessionManager<SessionT> *sessionManager, WifiManager *wifiManage
sessionManagerSetup(); sessionManagerSetup();
httpd_resp_send(req, "OK", -1); httpd_resp_send(req, "OK", -1);
}; };
auto cbSettingsDelete = [](httpd_req_t *req) { auto cbSettingsDelete = [](httpd_req_t *req)
{
httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*");
Preferences prefs; Preferences prefs;
prefs.begin("st_prefs"); prefs.begin("st_prefs");
@ -419,13 +440,12 @@ void setup()
// HTTP & Websocket server // HTTP & Websocket server
httpSetup(&sessionManager, &wifiManager); httpSetup(&sessionManager, &wifiManager);
if (!wifiManager.inProvisioningMode()) websocketServer.begin();
webSocketServer.begin();
} }
void loop() void loop()
{ {
sessionManager.iteration(); sessionManager.iteration();
webSocketServer.iteration(); wifiManager.iteration();
wifiManager.wifiWatchdog(); websocketServer.iteration();
} }

View File

@ -1,36 +1,158 @@
import asyncio import asyncio
import websockets import websockets
import struct import struct
import numpy as np import numpy as np
from pprint import pprint
INITIAL_INFO = 1 import datetime
SESSION_STARTED = 2 import msgpack
SESSION_STOPPED = 3 import aiomonitor
SESSION_NEW_DATA = 4
async def hello(): class MsgManager:
uri = "ws://192.168.178.110:81" def __init__(self):
self.msg_history = []
def add_msg(self, msg):
pprint(msg)
self.msg_history.append(msg)
send_functions = []
class MsgCode:
ERROR = 1
# device to frontend
INITIAL_INFO = 2
SESSION_STARTED = 3
SESSION_STOPPED = 4
SESSION_NEW_DATA = 5
ANSWER_USER_LIST = 6
ANSWER_SESSION_LIST = 7
WIFI_STATE_RESPONSE = 8
WIFI_SCAN_RESPONSE = 9
# from frontend to device
START_SESSION = 128
STOP_SESSION = 129
TARE = 130
QUERY_USER_LIST = 131
QUERY_SESSION_LIST = 132
WIFI_STATE_SET = 133
WIFI_STATE_GET = 134
WIFI_TRIGGER_SCAN = 135
async def send_message(websocket, msg_type, payload=None):
payload = struct.pack("<B", msg_type)
if payload is not None:
payload += msgpack.packb(payload, use_bin_type=True)
await websocket.send(payload)
# --------------------------------- Session API --------------------------------
def parse_session_initial_info(payload):
session_id = struct.unpack("<I", payload[1:5])[0]
return {'type': "session_initial_info",
'running': struct.unpack("<B", payload[0:1])[0],
'session_id': session_id,
'start_time': datetime.fromtimestamp(session_id),
'data': np.frombuffer(payload[5:], dtype=np.uint16),
}
def parse_session_started(payload):
session_id = struct.unpack("<I", payload)[0]
return {'type': "session_started",
'session_id': session_id,
'start_time': datetime.fromtimestamp(session_id),
}
def parse_session_stopped(payload):
assert len(payload) == 0
return {'type': "session_stopped"}
def parse_session_new_data(payload):
return {'type': "session_new_data", 'data': np.frombuffer(payload, dtype=np.uint16)}
async def send_session_start(websocket):
await send_message(websocket, MsgCode.START_SESSION)
async def send_session_stop(websocket):
await send_message(websocket, MsgCode.STOP_SESSION)
send_functions += [send_session_start, send_session_stop]
# ------------------------------- WiFi API --------------------------------------
def parse_wifi_state(payload):
return {'type': "wifi_state", 'data': msgpack.unpackb(payload, raw=True)}
def parse_wifi_scan(payload):
return {'type': "wifi_scan_response", 'data': msgpack.unpackb(payload, raw=True)}
async def send_wifi_state_get(websocket):
await send_message(websocket, MsgCode.WIFI_STATE_GET)
async def send_wifi_trigger_scan(websocket):
await send_message(websocket, MsgCode.WIFI_TRIGGER_SCAN)
send_functions += [send_wifi_state_get, send_wifi_trigger_scan]
# -------------------------------------------------------------------------------
def parse_message(data):
parse_funcs = {
MsgCode.INITIAL_INFO: parse_session_initial_info,
MsgCode.SESSION_STARTED: parse_session_started,
MsgCode.SESSION_STOPPED: parse_session_stopped,
MsgCode.SESSION_NEW_DATA: parse_session_new_data,
MsgCode.WIFI_STATE_RESPONSE: parse_wifi_state,
MsgCode.WIFI_SCAN_RESPONSE: parse_wifi_scan,
}
msg_type = struct.unpack("<B", data[:1])[0]
payload = data[1:]
try:
parsed = parse_funcs[msg_type](payload)
except KeyError:
raise KeyError(f"Unknown message type {msg_type}")
return parsed
msg_manager = MsgManager()
async def main():
global msg_manager
uri = "ws://192.168.42.1:81"
async with websockets.connect(uri) as websocket: async with websockets.connect(uri) as websocket:
for send_func in send_functions:
async def bound_func(*args, **kwargs):
await send_func(websocket, *args, **kwargs)
setattr(msg_manager, send_func.__name__, bound_func)
while True: while True:
res = await websocket.recv() res = await websocket.recv()
msg_type = struct.unpack("<B", res[:1])[0] msg = parse_message(res)
payload = res[1:] msg_manager.add_msg(msg)
if msg_type == INITIAL_INFO:
running = struct.unpack("<B", payload[0:1])[0]
session_id = struct.unpack("<I", payload[1:5])[0]
data = np.frombuffer(payload[5:], dtype=np.uint16)
print(f"Initial info: running {running} session_id {session_id} data", data)
elif msg_type == SESSION_STARTED:
id = struct.unpack("<I", payload)[0]
print(f"Session with id {id} started")
elif msg_type == SESSION_STOPPED:
assert len(payload) == 0
print("Session stopped")
elif msg_type == SESSION_NEW_DATA:
data = np.frombuffer(payload, dtype=np.uint16)
print("New data", data)
else:
print("Got unexpected packet of type", msg_type)
asyncio.get_event_loop().run_until_complete(hello())
if __name__ == "__main__":
loop = asyncio.get_event_loop()
with aiomonitor.start_monitor(loop=loop, locals={'m': msg_manager}):
loop.run_until_complete(main())