Fix in websocket newData message sending

This commit is contained in:
Martin Bauer 2020-06-28 18:26:19 +02:00
parent 0f7389744d
commit 94df778a5a
2 changed files with 181 additions and 134 deletions

View File

@ -75,14 +75,13 @@ private:
void saveToFileSystem() void saveToFileSystem()
{ {
static const uint32_t arrayHeaderOffset = ChunkT::arrayHeaderOffset(); static const uint32_t arrayHeaderOffset = ChunkT::arrayHeaderOffset();
Serial.printf(" -------- Array header offset ---- %u\n", arrayHeaderOffset);
const uint32_t numMeasurements = chunk->numMeasurements(); const uint32_t numMeasurements = chunk->numMeasurements();
// todo: check this! free doesn't mean that the file writing actually works ok // todo: check this! free doesn't mean that the file writing actually works ok
// use error codes of write instead? anyway: test it! // use error codes of write instead? anyway: test it!
Serial.printf("%ld saveToFileSystem start()\n", millis()); Serial.printf("%ld saveToFileSystem start\n", millis());
deleteUntilBytesFree(CONFIG_SESSION_MAX_SIZE); deleteUntilBytesFree(CONFIG_SESSION_MAX_SIZE);
Serial.printf("%ld after deleteUntilBytesFree()\n", millis()); Serial.printf(" %ld after deleteUntilBytesFree()\n", millis());
String filename = String(CONFIG_DATA_PATH) + "/" + String(chunk->getStartTime()); String filename = String(CONFIG_DATA_PATH) + "/" + String(chunk->getStartTime());
if (portablefs::exists(filename.c_str())) if (portablefs::exists(filename.c_str()))
@ -94,12 +93,12 @@ private:
Measurement_T *startPtr = chunk->getDataPointer() + existingMeasurements; Measurement_T *startPtr = chunk->getDataPointer() + existingMeasurements;
file.write((uint8_t *)(startPtr), measurementsToWrite * sizeof(Measurement_T)); file.write((uint8_t *)(startPtr), measurementsToWrite * sizeof(Measurement_T));
Serial.printf("%ld Incr Save: before header patch\n", millis()); Serial.printf(" %ld Incr Save: before header patch\n", millis());
file.seek(arrayHeaderOffset); file.seek(arrayHeaderOffset);
StreamingMsgPackEncoder<portablefs::File> encoder(&file); StreamingMsgPackEncoder<portablefs::File> encoder(&file);
encoder.template sendArrayHeader<Measurement_T>(numMeasurements); encoder.template sendArrayHeader<Measurement_T>(numMeasurements);
Serial.printf("%ld total measurements up to now %u\n", millis(), numMeasurements); Serial.printf(" %ld total measurements up to now %u\n", millis(), numMeasurements);
} }
else else
{ {
@ -108,7 +107,7 @@ private:
StreamingMsgPackEncoder<portablefs::File> encoder(&file); StreamingMsgPackEncoder<portablefs::File> encoder(&file);
chunk->serialize(encoder); chunk->serialize(encoder);
} }
Serial.printf("%ld saveToFileSystem done-------------\n", millis()); Serial.printf(" %ld saveToFileSystem done\n", millis());
} }
void deleteUntilBytesFree(size_t requiredSpace) void deleteUntilBytesFree(size_t requiredSpace)

View File

@ -13,38 +13,38 @@ template <typename SessionT>
class WebsocketServer class WebsocketServer
{ {
public: public:
WebsocketServer(SessionManager<SessionT> &sessionManager, int port) WebsocketServer(SessionManager<SessionT> &sessionManager, int port)
: sessionManager_(sessionManager), nextFreeClient_(0), port_(port), : sessionManager_(sessionManager), nextFreeClient_(0), port_(port),
numSentMeasurements_(0), running_(false) running_(false)
{ {
} }
void begin() void begin()
{ {
server_.listen(port_); server_.listen(port_);
} }
void iteration(); void iteration();
private: private:
void reportSessionUpdate(); void reportSessionUpdate();
void sendMessageOnConnection(websockets::WebsocketsClient &client); void sendMessageOnConnection(websockets::WebsocketsClient &client);
void sendSessionStartMessages(); void sendSessionStartMessages();
void sendSessionStopMessages(); void sendSessionStopMessages();
void sendNewDataMessages(); void sendNewDataMessages();
SessionManager<SessionT> &sessionManager_; SessionManager<SessionT> &sessionManager_;
int nextFreeClient_; int nextFreeClient_;
int port_; int port_;
size_t sentMessageCount_; size_t sentMessageCount_;
websockets::WebsocketsServer server_; websockets::WebsocketsServer server_;
websockets::WebsocketsClient clients_[MAX_CONNECTIONS]; websockets::WebsocketsClient clients_[MAX_CONNECTIONS];
// previous session state // previous session state
size_t numSentMeasurements_; size_t numSentMeasurements_[MAX_CONNECTIONS];
bool running_; bool running_;
}; };
using websockets::WebsocketsClient; using websockets::WebsocketsClient;
@ -53,75 +53,81 @@ using websockets::WebsocketsClient;
enum MessageType enum MessageType
{ {
INITIAL_INFO = 1, // from swim tracker device to frontend
SESSION_STARTED = 2, INITIAL_INFO = 1,
SESSION_STOPPED = 3, SESSION_STARTED = 2,
SESSION_NEW_DATA = 4 SESSION_STOPPED = 3,
SESSION_NEW_DATA = 4,
// from frontend to device
START_SESSION = 5,
STOP_SESSION = 6,
TARE = 7
}; };
#pragma pack(push, 1) #pragma pack(push, 1)
class SessionStartedMessage class SessionStartedMessage
{ {
public: public:
SessionStartedMessage(uint32_t id) : messageType_(SESSION_STARTED), sessionId_(id) {} SessionStartedMessage(uint32_t id) : messageType_(SESSION_STARTED), sessionId_(id) {}
void send(WebsocketsClient &c) const void send(WebsocketsClient &c) const
{ {
c.sendBinary((const char *)(this), sizeof(*this)); c.sendBinary((const char *)(this), sizeof(*this));
} }
private: private:
uint8_t messageType_; uint8_t messageType_;
uint32_t sessionId_; uint32_t sessionId_;
}; };
class SessionStoppedMessage class SessionStoppedMessage
{ {
public: public:
SessionStoppedMessage() : messageType_(SESSION_STOPPED) {} SessionStoppedMessage() : messageType_(SESSION_STOPPED) {}
void send(WebsocketsClient &c) const void send(WebsocketsClient &c) const
{ {
c.sendBinary((const char *)(this), sizeof(*this)); c.sendBinary((const char *)(this), sizeof(*this));
} }
private: private:
uint8_t messageType_; uint8_t messageType_;
}; };
template <typename MeasurementT> template <typename MeasurementT>
class SessionNewDataMessage class SessionNewDataMessage
{ {
public: public:
// typically a message contains NUM_DATA_CHUNK_SIZE measurements // typically a message contains NUM_DATA_CHUNK_SIZE measurements
// if some measurements are skipped, because loop() takes too long // if some measurements are skipped, because loop() takes too long
// there might actually be more measurements, to be safe there is an // there might actually be more measurements, to be safe there is an
// additional factor here // additional factor here
static constexpr size_t MAX_MEASUREMENTS = 4 * NUM_DATA_CHUNK_SIZE; static constexpr size_t MAX_MEASUREMENTS = 4 * NUM_DATA_CHUNK_SIZE;
SessionNewDataMessage(MeasurementT *ptr, size_t numMeasurements) SessionNewDataMessage(MeasurementT *ptr, size_t numMeasurements)
: messageType_(SESSION_NEW_DATA), numMeasurements_(min(numMeasurements, MAX_MEASUREMENTS)) : messageType_(SESSION_NEW_DATA), numMeasurements_(min(numMeasurements, MAX_MEASUREMENTS))
{ {
memcpy(measurements_, ptr, sizeof(MeasurementT) * numMeasurements_); memcpy(measurements_, ptr, sizeof(MeasurementT) * numMeasurements_);
} }
void send(WebsocketsClient &c) const void send(WebsocketsClient &c) const
{ {
c.sendBinary((const char *)(this), numBytes()); c.sendBinary((const char *)(this), numBytes());
} }
size_t numMeasurements() const size_t numMeasurements() const
{ {
return numMeasurements_; return numMeasurements_;
} }
private: private:
size_t numBytes() const { return sizeof(uint8_t) + numMeasurements() * sizeof(MeasurementT); } size_t numBytes() const { return sizeof(uint8_t) + numMeasurements() * sizeof(MeasurementT); }
// data to be sent // data to be sent
uint8_t messageType_; uint8_t messageType_;
MeasurementT measurements_[MAX_MEASUREMENTS]; MeasurementT measurements_[MAX_MEASUREMENTS];
// book-keeping // book-keeping
size_t numMeasurements_; size_t numMeasurements_;
}; };
#pragma pack(pop) #pragma pack(pop)
@ -130,108 +136,150 @@ private:
template <typename SessionT> template <typename SessionT>
void WebsocketServer<SessionT>::iteration() void WebsocketServer<SessionT>::iteration()
{ {
if (server_.poll()) using namespace websockets;
auto onMessage = [this](WebsocketsClient &client, WebsocketsMessage message) {
if (message.isPing())
client.pong();
else if (message.isBinary())
{ {
clients_[nextFreeClient_] = server_.accept(); const char *data = message.c_str();
//clients_[nextFreeClient_].onMessage(onMessage); // TODO const size_t length = message.length();
Serial.println("new websocket connection"); if (length < 1)
sendMessageOnConnection(clients_[nextFreeClient_]); {
nextFreeClient_ = (nextFreeClient_ + 1) % MAX_CONNECTIONS; client.close(CloseReason_UnsupportedData);
return;
}
uint8_t opCode = uint8_t(data[0]);
switch (opCode)
{
case START_SESSION:
this->sessionManager_.startMeasurements();
break;
case STOP_SESSION:
this->sessionManager_.stopMeasurements();
break;
case TARE:
this->sessionManager_.tare();
break;
default:
client.close(CloseReason_UnsupportedData);
return;
}
} }
};
for (int i = 0; i < MAX_CONNECTIONS; ++i) if (server_.poll())
clients_[i].poll(); {
clients_[nextFreeClient_] = server_.accept();
clients_[nextFreeClient_].onMessage(onMessage);
Serial.println("new websocket connection");
sendMessageOnConnection(clients_[nextFreeClient_]);
numSentMeasurements_[nextFreeClient_] = sessionManager_.session().numMeasurements();
nextFreeClient_ = (nextFreeClient_ + 1) % MAX_CONNECTIONS;
}
reportSessionUpdate(); for (int i = 0; i < MAX_CONNECTIONS; ++i)
clients_[i].poll();
reportSessionUpdate();
} }
template <typename SessionT> template <typename SessionT>
void WebsocketServer<SessionT>::reportSessionUpdate() void WebsocketServer<SessionT>::reportSessionUpdate()
{ {
auto &session = sessionManager_.session(); if (!running_ && sessionManager_.isMeasuring())
{
sendSessionStartMessages();
for (int i = 0; i < MAX_CONNECTIONS; ++i)
numSentMeasurements_[i] = 0;
}
else if (running_ && !sessionManager_.isMeasuring())
{
sendSessionStopMessages();
for (int i = 0; i < MAX_CONNECTIONS; ++i)
numSentMeasurements_[i] = 0;
}
// start/stop messages sendNewDataMessages();
if (!running_ && sessionManager_.isMeasuring())
sendSessionStartMessages();
else if (running_ && !sessionManager_.isMeasuring())
sendSessionStopMessages();
// new data
if (session.numMeasurements() - (NUM_DATA_CHUNK_SIZE - 1) > numSentMeasurements_)
{
sendNewDataMessages();
}
} }
template <typename SessionT> template <typename SessionT>
void WebsocketServer<SessionT>::sendSessionStartMessages() void WebsocketServer<SessionT>::sendSessionStartMessages()
{ {
SessionStartedMessage msg(sessionManager_.session().getStartTime()); SessionStartedMessage msg(sessionManager_.session().getStartTime());
for (auto &c : clients_) for (auto &c : clients_)
if (c.available()) if (c.available())
msg.send(c); msg.send(c);
running_ = sessionManager_.isMeasuring(); running_ = sessionManager_.isMeasuring();
} }
template <typename SessionT> template <typename SessionT>
void WebsocketServer<SessionT>::sendSessionStopMessages() void WebsocketServer<SessionT>::sendSessionStopMessages()
{ {
SessionStoppedMessage msg; SessionStoppedMessage msg;
for (auto &c : clients_) for (auto &c : clients_)
if (c.available()) if (c.available())
msg.send(c); msg.send(c);
running_ = sessionManager_.isMeasuring(); running_ = sessionManager_.isMeasuring();
} }
template <typename SessionT> template <typename SessionT>
void WebsocketServer<SessionT>::sendNewDataMessages() void WebsocketServer<SessionT>::sendNewDataMessages()
{ {
using MeasurementT = typename SessionT::MeasurementType; using MeasurementT = typename SessionT::MeasurementType;
auto &session = sessionManager_.session(); auto &session = sessionManager_.session();
MeasurementT *dataToSend = session.getDataPointer() + numSentMeasurements_;
size_t numMeasurementsToSend = session.numMeasurements() - numSentMeasurements_;
SessionNewDataMessage<MeasurementT> msg(dataToSend, numMeasurementsToSend);
for (auto &c : clients_) for (int i = 0; i < MAX_CONNECTIONS; ++i)
if (c.available()) {
msg.send(c); auto &c = clients_[i];
if (c.available())
numSentMeasurements_ += msg.numMeasurements(); {
MeasurementT *dataToSend = session.getDataPointer() + numSentMeasurements_[i];
int32_t numMeasurementsToSend = int32_t(session.numMeasurements()) - int32_t(numSentMeasurements_[i]);
if (numMeasurementsToSend > 0)
{
SessionNewDataMessage<MeasurementT> msg(dataToSend, numMeasurementsToSend);
msg.send(c);
numSentMeasurements_[i] += msg.numMeasurements();
}
}
}
} }
template <typename SessionT> template <typename SessionT>
void WebsocketServer<SessionT>::sendMessageOnConnection(WebsocketsClient &client) void WebsocketServer<SessionT>::sendMessageOnConnection(WebsocketsClient &client)
{ {
using MeasurementT = typename SessionT::MeasurementType; using MeasurementT = typename SessionT::MeasurementType;
// Message format: // Message format:
// - uint8_t messageType // - uint8_t messageType
// - uint8_t running // - uint8_t running
// - uint32_t sessionId // - uint32_t sessionId
// - MeasurementT [] measurements (if running) // - MeasurementT [] measurements (if running)
auto &session = sessionManager_.session(); auto &session = sessionManager_.session();
const auto numMeasurements = session.numMeasurements(); const auto numMeasurements = session.numMeasurements();
const auto sessionId = session.getStartTime(); const auto sessionId = session.getStartTime();
const size_t msgSize = sizeof(uint8_t) + sizeof(uint8_t) + sizeof(sessionId) + sizeof(MeasurementT) * numMeasurements; const size_t msgSize = sizeof(uint8_t) + sizeof(uint8_t) + sizeof(sessionId) + sizeof(MeasurementT) * numMeasurements;
char *msg = (char *)heap_caps_malloc(msgSize, MALLOC_CAP_SPIRAM); char *msg = (char *)heap_caps_malloc(msgSize, MALLOC_CAP_SPIRAM);
char *writeHead = msg; char *writeHead = msg;
*writeHead = INITIAL_INFO; *writeHead = INITIAL_INFO;
writeHead += sizeof(uint8_t); writeHead += sizeof(uint8_t);
*writeHead = sessionManager_.isMeasuring(); *writeHead = sessionManager_.isMeasuring();
writeHead += sizeof(uint8_t); writeHead += sizeof(uint8_t);
*((uint32_t *)writeHead) = sessionManager_.isMeasuring() ? sessionId : 0; *((uint32_t *)writeHead) = sessionManager_.isMeasuring() ? sessionId : 0;
writeHead += sizeof(uint32_t); writeHead += sizeof(uint32_t);
assert(writeHead - msg == msgSize - sizeof(MeasurementT) * numMeasurements); assert(writeHead - msg == msgSize - sizeof(MeasurementT) * numMeasurements);
memcpy(writeHead, session.getDataPointer(), sizeof(MeasurementT) * numMeasurements); memcpy(writeHead, session.getDataPointer(), sizeof(MeasurementT) * numMeasurements);
client.sendBinary(msg, msgSize); client.sendBinary(msg, msgSize);
free(msg); free(msg);
} }