From 94df778a5a8223410ec46e08371ae399588aa685 Mon Sep 17 00:00:00 2001 From: Martin Bauer Date: Sun, 28 Jun 2020 18:26:19 +0200 Subject: [PATCH] Fix in websocket newData message sending --- .../lib/session/SimpleMeasurementSession.h | 11 +- firmware/src/WebsocketServer.h | 304 ++++++++++-------- 2 files changed, 181 insertions(+), 134 deletions(-) diff --git a/firmware/lib/session/SimpleMeasurementSession.h b/firmware/lib/session/SimpleMeasurementSession.h index 950f61d..7bcb204 100644 --- a/firmware/lib/session/SimpleMeasurementSession.h +++ b/firmware/lib/session/SimpleMeasurementSession.h @@ -75,14 +75,13 @@ private: void saveToFileSystem() { static const uint32_t arrayHeaderOffset = ChunkT::arrayHeaderOffset(); - Serial.printf(" -------- Array header offset ---- %u\n", arrayHeaderOffset); const uint32_t numMeasurements = chunk->numMeasurements(); // todo: check this! free doesn't mean that the file writing actually works ok // use error codes of write instead? anyway: test it! - Serial.printf("%ld saveToFileSystem start()\n", millis()); + Serial.printf("%ld saveToFileSystem start\n", millis()); deleteUntilBytesFree(CONFIG_SESSION_MAX_SIZE); - Serial.printf("%ld after deleteUntilBytesFree()\n", millis()); + Serial.printf(" %ld after deleteUntilBytesFree()\n", millis()); String filename = String(CONFIG_DATA_PATH) + "/" + String(chunk->getStartTime()); if (portablefs::exists(filename.c_str())) @@ -94,12 +93,12 @@ private: Measurement_T *startPtr = chunk->getDataPointer() + existingMeasurements; file.write((uint8_t *)(startPtr), measurementsToWrite * sizeof(Measurement_T)); - Serial.printf("%ld Incr Save: before header patch\n", millis()); + Serial.printf(" %ld Incr Save: before header patch\n", millis()); file.seek(arrayHeaderOffset); StreamingMsgPackEncoder encoder(&file); encoder.template sendArrayHeader(numMeasurements); - Serial.printf("%ld total measurements up to now %u\n", millis(), numMeasurements); + Serial.printf(" %ld total measurements up to now %u\n", millis(), numMeasurements); } else { @@ -108,7 +107,7 @@ private: StreamingMsgPackEncoder encoder(&file); chunk->serialize(encoder); } - Serial.printf("%ld saveToFileSystem done-------------\n", millis()); + Serial.printf(" %ld saveToFileSystem done\n", millis()); } void deleteUntilBytesFree(size_t requiredSpace) diff --git a/firmware/src/WebsocketServer.h b/firmware/src/WebsocketServer.h index f7ab0da..40ed804 100644 --- a/firmware/src/WebsocketServer.h +++ b/firmware/src/WebsocketServer.h @@ -13,38 +13,38 @@ template class WebsocketServer { public: - WebsocketServer(SessionManager &sessionManager, int port) - : sessionManager_(sessionManager), nextFreeClient_(0), port_(port), - numSentMeasurements_(0), running_(false) - { - } + WebsocketServer(SessionManager &sessionManager, int port) + : sessionManager_(sessionManager), nextFreeClient_(0), port_(port), + running_(false) + { + } - void begin() - { - server_.listen(port_); - } + void begin() + { + server_.listen(port_); + } - void iteration(); + void iteration(); private: - void reportSessionUpdate(); + void reportSessionUpdate(); - void sendMessageOnConnection(websockets::WebsocketsClient &client); - void sendSessionStartMessages(); - void sendSessionStopMessages(); - void sendNewDataMessages(); + void sendMessageOnConnection(websockets::WebsocketsClient &client); + void sendSessionStartMessages(); + void sendSessionStopMessages(); + void sendNewDataMessages(); - SessionManager &sessionManager_; - int nextFreeClient_; - int port_; + SessionManager &sessionManager_; + int nextFreeClient_; + int port_; - size_t sentMessageCount_; - websockets::WebsocketsServer server_; - websockets::WebsocketsClient clients_[MAX_CONNECTIONS]; + size_t sentMessageCount_; + websockets::WebsocketsServer server_; + websockets::WebsocketsClient clients_[MAX_CONNECTIONS]; - // previous session state - size_t numSentMeasurements_; - bool running_; + // previous session state + size_t numSentMeasurements_[MAX_CONNECTIONS]; + bool running_; }; using websockets::WebsocketsClient; @@ -53,75 +53,81 @@ using websockets::WebsocketsClient; enum MessageType { - INITIAL_INFO = 1, - SESSION_STARTED = 2, - SESSION_STOPPED = 3, - SESSION_NEW_DATA = 4 + // from swim tracker device to frontend + INITIAL_INFO = 1, + SESSION_STARTED = 2, + SESSION_STOPPED = 3, + SESSION_NEW_DATA = 4, + + // from frontend to device + START_SESSION = 5, + STOP_SESSION = 6, + TARE = 7 }; #pragma pack(push, 1) class SessionStartedMessage { public: - SessionStartedMessage(uint32_t id) : messageType_(SESSION_STARTED), sessionId_(id) {} - void send(WebsocketsClient &c) const - { - c.sendBinary((const char *)(this), sizeof(*this)); - } + SessionStartedMessage(uint32_t id) : messageType_(SESSION_STARTED), sessionId_(id) {} + void send(WebsocketsClient &c) const + { + c.sendBinary((const char *)(this), sizeof(*this)); + } private: - uint8_t messageType_; - uint32_t sessionId_; + uint8_t messageType_; + uint32_t sessionId_; }; class SessionStoppedMessage { public: - SessionStoppedMessage() : messageType_(SESSION_STOPPED) {} - void send(WebsocketsClient &c) const - { - c.sendBinary((const char *)(this), sizeof(*this)); - } + SessionStoppedMessage() : messageType_(SESSION_STOPPED) {} + void send(WebsocketsClient &c) const + { + c.sendBinary((const char *)(this), sizeof(*this)); + } private: - uint8_t messageType_; + uint8_t messageType_; }; template class SessionNewDataMessage { public: - // typically a message contains NUM_DATA_CHUNK_SIZE measurements - // if some measurements are skipped, because loop() takes too long - // there might actually be more measurements, to be safe there is an - // additional factor here - static constexpr size_t MAX_MEASUREMENTS = 4 * NUM_DATA_CHUNK_SIZE; + // typically a message contains NUM_DATA_CHUNK_SIZE measurements + // if some measurements are skipped, because loop() takes too long + // there might actually be more measurements, to be safe there is an + // additional factor here + static constexpr size_t MAX_MEASUREMENTS = 4 * NUM_DATA_CHUNK_SIZE; - SessionNewDataMessage(MeasurementT *ptr, size_t numMeasurements) - : messageType_(SESSION_NEW_DATA), numMeasurements_(min(numMeasurements, MAX_MEASUREMENTS)) - { - memcpy(measurements_, ptr, sizeof(MeasurementT) * numMeasurements_); - } + SessionNewDataMessage(MeasurementT *ptr, size_t numMeasurements) + : messageType_(SESSION_NEW_DATA), numMeasurements_(min(numMeasurements, MAX_MEASUREMENTS)) + { + memcpy(measurements_, ptr, sizeof(MeasurementT) * numMeasurements_); + } - void send(WebsocketsClient &c) const - { - c.sendBinary((const char *)(this), numBytes()); - } + void send(WebsocketsClient &c) const + { + c.sendBinary((const char *)(this), numBytes()); + } - size_t numMeasurements() const - { - return numMeasurements_; - } + size_t numMeasurements() const + { + return numMeasurements_; + } private: - size_t numBytes() const { return sizeof(uint8_t) + numMeasurements() * sizeof(MeasurementT); } + size_t numBytes() const { return sizeof(uint8_t) + numMeasurements() * sizeof(MeasurementT); } - // data to be sent - uint8_t messageType_; - MeasurementT measurements_[MAX_MEASUREMENTS]; + // data to be sent + uint8_t messageType_; + MeasurementT measurements_[MAX_MEASUREMENTS]; - // book-keeping - size_t numMeasurements_; + // book-keeping + size_t numMeasurements_; }; #pragma pack(pop) @@ -130,108 +136,150 @@ private: template void WebsocketServer::iteration() { - if (server_.poll()) + using namespace websockets; + auto onMessage = [this](WebsocketsClient &client, WebsocketsMessage message) { + if (message.isPing()) + client.pong(); + else if (message.isBinary()) { - clients_[nextFreeClient_] = server_.accept(); - //clients_[nextFreeClient_].onMessage(onMessage); // TODO - Serial.println("new websocket connection"); - sendMessageOnConnection(clients_[nextFreeClient_]); - nextFreeClient_ = (nextFreeClient_ + 1) % MAX_CONNECTIONS; + const char *data = message.c_str(); + const size_t length = message.length(); + if (length < 1) + { + client.close(CloseReason_UnsupportedData); + return; + } + + uint8_t opCode = uint8_t(data[0]); + switch (opCode) + { + case START_SESSION: + this->sessionManager_.startMeasurements(); + break; + case STOP_SESSION: + this->sessionManager_.stopMeasurements(); + break; + case TARE: + this->sessionManager_.tare(); + break; + default: + client.close(CloseReason_UnsupportedData); + return; + } } + }; - for (int i = 0; i < MAX_CONNECTIONS; ++i) - clients_[i].poll(); + if (server_.poll()) + { + clients_[nextFreeClient_] = server_.accept(); + clients_[nextFreeClient_].onMessage(onMessage); + Serial.println("new websocket connection"); + sendMessageOnConnection(clients_[nextFreeClient_]); + numSentMeasurements_[nextFreeClient_] = sessionManager_.session().numMeasurements(); + nextFreeClient_ = (nextFreeClient_ + 1) % MAX_CONNECTIONS; + } - reportSessionUpdate(); + for (int i = 0; i < MAX_CONNECTIONS; ++i) + clients_[i].poll(); + + reportSessionUpdate(); } template void WebsocketServer::reportSessionUpdate() { - auto &session = sessionManager_.session(); + if (!running_ && sessionManager_.isMeasuring()) + { + sendSessionStartMessages(); + for (int i = 0; i < MAX_CONNECTIONS; ++i) + numSentMeasurements_[i] = 0; + } + else if (running_ && !sessionManager_.isMeasuring()) + { + sendSessionStopMessages(); + for (int i = 0; i < MAX_CONNECTIONS; ++i) + numSentMeasurements_[i] = 0; + } - // start/stop messages - if (!running_ && sessionManager_.isMeasuring()) - sendSessionStartMessages(); - else if (running_ && !sessionManager_.isMeasuring()) - sendSessionStopMessages(); - - // new data - if (session.numMeasurements() - (NUM_DATA_CHUNK_SIZE - 1) > numSentMeasurements_) - { - sendNewDataMessages(); - } + sendNewDataMessages(); } template void WebsocketServer::sendSessionStartMessages() { - SessionStartedMessage msg(sessionManager_.session().getStartTime()); - for (auto &c : clients_) - if (c.available()) - msg.send(c); - running_ = sessionManager_.isMeasuring(); + SessionStartedMessage msg(sessionManager_.session().getStartTime()); + for (auto &c : clients_) + if (c.available()) + msg.send(c); + running_ = sessionManager_.isMeasuring(); } template void WebsocketServer::sendSessionStopMessages() { - SessionStoppedMessage msg; - for (auto &c : clients_) - if (c.available()) - msg.send(c); - running_ = sessionManager_.isMeasuring(); + SessionStoppedMessage msg; + for (auto &c : clients_) + if (c.available()) + msg.send(c); + running_ = sessionManager_.isMeasuring(); } template void WebsocketServer::sendNewDataMessages() { - using MeasurementT = typename SessionT::MeasurementType; - auto &session = sessionManager_.session(); - MeasurementT *dataToSend = session.getDataPointer() + numSentMeasurements_; - size_t numMeasurementsToSend = session.numMeasurements() - numSentMeasurements_; - SessionNewDataMessage msg(dataToSend, numMeasurementsToSend); + using MeasurementT = typename SessionT::MeasurementType; + auto &session = sessionManager_.session(); - for (auto &c : clients_) - if (c.available()) - msg.send(c); - - numSentMeasurements_ += msg.numMeasurements(); + for (int i = 0; i < MAX_CONNECTIONS; ++i) + { + auto &c = clients_[i]; + if (c.available()) + { + MeasurementT *dataToSend = session.getDataPointer() + numSentMeasurements_[i]; + int32_t numMeasurementsToSend = int32_t(session.numMeasurements()) - int32_t(numSentMeasurements_[i]); + if (numMeasurementsToSend > 0) + { + SessionNewDataMessage msg(dataToSend, numMeasurementsToSend); + msg.send(c); + numSentMeasurements_[i] += msg.numMeasurements(); + } + } + } } template void WebsocketServer::sendMessageOnConnection(WebsocketsClient &client) { - using MeasurementT = typename SessionT::MeasurementType; + using MeasurementT = typename SessionT::MeasurementType; - // Message format: - // - uint8_t messageType - // - uint8_t running - // - uint32_t sessionId - // - MeasurementT [] measurements (if running) + // Message format: + // - uint8_t messageType + // - uint8_t running + // - uint32_t sessionId + // - MeasurementT [] measurements (if running) - auto &session = sessionManager_.session(); - const auto numMeasurements = session.numMeasurements(); - const auto sessionId = session.getStartTime(); + auto &session = sessionManager_.session(); + const auto numMeasurements = session.numMeasurements(); + const auto sessionId = session.getStartTime(); - const size_t msgSize = sizeof(uint8_t) + sizeof(uint8_t) + sizeof(sessionId) + sizeof(MeasurementT) * numMeasurements; - char *msg = (char *)heap_caps_malloc(msgSize, MALLOC_CAP_SPIRAM); + const size_t msgSize = sizeof(uint8_t) + sizeof(uint8_t) + sizeof(sessionId) + sizeof(MeasurementT) * numMeasurements; + char *msg = (char *)heap_caps_malloc(msgSize, MALLOC_CAP_SPIRAM); - char *writeHead = msg; + char *writeHead = msg; - *writeHead = INITIAL_INFO; - writeHead += sizeof(uint8_t); + *writeHead = INITIAL_INFO; + writeHead += sizeof(uint8_t); - *writeHead = sessionManager_.isMeasuring(); - writeHead += sizeof(uint8_t); + *writeHead = sessionManager_.isMeasuring(); + writeHead += sizeof(uint8_t); - *((uint32_t *)writeHead) = sessionManager_.isMeasuring() ? sessionId : 0; - writeHead += sizeof(uint32_t); + *((uint32_t *)writeHead) = sessionManager_.isMeasuring() ? sessionId : 0; + writeHead += sizeof(uint32_t); - assert(writeHead - msg == msgSize - sizeof(MeasurementT) * numMeasurements); + assert(writeHead - msg == msgSize - sizeof(MeasurementT) * numMeasurements); - memcpy(writeHead, session.getDataPointer(), sizeof(MeasurementT) * numMeasurements); - client.sendBinary(msg, msgSize); + memcpy(writeHead, session.getDataPointer(), sizeof(MeasurementT) * numMeasurements); + client.sendBinary(msg, msgSize); - free(msg); + free(msg); }