diff --git a/firmware/src/MessageCodes.h b/firmware/src/MessageCodes.h index f8c4325..00e78ff 100644 --- a/firmware/src/MessageCodes.h +++ b/firmware/src/MessageCodes.h @@ -14,6 +14,7 @@ enum class MessageCode : uint8_t ANSWER_SESSION_LIST = 7, WIFI_STATE_RESPONSE = 8, WIFI_SCAN_RESPONSE = 9, + APP_LAYER_PING = 10, // from frontend to device START_SESSION = 128, diff --git a/firmware/src/SessionAPI.h b/firmware/src/SessionAPI.h index 611211a..fa42937 100644 --- a/firmware/src/SessionAPI.h +++ b/firmware/src/SessionAPI.h @@ -121,7 +121,19 @@ void SessionAPI::iteration(TServer &server) numSentMeasurements_[i] = 0; running_ = false; } - sendNewDataMessages(server); + + if(running_) + sendNewDataMessages(server); + else + { + static unsigned long lastPing = 0; + auto timeNow = millis(); + if(timeNow - lastPing > CONFIG_PING_INTERVAL_MS) + { + server.sendToAll(MessageCode::APP_LAYER_PING); + lastPing = timeNow; + } + } } template @@ -168,17 +180,12 @@ void SessionAPI::sendNewDataMessages(TServer &server) if (numMeasurementsToSend >= WAIT_UNTIL_AT_LEAST_NUM_MEASUREMENTS) { - //Serial.printf("-> i=%d, numSentMeasurements_[i]=%d, to send %d\n", i, numSentMeasurements_[i], numMeasurementsToSend); - if (numMeasurementsToSend > MAX_MEASUREMENTS_PER_MSG) numMeasurementsToSend = MAX_MEASUREMENTS_PER_MSG; - //Serial.printf(" Sending %d measurements\n", numMeasurementsToSend); - memcpy(buffer + headerSize, dataToSend, sizeof(MeasurementT) * numMeasurementsToSend); c.sendBinary(buffer, headerSize + sizeof(MeasurementT) * numMeasurementsToSend); numSentMeasurements_[i] += numMeasurementsToSend; - //Serial.printf(" Sent measurements %d\n", numSentMeasurements_[i]); } } } diff --git a/firmware/src/SwimTrackerConfig.h b/firmware/src/SwimTrackerConfig.h index f8e57b1..2676d03 100644 --- a/firmware/src/SwimTrackerConfig.h +++ b/firmware/src/SwimTrackerConfig.h @@ -10,10 +10,10 @@ constexpr const char *CONFIG_HOSTNAME = "swimtracker"; // ------------------------------------- Hardware & Measurement Settings ------------------------------------------------------------ -const uint8_t CONFIG_MEASUREMENT_AVG_COUNT = 1; // number of measurements in normal phase -const uint8_t CONFIG_TARE_AVG_COUNT = 20; // number of measurements in tare-phase (to find 0 ) -const int CONFIG_MEASURE_DELAY = 100; // interval in ms between measurements -const uint32_t CONFIG_SESSION_MAX_LENGTH_HOURS = 3; // maximum length of one session +constexpr uint8_t CONFIG_MEASUREMENT_AVG_COUNT = 1; // number of measurements in normal phase +constexpr uint8_t CONFIG_TARE_AVG_COUNT = 20; // number of measurements in tare-phase (to find 0 ) +constexpr int CONFIG_MEASURE_DELAY = 100; // interval in ms between measurements +constexpr uint32_t CONFIG_SESSION_MAX_LENGTH_HOURS = 3; // maximum length of one session constexpr const char *CONFIG_DATA_PATH = "/dat"; // folder in SPIFFS file system to store measurement data using MeasurementT = uint16_t; // data type for one measurement #ifdef NEW_HEAVY_LOAD_CELL @@ -21,9 +21,9 @@ const int CONFIG_VALUE_RIGHT_SHIFT = 3; // uint32 measurements are divided by th #else const int CONFIG_VALUE_RIGHT_SHIFT = 7; #endif -const MeasurementT CONFIG_KG_FACTOR_INV = 701; // after shifting - how many "measurement units" are one kg +constexpr MeasurementT CONFIG_KG_FACTOR_INV = 701; // after shifting - how many "measurement units" are one kg -static constexpr int MAX_WEBSOCKET_CONNECTIONS = 3; // maximal number of websocket connections maintained at the same time +constexpr int MAX_WEBSOCKET_CONNECTIONS = 3; // maximal number of websocket connections maintained at the same time constexpr const char *UPDATE_URL = "https://swimtracker-update.bauer.tech/firmware.bin"; @@ -36,15 +36,17 @@ constexpr MeasurementT CONFIG_AUTO_STOP_THRESHOLD = CONFIG_KG_FACTOR_INV * 1; //uint32_t CONFIG_AUTO_STOP_NUM_MEASUREMENTS = (1000 / CONFIG_MEASURE_DELAY) * 60 * 15; constexpr uint32_t CONFIG_AUTO_STOP_NUM_MEASUREMENTS = (1000 / CONFIG_MEASURE_DELAY) * 30; +constexpr unsigned long CONFIG_PING_INTERVAL_MS = 1000; + // ------------------------------------- Derived Settings ----------------------------------------------------------------------------- constexpr uint32_t CONFIG_SESSION_MAX_SIZE = CONFIG_SESSION_MAX_LENGTH_HOURS * 3600 * (1000 / CONFIG_MEASURE_DELAY) * sizeof(uint16_t); static_assert(CONFIG_SESSION_MAX_SIZE < 1024 * 1024, "Measurement data takes more than 1MiB space"); #ifdef _HW_V_20 -const int CONFIG_SCALE_DOUT_PIN = 23; -const int CONFIG_SCALE_SCK_PIN = 22; +constexpr int CONFIG_SCALE_DOUT_PIN = 23; +constexpr int CONFIG_SCALE_SCK_PIN = 22; #else -const int CONFIG_SCALE_DOUT_PIN = 22; -const int CONFIG_SCALE_SCK_PIN = 23; +constexpr int CONFIG_SCALE_DOUT_PIN = 22; +constexpr int CONFIG_SCALE_SCK_PIN = 23; #endif diff --git a/firmware/src/WebsocketServer.h b/firmware/src/WebsocketServer.h index dbacabe..3244d6a 100644 --- a/firmware/src/WebsocketServer.h +++ b/firmware/src/WebsocketServer.h @@ -32,7 +32,7 @@ public: using namespace websockets; const auto onMessage = [this](WebsocketsClient &client, WebsocketsMessage message) { - if (message.isPing()) + if (message.isPing()) // websocket ping client.pong(); else if (message.isBinary()) { @@ -48,11 +48,15 @@ public: if (server_.poll()) { - Serial.println("new websocket connection"); + Serial.printf("new websocket connection, storing at pos %d - occupancy: ", nextFreeClient_); clients_[nextFreeClient_] = server_.accept(); clients_[nextFreeClient_].onMessage(onMessage); this->onClientConnectImpl(clients_[nextFreeClient_]); nextFreeClient_ = (nextFreeClient_ + 1) % MAX_WEBSOCKET_CONNECTIONS; + + for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i) + Serial.print((clients_[i].available()) ? "x" : "o"); + Serial.print("\n"); } for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i) @@ -89,7 +93,7 @@ public: { for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i) if (clients_[i].available()) - clients_[i].sendBinary((const char*)&msgCode, sizeof(MessageCode)); + clients_[i].sendBinary((const char *)&msgCode, sizeof(MessageCode)); } websockets::WebsocketsClient &client(size_t i) { return clients_[i]; } @@ -137,7 +141,7 @@ private: websockets::WebsocketsClient clients_[MAX_WEBSOCKET_CONNECTIONS]; }; -template +template inline WebsocketServer> makeWebsocketServer(int port, ApiManagers... managers) { auto tuple = std::make_tuple(managers...); diff --git a/firmware/src/WebsocketServerOld.h b/firmware/src/WebsocketServerOld.h deleted file mode 100644 index eb4611c..0000000 --- a/firmware/src/WebsocketServerOld.h +++ /dev/null @@ -1,436 +0,0 @@ - -#pragma once -#include "Dtypes.h" -#include "UserDB.h" -#include "MessageCodes.h" - -#include - -template -class SessionManager; - -static constexpr int NUM_DATA_CHUNK_SIZE = 1; - -template -class WebsocketInterface -{ -public: - WebsocketInterface(int port) : port_(port), nextFreeClient_(0) - { - } - - void begin() - { - server_.listen(port_); - } - - void iteration(); - - template - void sendToAll(MessageCode msgCode, const JsonDocument &content) - { - char buffer[bufferSize]; - buffer[0] = msgCode; - size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), bufferSize - sizeof(msgCode)); - for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i) - if (clients_[i].available()) - clients_[i].sendBinary(buffer, bytesWritten); - } - - void sendToAll(MessageCode msgCode, const JsonDocument &content) - { - size_t expectedSize = measureMsgPack(content); - char *buffer = (char *)malloc(expectedSize + sizeof(msgCode)); - buffer[0] = msgCode; - size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), expectedSize); - for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i) - if (clients_[i].available()) - clients_[i].sendBinary(buffer, bytesWritten + sizeof(msgCode)); - - free(buffer); - } - - void sendToAll(MessageCode msgCode) - { - for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i) - if (clients_[i].available()) - clients_[i].sendBinary(&msgCode, sizeof(MessageCode)); - } - - websockets::WebsocketsClient &client(size_t i) { return clients_[i]; } - -private: - int port_; - int nextFreeClient_; - - websockets::WebsocketsServer server_; - websockets::WebsocketsClient clients_[MAX_WEBSOCKET_CONNECTIONS]; -}; - -template -inline void sendToClient(websockets::WebsocketsClient &client, MessageCode msgCode, const JsonDocument &content) -{ - char buffer[bufferSize]; - buffer[0] = msgCode; - size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), bufferSize - sizeof(msgCode)); - client.sendBinary(buffer, bytesWritten); -} - -inline void sendToClient(websockets::WebsocketsClient &client, MessageCode msgCode, const JsonDocument &content) -{ - size_t expectedSize = measureMsgPack(content); - char *buffer = (char *)malloc(expectedSize + sizeof(msgCode)); - buffer[0] = static_cast(msgCode); - size_t bytesWritten = serializeMsgPack(content, buffer + sizeof(msgCode), expectedSize); - client.sendBinary(buffer, bytesWritten + sizeof(msgCode)); - free(buffer); -} - -inline void sendErrorToClient(websockets::WebsocketsClient &client, const char *msg) -{ - DynamicJsonDocument doc(strlen(msg) + 64); - doc["msg"] = msg; - sendToClient(client, MessageCode::ERROR, doc); -} - -template -class WebsocketServer -{ -public: - WebsocketServer(SessionManager &sessionManager, UserStorage &userStorage, int port) - : sessionManager_(sessionManager), userStorage_(userStorage), nextFreeClient_(0), port_(port), - running_(false) - { - } - - void begin() - { - server_.listen(port_); - } - - void iteration(); - -private: - void reportSessionUpdate(); - - void sendMessageOnConnection(websockets::WebsocketsClient &client); - void sendSessionStartMessages(); - void sendSessionStopMessages(); - void sendNewDataMessages(); - - void sendUserList(websockets::WebsocketsClient &client); - void sendSessionList(websockets::WebsocketsClient &client, const String &userId); - - SessionManager &sessionManager_; - UserStorage &userStorage_; - - int nextFreeClient_; - int port_; - - size_t sentMessageCount_; - websockets::WebsocketsServer server_; - websockets::WebsocketsClient clients_[MAX_WEBSOCKET_CONNECTIONS]; - - // previous session state - size_t numSentMeasurements_[MAX_WEBSOCKET_CONNECTIONS]; - bool running_; -}; - -using websockets::WebsocketsClient; - -// ------------------------------------- Message types & classes --------------------------- - -enum MessageType -{ - // from swim tracker device to frontend - INITIAL_INFO = 1, - SESSION_STARTED = 2, - SESSION_STOPPED = 3, - SESSION_NEW_DATA = 4, - ANSWER_USER_LIST = 5, - ANSWER_SESSION_LIST = 6, - - // from frontend to device - START_SESSION = 7, - STOP_SESSION = 8, - TARE = 9, - QUERY_USER_LIST = 10, - QUERY_SESSION_LIST = 11, -}; - -#pragma pack(push, 1) -class SessionStartedMessage -{ -public: - SessionStartedMessage(uint32_t id) : messageType_(SESSION_STARTED), sessionId_(id) {} - void send(WebsocketsClient &c) const - { - c.sendBinary((const char *)(this), sizeof(*this)); - } - -private: - uint8_t messageType_; - uint32_t sessionId_; -}; - -class SessionStoppedMessage -{ -public: - SessionStoppedMessage() : messageType_(SESSION_STOPPED) {} - void send(WebsocketsClient &c) const - { - c.sendBinary((const char *)(this), sizeof(*this)); - } - -private: - uint8_t messageType_; -}; - -template -class SessionNewDataMessage -{ -public: - // typically a message contains NUM_DATA_CHUNK_SIZE measurements - // if some measurements are skipped, because loop() takes too long - // there might actually be more measurements, to be safe there is an - // additional factor here - static constexpr size_t MAX_MEASUREMENTS = 4 * NUM_DATA_CHUNK_SIZE; - - SessionNewDataMessage(MeasurementT *ptr, size_t numMeasurements) - : messageType_(SESSION_NEW_DATA), numMeasurements_(min(numMeasurements, MAX_MEASUREMENTS)) - { - memcpy(measurements_, ptr, sizeof(MeasurementT) * numMeasurements_); - } - - void send(WebsocketsClient &c) const - { - c.sendBinary((const char *)(this), numBytes()); - } - - size_t numMeasurements() const - { - return numMeasurements_; - } - -private: - size_t numBytes() const { return sizeof(uint8_t) + numMeasurements() * sizeof(MeasurementT); } - - // data to be sent - uint8_t messageType_; - MeasurementT measurements_[MAX_MEASUREMENTS]; - - // book-keeping - size_t numMeasurements_; -}; - -#pragma pack(pop) - -// ------------------------------------- WebsocketServer members --------------------------- - -template -void WebsocketServer::iteration() -{ - using namespace websockets; - - auto onMessage = [this](WebsocketsClient &client, WebsocketsMessage message) - { - if (message.isPing()) - client.pong(); - else if (message.isBinary()) - { - const char *data = message.c_str(); - const size_t length = message.length(); - if (length < 1) - { - client.close(CloseReason_UnsupportedData); - return; - } - - uint8_t opCode = uint8_t(data[0]); - switch (opCode) - { - case START_SESSION: - this->sessionManager_.startMeasurements(); - break; - case STOP_SESSION: - this->sessionManager_.stopMeasurements(); - break; - case TARE: - this->sessionManager_.tare(); - break; - case QUERY_USER_LIST: - this->sendUserList(client); - break; - case QUERY_SESSION_LIST: - { - StaticJsonDocument doc; - deserializeMsgPack(doc, data, length); - String userId = doc.as(); - if (userId.length() > 0) - this->sendSessionList(client, userId); - } - break; - default: - client.close(CloseReason_UnsupportedData); - return; - } - } - }; - - if (server_.poll()) - { - clients_[nextFreeClient_] = server_.accept(); - clients_[nextFreeClient_].onMessage(onMessage); - Serial.println("new websocket connection"); - sendMessageOnConnection(clients_[nextFreeClient_]); - numSentMeasurements_[nextFreeClient_] = sessionManager_.session().numMeasurements(); - nextFreeClient_ = (nextFreeClient_ + 1) % MAX_WEBSOCKET_CONNECTIONS; - } - - for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i) - clients_[i].poll(); - - reportSessionUpdate(); -} - -template -void WebsocketServer::reportSessionUpdate() -{ - if (!running_ && sessionManager_.isMeasuring()) - { - sendSessionStartMessages(); - for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i) - numSentMeasurements_[i] = 0; - } - else if (running_ && !sessionManager_.isMeasuring()) - { - sendSessionStopMessages(); - for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i) - numSentMeasurements_[i] = 0; - } - sendNewDataMessages(); -} - -template -void WebsocketServer::sendUserList(websockets::WebsocketsClient &client) -{ - const auto numUsers = userStorage_.numUsers(); - constexpr size_t constantSlack = 64; - DynamicJsonDocument result(JSON_ARRAY_SIZE(numUsers) + numUsers * (USER_STRING_ID_MAX_LEN + 2) + constantSlack); - JsonArray arr = result.to(); - for (auto userIt = userStorage_.beginWithoutUnassigned(); userIt != userStorage_.end(); ++userIt) - arr.add(userIt->stringId()); - - char buffer[MAX_USERS * (USER_STRING_ID_MAX_LEN + 1) + constantSlack]; - size_t bytesWritten = serializeMsgPack(result, buffer, sizeof(buffer)); - client.sendBinary(buffer, bytesWritten); -} - -template -void WebsocketServer::sendSessionList(websockets::WebsocketsClient &client, const String &userId) -{ - - User *user = userStorage_.getUserInfo(userId); - if (user != nullptr) - { - DynamicJsonDocument result(JSON_ARRAY_SIZE(user->numSessions()) + user->numSessions() * (sizeof(SessionIdType) + 8)); - JsonArray arr = result.to(); - for (SessionIdType *sIt = user->sessionBegin(); sIt != user->sessionEnd(); ++sIt) - arr.add(*sIt); - - size_t bytesToWrite = measureMsgPack(result); - char *buffer = (char *)malloc(bytesToWrite); - size_t bytesWritten = serializeMsgPack(result, buffer, bytesToWrite); - assert(bytesWritten <= bytesToWrite); - client.sendBinary(buffer, bytesWritten); - free(buffer); - } - else - { - DynamicJsonDocument result(JSON_ARRAY_SIZE(1) + 8); - result.to(); - char buffer[32]; - size_t bytesWritten = serializeMsgPack(result, buffer, sizeof(buffer)); - client.sendBinary(buffer, bytesWritten); - } -} - -template -void WebsocketServer::sendSessionStartMessages() -{ - SessionStartedMessage msg(sessionManager_.session().getStartTime()); - for (auto &c : clients_) - if (c.available()) - msg.send(c); - running_ = sessionManager_.isMeasuring(); -} - -template -void WebsocketServer::sendSessionStopMessages() -{ - SessionStoppedMessage msg; - for (auto &c : clients_) - if (c.available()) - msg.send(c); - running_ = sessionManager_.isMeasuring(); -} - -template -void WebsocketServer::sendNewDataMessages() -{ - using MeasurementT = typename SessionT::MeasurementType; - auto &session = sessionManager_.session(); - - for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i) - { - auto &c = clients_[i]; - if (c.available()) - { - MeasurementT *dataToSend = session.getDataPointer() + numSentMeasurements_[i]; - int32_t numMeasurementsToSend = int32_t(session.numMeasurements()) - int32_t(numSentMeasurements_[i]); - if (numMeasurementsToSend > 0) - { - SessionNewDataMessage msg(dataToSend, numMeasurementsToSend); - msg.send(c); - numSentMeasurements_[i] += msg.numMeasurements(); - } - } - } -} - -template -void WebsocketServer::sendMessageOnConnection(WebsocketsClient &client) -{ - using MeasurementT = typename SessionT::MeasurementType; - - // Message format: - // - uint8_t messageType - // - uint8_t running - // - uint32_t sessionId - // - MeasurementT [] measurements (if running) - - auto &session = sessionManager_.session(); - const auto numMeasurements = session.numMeasurements(); - const auto sessionId = session.getStartTime(); - - const size_t msgSize = sizeof(uint8_t) + sizeof(uint8_t) + sizeof(sessionId) + sizeof(MeasurementT) * numMeasurements; - char *msg = (char *)heap_caps_malloc(msgSize, MALLOC_CAP_SPIRAM); - - char *writeHead = msg; - - *writeHead = INITIAL_INFO; - writeHead += sizeof(uint8_t); - - *writeHead = sessionManager_.isMeasuring(); - writeHead += sizeof(uint8_t); - - *((uint32_t *)writeHead) = sessionManager_.isMeasuring() ? sessionId : 0; - writeHead += sizeof(uint32_t); - - assert(writeHead - msg == msgSize - sizeof(MeasurementT) * numMeasurements); - - memcpy(writeHead, session.getDataPointer(), sizeof(MeasurementT) * numMeasurements); - client.sendBinary(msg, msgSize); - - free(msg); -}