no delay on first filesystem save - loggin over websocket

This commit is contained in:
Martin Bauer 2023-10-02 20:32:13 +02:00
parent 5e56ede55c
commit 4d77d9e328
6 changed files with 37 additions and 21 deletions

View File

@ -33,8 +33,12 @@ public:
new (chunk) ChunkT(); // placement new to init chunk new (chunk) ChunkT(); // placement new to init chunk
} }
chunk->init(epochStartTime, 0); chunk->init(epochStartTime, 0);
// write header here - since this takes a long time later when measurement is running
writeToNewFile();
} }
bool addPoint(Measurement_T measurement) bool addPoint(Measurement_T measurement)
{ {
bool success = chunk->addPoint(measurement); bool success = chunk->addPoint(measurement);
@ -76,6 +80,19 @@ public:
} }
private: private:
portablefs::string getFilename() {
return portablefs::string(CONFIG_DATA_PATH) + "/" + portablefs::to_string(chunk->getStartTime());
}
void writeToNewFile() {
LOG_INFO("Initializing file");
const auto filename = getFilename();
auto file = portablefs::open(filename.c_str(), "w");
StreamingMsgPackEncoder<portablefs::File> encoder(&file);
chunk->serialize(encoder);
LOG_INFO("Initializing file done");
}
void saveToFileSystem() void saveToFileSystem()
{ {
static const uint32_t arrayHeaderOffset = ChunkT::arrayHeaderOffset(); static const uint32_t arrayHeaderOffset = ChunkT::arrayHeaderOffset();
@ -83,12 +100,11 @@ private:
// todo: check this! free doesn't mean that the file writing actually works ok // todo: check this! free doesn't mean that the file writing actually works ok
// use error codes of write instead? anyway: test it! // use error codes of write instead? anyway: test it!
LOG_INFO("%ld saveToFileSystem start", millis()); LOG_INFO("saveToFileSystem start");
deleteUntilBytesFree(CONFIG_SESSION_MAX_SIZE); deleteUntilBytesFree(CONFIG_SESSION_MAX_SIZE);
LOG_INFO("%ld after deleteUntilBytesFree()", millis()); LOG_TRACE("after deleteUntilBytesFree()");
using fs_string = portablefs::string; const auto filename = getFilename();
fs_string filename = fs_string(CONFIG_DATA_PATH) + "/" + portablefs::to_string(chunk->getStartTime());
if (portablefs::exists(filename.c_str())) if (portablefs::exists(filename.c_str()))
{ {
auto file = portablefs::open(filename.c_str(), "r+"); auto file = portablefs::open(filename.c_str(), "r+");
@ -105,12 +121,10 @@ private:
} }
else else
{ {
LOG_INFO("Creating new session file"); LOG_INFO("Creating new session file, should have been done already");
auto file = portablefs::open(filename.c_str(), "w"); writeToNewFile();
StreamingMsgPackEncoder<portablefs::File> encoder(&file);
chunk->serialize(encoder);
} }
LOG_INFO("%ld saveToFileSystem done", millis()); LOG_INFO("saveToFileSystem done");
} }
void deleteUntilBytesFree(size_t requiredSpace) void deleteUntilBytesFree(size_t requiredSpace)

View File

@ -10,7 +10,7 @@
class LoggingAPI class LoggingAPI
{ {
public: public:
void onClientConnect(websockets::WebsocketsClient &client) {} void onClientConnect(websockets::WebsocketsClient &client, int /*clientId*/) {}
bool handleMessage(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size) { bool handleMessage(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size) {
switch(code) { switch(code) {

View File

@ -20,7 +20,7 @@ public:
numSentMeasurements_[i] = 0; numSentMeasurements_[i] = 0;
} }
void onClientConnect(websockets::WebsocketsClient &client); void onClientConnect(websockets::WebsocketsClient &client, int clientId);
bool handleMessage(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size); bool handleMessage(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size);
template <typename TServer> template <typename TServer>
@ -41,7 +41,7 @@ private:
// sending message about current session // sending message about current session
template <typename T> template <typename T>
void SessionAPI<T>::onClientConnect(websockets::WebsocketsClient &client) void SessionAPI<T>::onClientConnect(websockets::WebsocketsClient &client, int clientId)
{ {
// TODO write msgpack instead for consistency? // TODO write msgpack instead for consistency?
@ -73,10 +73,12 @@ void SessionAPI<T>::onClientConnect(websockets::WebsocketsClient &client)
assert(writeHead - msg == msgSize - sizeof(MeasurementT) * numMeasurements); assert(writeHead - msg == msgSize - sizeof(MeasurementT) * numMeasurements);
LOG_INFO("Start sending existing measurements");
memcpy(writeHead, session.getDataPointer(), sizeof(MeasurementT) * numMeasurements); memcpy(writeHead, session.getDataPointer(), sizeof(MeasurementT) * numMeasurements);
client.sendBinary(msg, msgSize); client.sendBinary(msg, msgSize);
numSentMeasurements_[clientId] = numMeasurements;
free(msg); free(msg);
LOG_INFO("Finished sending existing measurements");
} }
template <typename T> template <typename T>
@ -157,7 +159,7 @@ template <typename TServer>
void SessionAPI<T>::sendNewDataMessages(TServer &server) void SessionAPI<T>::sendNewDataMessages(TServer &server)
{ {
constexpr size_t MAX_MEASUREMENTS_PER_MSG = 15; constexpr size_t MAX_MEASUREMENTS_PER_MSG = 50;
constexpr size_t WAIT_UNTIL_AT_LEAST_NUM_MEASUREMENTS = 1; constexpr size_t WAIT_UNTIL_AT_LEAST_NUM_MEASUREMENTS = 1;
// new data messages are the only messages not sent in msgpack format // new data messages are the only messages not sent in msgpack format

View File

@ -52,7 +52,7 @@ public:
{ {
clients_[nextFreeClient_] = server_.accept(); clients_[nextFreeClient_] = server_.accept();
clients_[nextFreeClient_].onMessage(onMessage); clients_[nextFreeClient_].onMessage(onMessage);
this->onClientConnectImpl(clients_[nextFreeClient_]); this->onClientConnectImpl(clients_[nextFreeClient_], nextFreeClient_);
nextFreeClient_ = (nextFreeClient_ + 1) % MAX_WEBSOCKET_CONNECTIONS; nextFreeClient_ = (nextFreeClient_ + 1) % MAX_WEBSOCKET_CONNECTIONS;
if(MAX_WEBSOCKET_CONNECTIONS == 3) { if(MAX_WEBSOCKET_CONNECTIONS == 3) {
@ -128,13 +128,13 @@ private:
bool handleMessageImpl(websockets::WebsocketsClient &, MessageCode, const char *, size_t) { return false; } bool handleMessageImpl(websockets::WebsocketsClient &, MessageCode, const char *, size_t) { return false; }
template <size_t managerIdx = 0, typename std::enable_if<(managerIdx < std::tuple_size<ApiManagerTuple>::value), bool>::type = true> template <size_t managerIdx = 0, typename std::enable_if<(managerIdx < std::tuple_size<ApiManagerTuple>::value), bool>::type = true>
void onClientConnectImpl(websockets::WebsocketsClient &client) void onClientConnectImpl(websockets::WebsocketsClient &client, int clientId)
{ {
std::get<managerIdx>(apiManagers_).onClientConnect(client); std::get<managerIdx>(apiManagers_).onClientConnect(client, clientId);
onClientConnectImpl<managerIdx + 1>(client); onClientConnectImpl<managerIdx + 1>(client, clientId);
} }
template <size_t managerIdx, typename std::enable_if<managerIdx == std::tuple_size<ApiManagerTuple>::value, bool>::type = true> template <size_t managerIdx, typename std::enable_if<managerIdx == std::tuple_size<ApiManagerTuple>::value, bool>::type = true>
void onClientConnectImpl(websockets::WebsocketsClient &client) {} void onClientConnectImpl(websockets::WebsocketsClient &client, int clientId) {}
// -- Members // -- Members

View File

@ -11,7 +11,7 @@ void WifiAPI::sendWifiState(websockets::WebsocketsClient &client)
sendToClient<192>(client, MessageCode::WIFI_STATE_RESPONSE, data); sendToClient<192>(client, MessageCode::WIFI_STATE_RESPONSE, data);
} }
void WifiAPI::onClientConnect(websockets::WebsocketsClient &client) void WifiAPI::onClientConnect(websockets::WebsocketsClient &client, int /*clientId*/)
{ {
sendWifiState(client); sendWifiState(client);
} }

View File

@ -15,7 +15,7 @@ public:
{ {
} }
void onClientConnect(websockets::WebsocketsClient &client); void onClientConnect(websockets::WebsocketsClient &client, int clientId);
bool handleMessage(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size); bool handleMessage(websockets::WebsocketsClient &client, MessageCode code, const char *payload, size_t size);
template <typename TServer> template <typename TServer>