Regular ping when no measurements are running
This commit is contained in:
parent
e2f72297e4
commit
faa9b3324f
|
@ -14,6 +14,7 @@ enum class MessageCode : uint8_t
|
||||||
ANSWER_SESSION_LIST = 7,
|
ANSWER_SESSION_LIST = 7,
|
||||||
WIFI_STATE_RESPONSE = 8,
|
WIFI_STATE_RESPONSE = 8,
|
||||||
WIFI_SCAN_RESPONSE = 9,
|
WIFI_SCAN_RESPONSE = 9,
|
||||||
|
APP_LAYER_PING = 10,
|
||||||
|
|
||||||
// from frontend to device
|
// from frontend to device
|
||||||
START_SESSION = 128,
|
START_SESSION = 128,
|
||||||
|
|
|
@ -121,7 +121,19 @@ void SessionAPI<T>::iteration(TServer &server)
|
||||||
numSentMeasurements_[i] = 0;
|
numSentMeasurements_[i] = 0;
|
||||||
running_ = false;
|
running_ = false;
|
||||||
}
|
}
|
||||||
sendNewDataMessages(server);
|
|
||||||
|
if(running_)
|
||||||
|
sendNewDataMessages(server);
|
||||||
|
else
|
||||||
|
{
|
||||||
|
static unsigned long lastPing = 0;
|
||||||
|
auto timeNow = millis();
|
||||||
|
if(timeNow - lastPing > CONFIG_PING_INTERVAL_MS)
|
||||||
|
{
|
||||||
|
server.sendToAll(MessageCode::APP_LAYER_PING);
|
||||||
|
lastPing = timeNow;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
|
@ -168,17 +180,12 @@ void SessionAPI<T>::sendNewDataMessages(TServer &server)
|
||||||
|
|
||||||
if (numMeasurementsToSend >= WAIT_UNTIL_AT_LEAST_NUM_MEASUREMENTS)
|
if (numMeasurementsToSend >= WAIT_UNTIL_AT_LEAST_NUM_MEASUREMENTS)
|
||||||
{
|
{
|
||||||
//Serial.printf("-> i=%d, numSentMeasurements_[i]=%d, to send %d\n", i, numSentMeasurements_[i], numMeasurementsToSend);
|
|
||||||
|
|
||||||
if (numMeasurementsToSend > MAX_MEASUREMENTS_PER_MSG)
|
if (numMeasurementsToSend > MAX_MEASUREMENTS_PER_MSG)
|
||||||
numMeasurementsToSend = MAX_MEASUREMENTS_PER_MSG;
|
numMeasurementsToSend = MAX_MEASUREMENTS_PER_MSG;
|
||||||
|
|
||||||
//Serial.printf(" Sending %d measurements\n", numMeasurementsToSend);
|
|
||||||
|
|
||||||
memcpy(buffer + headerSize, dataToSend, sizeof(MeasurementT) * numMeasurementsToSend);
|
memcpy(buffer + headerSize, dataToSend, sizeof(MeasurementT) * numMeasurementsToSend);
|
||||||
c.sendBinary(buffer, headerSize + sizeof(MeasurementT) * numMeasurementsToSend);
|
c.sendBinary(buffer, headerSize + sizeof(MeasurementT) * numMeasurementsToSend);
|
||||||
numSentMeasurements_[i] += numMeasurementsToSend;
|
numSentMeasurements_[i] += numMeasurementsToSend;
|
||||||
//Serial.printf(" Sent measurements %d\n", numSentMeasurements_[i]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,10 +10,10 @@ constexpr const char *CONFIG_HOSTNAME = "swimtracker";
|
||||||
|
|
||||||
// ------------------------------------- Hardware & Measurement Settings ------------------------------------------------------------
|
// ------------------------------------- Hardware & Measurement Settings ------------------------------------------------------------
|
||||||
|
|
||||||
const uint8_t CONFIG_MEASUREMENT_AVG_COUNT = 1; // number of measurements in normal phase
|
constexpr uint8_t CONFIG_MEASUREMENT_AVG_COUNT = 1; // number of measurements in normal phase
|
||||||
const uint8_t CONFIG_TARE_AVG_COUNT = 20; // number of measurements in tare-phase (to find 0 )
|
constexpr uint8_t CONFIG_TARE_AVG_COUNT = 20; // number of measurements in tare-phase (to find 0 )
|
||||||
const int CONFIG_MEASURE_DELAY = 100; // interval in ms between measurements
|
constexpr int CONFIG_MEASURE_DELAY = 100; // interval in ms between measurements
|
||||||
const uint32_t CONFIG_SESSION_MAX_LENGTH_HOURS = 3; // maximum length of one session
|
constexpr uint32_t CONFIG_SESSION_MAX_LENGTH_HOURS = 3; // maximum length of one session
|
||||||
constexpr const char *CONFIG_DATA_PATH = "/dat"; // folder in SPIFFS file system to store measurement data
|
constexpr const char *CONFIG_DATA_PATH = "/dat"; // folder in SPIFFS file system to store measurement data
|
||||||
using MeasurementT = uint16_t; // data type for one measurement
|
using MeasurementT = uint16_t; // data type for one measurement
|
||||||
#ifdef NEW_HEAVY_LOAD_CELL
|
#ifdef NEW_HEAVY_LOAD_CELL
|
||||||
|
@ -21,9 +21,9 @@ const int CONFIG_VALUE_RIGHT_SHIFT = 3; // uint32 measurements are divided by th
|
||||||
#else
|
#else
|
||||||
const int CONFIG_VALUE_RIGHT_SHIFT = 7;
|
const int CONFIG_VALUE_RIGHT_SHIFT = 7;
|
||||||
#endif
|
#endif
|
||||||
const MeasurementT CONFIG_KG_FACTOR_INV = 701; // after shifting - how many "measurement units" are one kg
|
constexpr MeasurementT CONFIG_KG_FACTOR_INV = 701; // after shifting - how many "measurement units" are one kg
|
||||||
|
|
||||||
static constexpr int MAX_WEBSOCKET_CONNECTIONS = 3; // maximal number of websocket connections maintained at the same time
|
constexpr int MAX_WEBSOCKET_CONNECTIONS = 3; // maximal number of websocket connections maintained at the same time
|
||||||
|
|
||||||
constexpr const char *UPDATE_URL = "https://swimtracker-update.bauer.tech/firmware.bin";
|
constexpr const char *UPDATE_URL = "https://swimtracker-update.bauer.tech/firmware.bin";
|
||||||
|
|
||||||
|
@ -36,15 +36,17 @@ constexpr MeasurementT CONFIG_AUTO_STOP_THRESHOLD = CONFIG_KG_FACTOR_INV * 1;
|
||||||
//uint32_t CONFIG_AUTO_STOP_NUM_MEASUREMENTS = (1000 / CONFIG_MEASURE_DELAY) * 60 * 15;
|
//uint32_t CONFIG_AUTO_STOP_NUM_MEASUREMENTS = (1000 / CONFIG_MEASURE_DELAY) * 60 * 15;
|
||||||
constexpr uint32_t CONFIG_AUTO_STOP_NUM_MEASUREMENTS = (1000 / CONFIG_MEASURE_DELAY) * 30;
|
constexpr uint32_t CONFIG_AUTO_STOP_NUM_MEASUREMENTS = (1000 / CONFIG_MEASURE_DELAY) * 30;
|
||||||
|
|
||||||
|
constexpr unsigned long CONFIG_PING_INTERVAL_MS = 1000;
|
||||||
|
|
||||||
// ------------------------------------- Derived Settings -----------------------------------------------------------------------------
|
// ------------------------------------- Derived Settings -----------------------------------------------------------------------------
|
||||||
|
|
||||||
constexpr uint32_t CONFIG_SESSION_MAX_SIZE = CONFIG_SESSION_MAX_LENGTH_HOURS * 3600 * (1000 / CONFIG_MEASURE_DELAY) * sizeof(uint16_t);
|
constexpr uint32_t CONFIG_SESSION_MAX_SIZE = CONFIG_SESSION_MAX_LENGTH_HOURS * 3600 * (1000 / CONFIG_MEASURE_DELAY) * sizeof(uint16_t);
|
||||||
static_assert(CONFIG_SESSION_MAX_SIZE < 1024 * 1024, "Measurement data takes more than 1MiB space");
|
static_assert(CONFIG_SESSION_MAX_SIZE < 1024 * 1024, "Measurement data takes more than 1MiB space");
|
||||||
|
|
||||||
#ifdef _HW_V_20
|
#ifdef _HW_V_20
|
||||||
const int CONFIG_SCALE_DOUT_PIN = 23;
|
constexpr int CONFIG_SCALE_DOUT_PIN = 23;
|
||||||
const int CONFIG_SCALE_SCK_PIN = 22;
|
constexpr int CONFIG_SCALE_SCK_PIN = 22;
|
||||||
#else
|
#else
|
||||||
const int CONFIG_SCALE_DOUT_PIN = 22;
|
constexpr int CONFIG_SCALE_DOUT_PIN = 22;
|
||||||
const int CONFIG_SCALE_SCK_PIN = 23;
|
constexpr int CONFIG_SCALE_SCK_PIN = 23;
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -32,7 +32,7 @@ public:
|
||||||
using namespace websockets;
|
using namespace websockets;
|
||||||
const auto onMessage = [this](WebsocketsClient &client, WebsocketsMessage message)
|
const auto onMessage = [this](WebsocketsClient &client, WebsocketsMessage message)
|
||||||
{
|
{
|
||||||
if (message.isPing())
|
if (message.isPing()) // websocket ping
|
||||||
client.pong();
|
client.pong();
|
||||||
else if (message.isBinary())
|
else if (message.isBinary())
|
||||||
{
|
{
|
||||||
|
@ -48,11 +48,15 @@ public:
|
||||||
|
|
||||||
if (server_.poll())
|
if (server_.poll())
|
||||||
{
|
{
|
||||||
Serial.println("new websocket connection");
|
Serial.printf("new websocket connection, storing at pos %d - occupancy: ", nextFreeClient_);
|
||||||
clients_[nextFreeClient_] = server_.accept();
|
clients_[nextFreeClient_] = server_.accept();
|
||||||
clients_[nextFreeClient_].onMessage(onMessage);
|
clients_[nextFreeClient_].onMessage(onMessage);
|
||||||
this->onClientConnectImpl(clients_[nextFreeClient_]);
|
this->onClientConnectImpl(clients_[nextFreeClient_]);
|
||||||
nextFreeClient_ = (nextFreeClient_ + 1) % MAX_WEBSOCKET_CONNECTIONS;
|
nextFreeClient_ = (nextFreeClient_ + 1) % MAX_WEBSOCKET_CONNECTIONS;
|
||||||
|
|
||||||
|
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
||||||
|
Serial.print((clients_[i].available()) ? "x" : "o");
|
||||||
|
Serial.print("\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
||||||
|
@ -89,7 +93,7 @@ public:
|
||||||
{
|
{
|
||||||
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
||||||
if (clients_[i].available())
|
if (clients_[i].available())
|
||||||
clients_[i].sendBinary((const char*)&msgCode, sizeof(MessageCode));
|
clients_[i].sendBinary((const char *)&msgCode, sizeof(MessageCode));
|
||||||
}
|
}
|
||||||
|
|
||||||
websockets::WebsocketsClient &client(size_t i) { return clients_[i]; }
|
websockets::WebsocketsClient &client(size_t i) { return clients_[i]; }
|
||||||
|
@ -137,7 +141,7 @@ private:
|
||||||
websockets::WebsocketsClient clients_[MAX_WEBSOCKET_CONNECTIONS];
|
websockets::WebsocketsClient clients_[MAX_WEBSOCKET_CONNECTIONS];
|
||||||
};
|
};
|
||||||
|
|
||||||
template<typename... ApiManagers>
|
template <typename... ApiManagers>
|
||||||
inline WebsocketServer<std::tuple<ApiManagers...>> makeWebsocketServer(int port, ApiManagers... managers)
|
inline WebsocketServer<std::tuple<ApiManagers...>> makeWebsocketServer(int port, ApiManagers... managers)
|
||||||
{
|
{
|
||||||
auto tuple = std::make_tuple(managers...);
|
auto tuple = std::make_tuple(managers...);
|
||||||
|
|
|
@ -1,436 +0,0 @@
|
||||||
|
|
||||||
#pragma once
|
|
||||||
#include "Dtypes.h"
|
|
||||||
#include "UserDB.h"
|
|
||||||
#include "MessageCodes.h"
|
|
||||||
|
|
||||||
#include <ArduinoWebsockets.h>
|
|
||||||
|
|
||||||
template <typename T>
|
|
||||||
class SessionManager;
|
|
||||||
|
|
||||||
static constexpr int NUM_DATA_CHUNK_SIZE = 1;
|
|
||||||
|
|
||||||
template <typename ApiManagerTuple>
|
|
||||||
class WebsocketInterface
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
WebsocketInterface(int port) : port_(port), nextFreeClient_(0)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void begin()
|
|
||||||
{
|
|
||||||
server_.listen(port_);
|
|
||||||
}
|
|
||||||
|
|
||||||
void iteration();
|
|
||||||
|
|
||||||
template <size_t bufferSize>
|
|
||||||
void sendToAll(MessageCode msgCode, const JsonDocument &content)
|
|
||||||
{
|
|
||||||
char buffer[bufferSize];
|
|
||||||
buffer[0] = msgCode;
|
|
||||||
size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), bufferSize - sizeof(msgCode));
|
|
||||||
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
|
||||||
if (clients_[i].available())
|
|
||||||
clients_[i].sendBinary(buffer, bytesWritten);
|
|
||||||
}
|
|
||||||
|
|
||||||
void sendToAll(MessageCode msgCode, const JsonDocument &content)
|
|
||||||
{
|
|
||||||
size_t expectedSize = measureMsgPack(content);
|
|
||||||
char *buffer = (char *)malloc(expectedSize + sizeof(msgCode));
|
|
||||||
buffer[0] = msgCode;
|
|
||||||
size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), expectedSize);
|
|
||||||
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
|
||||||
if (clients_[i].available())
|
|
||||||
clients_[i].sendBinary(buffer, bytesWritten + sizeof(msgCode));
|
|
||||||
|
|
||||||
free(buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
void sendToAll(MessageCode msgCode)
|
|
||||||
{
|
|
||||||
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
|
||||||
if (clients_[i].available())
|
|
||||||
clients_[i].sendBinary(&msgCode, sizeof(MessageCode));
|
|
||||||
}
|
|
||||||
|
|
||||||
websockets::WebsocketsClient &client(size_t i) { return clients_[i]; }
|
|
||||||
|
|
||||||
private:
|
|
||||||
int port_;
|
|
||||||
int nextFreeClient_;
|
|
||||||
|
|
||||||
websockets::WebsocketsServer server_;
|
|
||||||
websockets::WebsocketsClient clients_[MAX_WEBSOCKET_CONNECTIONS];
|
|
||||||
};
|
|
||||||
|
|
||||||
template <size_t bufferSize>
|
|
||||||
inline void sendToClient(websockets::WebsocketsClient &client, MessageCode msgCode, const JsonDocument &content)
|
|
||||||
{
|
|
||||||
char buffer[bufferSize];
|
|
||||||
buffer[0] = msgCode;
|
|
||||||
size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), bufferSize - sizeof(msgCode));
|
|
||||||
client.sendBinary(buffer, bytesWritten);
|
|
||||||
}
|
|
||||||
|
|
||||||
inline void sendToClient(websockets::WebsocketsClient &client, MessageCode msgCode, const JsonDocument &content)
|
|
||||||
{
|
|
||||||
size_t expectedSize = measureMsgPack(content);
|
|
||||||
char *buffer = (char *)malloc(expectedSize + sizeof(msgCode));
|
|
||||||
buffer[0] = static_cast<char>(msgCode);
|
|
||||||
size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), expectedSize);
|
|
||||||
client.sendBinary(buffer, bytesWritten + sizeof(msgCode));
|
|
||||||
free(buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
inline void sendErrorToClient(websockets::WebsocketsClient &client, const char *msg)
|
|
||||||
{
|
|
||||||
DynamicJsonDocument doc(strlen(msg) + 64);
|
|
||||||
doc["msg"] = msg;
|
|
||||||
sendToClient(client, MessageCode::ERROR, doc);
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename SessionT>
|
|
||||||
class WebsocketServer
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
WebsocketServer(SessionManager<SessionT> &sessionManager, UserStorage &userStorage, int port)
|
|
||||||
: sessionManager_(sessionManager), userStorage_(userStorage), nextFreeClient_(0), port_(port),
|
|
||||||
running_(false)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void begin()
|
|
||||||
{
|
|
||||||
server_.listen(port_);
|
|
||||||
}
|
|
||||||
|
|
||||||
void iteration();
|
|
||||||
|
|
||||||
private:
|
|
||||||
void reportSessionUpdate();
|
|
||||||
|
|
||||||
void sendMessageOnConnection(websockets::WebsocketsClient &client);
|
|
||||||
void sendSessionStartMessages();
|
|
||||||
void sendSessionStopMessages();
|
|
||||||
void sendNewDataMessages();
|
|
||||||
|
|
||||||
void sendUserList(websockets::WebsocketsClient &client);
|
|
||||||
void sendSessionList(websockets::WebsocketsClient &client, const String &userId);
|
|
||||||
|
|
||||||
SessionManager<SessionT> &sessionManager_;
|
|
||||||
UserStorage &userStorage_;
|
|
||||||
|
|
||||||
int nextFreeClient_;
|
|
||||||
int port_;
|
|
||||||
|
|
||||||
size_t sentMessageCount_;
|
|
||||||
websockets::WebsocketsServer server_;
|
|
||||||
websockets::WebsocketsClient clients_[MAX_WEBSOCKET_CONNECTIONS];
|
|
||||||
|
|
||||||
// previous session state
|
|
||||||
size_t numSentMeasurements_[MAX_WEBSOCKET_CONNECTIONS];
|
|
||||||
bool running_;
|
|
||||||
};
|
|
||||||
|
|
||||||
using websockets::WebsocketsClient;
|
|
||||||
|
|
||||||
// ------------------------------------- Message types & classes ---------------------------
|
|
||||||
|
|
||||||
enum MessageType
|
|
||||||
{
|
|
||||||
// from swim tracker device to frontend
|
|
||||||
INITIAL_INFO = 1,
|
|
||||||
SESSION_STARTED = 2,
|
|
||||||
SESSION_STOPPED = 3,
|
|
||||||
SESSION_NEW_DATA = 4,
|
|
||||||
ANSWER_USER_LIST = 5,
|
|
||||||
ANSWER_SESSION_LIST = 6,
|
|
||||||
|
|
||||||
// from frontend to device
|
|
||||||
START_SESSION = 7,
|
|
||||||
STOP_SESSION = 8,
|
|
||||||
TARE = 9,
|
|
||||||
QUERY_USER_LIST = 10,
|
|
||||||
QUERY_SESSION_LIST = 11,
|
|
||||||
};
|
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
|
||||||
class SessionStartedMessage
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
SessionStartedMessage(uint32_t id) : messageType_(SESSION_STARTED), sessionId_(id) {}
|
|
||||||
void send(WebsocketsClient &c) const
|
|
||||||
{
|
|
||||||
c.sendBinary((const char *)(this), sizeof(*this));
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
uint8_t messageType_;
|
|
||||||
uint32_t sessionId_;
|
|
||||||
};
|
|
||||||
|
|
||||||
class SessionStoppedMessage
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
SessionStoppedMessage() : messageType_(SESSION_STOPPED) {}
|
|
||||||
void send(WebsocketsClient &c) const
|
|
||||||
{
|
|
||||||
c.sendBinary((const char *)(this), sizeof(*this));
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
uint8_t messageType_;
|
|
||||||
};
|
|
||||||
|
|
||||||
template <typename MeasurementT>
|
|
||||||
class SessionNewDataMessage
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
// typically a message contains NUM_DATA_CHUNK_SIZE measurements
|
|
||||||
// if some measurements are skipped, because loop() takes too long
|
|
||||||
// there might actually be more measurements, to be safe there is an
|
|
||||||
// additional factor here
|
|
||||||
static constexpr size_t MAX_MEASUREMENTS = 4 * NUM_DATA_CHUNK_SIZE;
|
|
||||||
|
|
||||||
SessionNewDataMessage(MeasurementT *ptr, size_t numMeasurements)
|
|
||||||
: messageType_(SESSION_NEW_DATA), numMeasurements_(min(numMeasurements, MAX_MEASUREMENTS))
|
|
||||||
{
|
|
||||||
memcpy(measurements_, ptr, sizeof(MeasurementT) * numMeasurements_);
|
|
||||||
}
|
|
||||||
|
|
||||||
void send(WebsocketsClient &c) const
|
|
||||||
{
|
|
||||||
c.sendBinary((const char *)(this), numBytes());
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t numMeasurements() const
|
|
||||||
{
|
|
||||||
return numMeasurements_;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
size_t numBytes() const { return sizeof(uint8_t) + numMeasurements() * sizeof(MeasurementT); }
|
|
||||||
|
|
||||||
// data to be sent
|
|
||||||
uint8_t messageType_;
|
|
||||||
MeasurementT measurements_[MAX_MEASUREMENTS];
|
|
||||||
|
|
||||||
// book-keeping
|
|
||||||
size_t numMeasurements_;
|
|
||||||
};
|
|
||||||
|
|
||||||
#pragma pack(pop)
|
|
||||||
|
|
||||||
// ------------------------------------- WebsocketServer members ---------------------------
|
|
||||||
|
|
||||||
template <typename SessionT>
|
|
||||||
void WebsocketServer<SessionT>::iteration()
|
|
||||||
{
|
|
||||||
using namespace websockets;
|
|
||||||
|
|
||||||
auto onMessage = [this](WebsocketsClient &client, WebsocketsMessage message)
|
|
||||||
{
|
|
||||||
if (message.isPing())
|
|
||||||
client.pong();
|
|
||||||
else if (message.isBinary())
|
|
||||||
{
|
|
||||||
const char *data = message.c_str();
|
|
||||||
const size_t length = message.length();
|
|
||||||
if (length < 1)
|
|
||||||
{
|
|
||||||
client.close(CloseReason_UnsupportedData);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint8_t opCode = uint8_t(data[0]);
|
|
||||||
switch (opCode)
|
|
||||||
{
|
|
||||||
case START_SESSION:
|
|
||||||
this->sessionManager_.startMeasurements();
|
|
||||||
break;
|
|
||||||
case STOP_SESSION:
|
|
||||||
this->sessionManager_.stopMeasurements();
|
|
||||||
break;
|
|
||||||
case TARE:
|
|
||||||
this->sessionManager_.tare();
|
|
||||||
break;
|
|
||||||
case QUERY_USER_LIST:
|
|
||||||
this->sendUserList(client);
|
|
||||||
break;
|
|
||||||
case QUERY_SESSION_LIST:
|
|
||||||
{
|
|
||||||
StaticJsonDocument<USER_STRING_ID_MAX_LEN + 16> doc;
|
|
||||||
deserializeMsgPack(doc, data, length);
|
|
||||||
String userId = doc.as<String>();
|
|
||||||
if (userId.length() > 0)
|
|
||||||
this->sendSessionList(client, userId);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
client.close(CloseReason_UnsupportedData);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if (server_.poll())
|
|
||||||
{
|
|
||||||
clients_[nextFreeClient_] = server_.accept();
|
|
||||||
clients_[nextFreeClient_].onMessage(onMessage);
|
|
||||||
Serial.println("new websocket connection");
|
|
||||||
sendMessageOnConnection(clients_[nextFreeClient_]);
|
|
||||||
numSentMeasurements_[nextFreeClient_] = sessionManager_.session().numMeasurements();
|
|
||||||
nextFreeClient_ = (nextFreeClient_ + 1) % MAX_WEBSOCKET_CONNECTIONS;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
|
||||||
clients_[i].poll();
|
|
||||||
|
|
||||||
reportSessionUpdate();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename SessionT>
|
|
||||||
void WebsocketServer<SessionT>::reportSessionUpdate()
|
|
||||||
{
|
|
||||||
if (!running_ && sessionManager_.isMeasuring())
|
|
||||||
{
|
|
||||||
sendSessionStartMessages();
|
|
||||||
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
|
||||||
numSentMeasurements_[i] = 0;
|
|
||||||
}
|
|
||||||
else if (running_ && !sessionManager_.isMeasuring())
|
|
||||||
{
|
|
||||||
sendSessionStopMessages();
|
|
||||||
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
|
||||||
numSentMeasurements_[i] = 0;
|
|
||||||
}
|
|
||||||
sendNewDataMessages();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename SessionT>
|
|
||||||
void WebsocketServer<SessionT>::sendUserList(websockets::WebsocketsClient &client)
|
|
||||||
{
|
|
||||||
const auto numUsers = userStorage_.numUsers();
|
|
||||||
constexpr size_t constantSlack = 64;
|
|
||||||
DynamicJsonDocument result(JSON_ARRAY_SIZE(numUsers) + numUsers * (USER_STRING_ID_MAX_LEN + 2) + constantSlack);
|
|
||||||
JsonArray arr = result.to<JsonArray>();
|
|
||||||
for (auto userIt = userStorage_.beginWithoutUnassigned(); userIt != userStorage_.end(); ++userIt)
|
|
||||||
arr.add(userIt->stringId());
|
|
||||||
|
|
||||||
char buffer[MAX_USERS * (USER_STRING_ID_MAX_LEN + 1) + constantSlack];
|
|
||||||
size_t bytesWritten = serializeMsgPack(result, buffer, sizeof(buffer));
|
|
||||||
client.sendBinary(buffer, bytesWritten);
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename SessionT>
|
|
||||||
void WebsocketServer<SessionT>::sendSessionList(websockets::WebsocketsClient &client, const String &userId)
|
|
||||||
{
|
|
||||||
|
|
||||||
User *user = userStorage_.getUserInfo(userId);
|
|
||||||
if (user != nullptr)
|
|
||||||
{
|
|
||||||
DynamicJsonDocument result(JSON_ARRAY_SIZE(user->numSessions()) + user->numSessions() * (sizeof(SessionIdType) + 8));
|
|
||||||
JsonArray arr = result.to<JsonArray>();
|
|
||||||
for (SessionIdType *sIt = user->sessionBegin(); sIt != user->sessionEnd(); ++sIt)
|
|
||||||
arr.add(*sIt);
|
|
||||||
|
|
||||||
size_t bytesToWrite = measureMsgPack(result);
|
|
||||||
char *buffer = (char *)malloc(bytesToWrite);
|
|
||||||
size_t bytesWritten = serializeMsgPack(result, buffer, bytesToWrite);
|
|
||||||
assert(bytesWritten <= bytesToWrite);
|
|
||||||
client.sendBinary(buffer, bytesWritten);
|
|
||||||
free(buffer);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
DynamicJsonDocument result(JSON_ARRAY_SIZE(1) + 8);
|
|
||||||
result.to<JsonArray>();
|
|
||||||
char buffer[32];
|
|
||||||
size_t bytesWritten = serializeMsgPack(result, buffer, sizeof(buffer));
|
|
||||||
client.sendBinary(buffer, bytesWritten);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename SessionT>
|
|
||||||
void WebsocketServer<SessionT>::sendSessionStartMessages()
|
|
||||||
{
|
|
||||||
SessionStartedMessage msg(sessionManager_.session().getStartTime());
|
|
||||||
for (auto &c : clients_)
|
|
||||||
if (c.available())
|
|
||||||
msg.send(c);
|
|
||||||
running_ = sessionManager_.isMeasuring();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename SessionT>
|
|
||||||
void WebsocketServer<SessionT>::sendSessionStopMessages()
|
|
||||||
{
|
|
||||||
SessionStoppedMessage msg;
|
|
||||||
for (auto &c : clients_)
|
|
||||||
if (c.available())
|
|
||||||
msg.send(c);
|
|
||||||
running_ = sessionManager_.isMeasuring();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename SessionT>
|
|
||||||
void WebsocketServer<SessionT>::sendNewDataMessages()
|
|
||||||
{
|
|
||||||
using MeasurementT = typename SessionT::MeasurementType;
|
|
||||||
auto &session = sessionManager_.session();
|
|
||||||
|
|
||||||
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
|
|
||||||
{
|
|
||||||
auto &c = clients_[i];
|
|
||||||
if (c.available())
|
|
||||||
{
|
|
||||||
MeasurementT *dataToSend = session.getDataPointer() + numSentMeasurements_[i];
|
|
||||||
int32_t numMeasurementsToSend = int32_t(session.numMeasurements()) - int32_t(numSentMeasurements_[i]);
|
|
||||||
if (numMeasurementsToSend > 0)
|
|
||||||
{
|
|
||||||
SessionNewDataMessage<MeasurementT> msg(dataToSend, numMeasurementsToSend);
|
|
||||||
msg.send(c);
|
|
||||||
numSentMeasurements_[i] += msg.numMeasurements();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename SessionT>
|
|
||||||
void WebsocketServer<SessionT>::sendMessageOnConnection(WebsocketsClient &client)
|
|
||||||
{
|
|
||||||
using MeasurementT = typename SessionT::MeasurementType;
|
|
||||||
|
|
||||||
// Message format:
|
|
||||||
// - uint8_t messageType
|
|
||||||
// - uint8_t running
|
|
||||||
// - uint32_t sessionId
|
|
||||||
// - MeasurementT [] measurements (if running)
|
|
||||||
|
|
||||||
auto &session = sessionManager_.session();
|
|
||||||
const auto numMeasurements = session.numMeasurements();
|
|
||||||
const auto sessionId = session.getStartTime();
|
|
||||||
|
|
||||||
const size_t msgSize = sizeof(uint8_t) + sizeof(uint8_t) + sizeof(sessionId) + sizeof(MeasurementT) * numMeasurements;
|
|
||||||
char *msg = (char *)heap_caps_malloc(msgSize, MALLOC_CAP_SPIRAM);
|
|
||||||
|
|
||||||
char *writeHead = msg;
|
|
||||||
|
|
||||||
*writeHead = INITIAL_INFO;
|
|
||||||
writeHead += sizeof(uint8_t);
|
|
||||||
|
|
||||||
*writeHead = sessionManager_.isMeasuring();
|
|
||||||
writeHead += sizeof(uint8_t);
|
|
||||||
|
|
||||||
*((uint32_t *)writeHead) = sessionManager_.isMeasuring() ? sessionId : 0;
|
|
||||||
writeHead += sizeof(uint32_t);
|
|
||||||
|
|
||||||
assert(writeHead - msg == msgSize - sizeof(MeasurementT) * numMeasurements);
|
|
||||||
|
|
||||||
memcpy(writeHead, session.getDataPointer(), sizeof(MeasurementT) * numMeasurements);
|
|
||||||
client.sendBinary(msg, msgSize);
|
|
||||||
|
|
||||||
free(msg);
|
|
||||||
}
|
|
Loading…
Reference in New Issue