This commit is contained in:
Martin Bauer 2020-06-28 10:30:01 +02:00
parent f2f1889918
commit 03d9741113
4 changed files with 40 additions and 49 deletions

View File

@ -3,14 +3,14 @@
#include "StreamingMsgPackEncoder.h" #include "StreamingMsgPackEncoder.h"
#include <cstdint> #include <cstdint>
template <typename Measurement_T, uint32_t SIZE> template <typename Measurement_T, uint32_t SIZE>
class SessionChunk class SessionChunk
{ {
public: public:
SessionChunk() SessionChunk()
: nextFree_(0), sessionStartTime(0), startIndex(0) : nextFree_(0), sessionStartTime(0), startIndex(0)
{} {
}
void init(uint32_t epochStartTime, uint32_t startIdx) void init(uint32_t epochStartTime, uint32_t startIdx)
{ {
@ -19,15 +19,18 @@ public:
startIndex = startIdx; startIndex = startIdx;
} }
uint32_t getStartTime() const { uint32_t getStartTime() const
{
return sessionStartTime; return sessionStartTime;
} }
uint32_t getStartIndex() const { uint32_t getStartIndex() const
{
return startIndex; return startIndex;
} }
uint32_t numMeasurements() const { uint32_t numMeasurements() const
{
return nextFree_; return nextFree_;
} }
@ -57,11 +60,21 @@ public:
return encoder.getContentLength(); return encoder.getContentLength();
} }
Measurement_T * getDataPointer() { static uint32_t arrayHeaderOffset()
{
StreamingMsgPackEncoder<DummyWriter> encoder(nullptr);
encoder.setSizeCountMode(true);
sendHeader(encoder, 0, 0);
return encoder.getContentLength();
}
Measurement_T *getDataPointer()
{
return values; return values;
} }
const Measurement_T * getDataPointer() const { const Measurement_T *getDataPointer() const
{
return values; return values;
} }

View File

@ -93,8 +93,10 @@ void SessionManager<SessionT>::stopMeasurements()
template <typename SessionT> template <typename SessionT>
void SessionManager<SessionT>::iteration() void SessionManager<SessionT>::iteration()
{ {
if (!measuring_) if (!measuring_) {
return; delay(1);
return; // give control to HTTP server thread
}
uint16_t measurement = -1; uint16_t measurement = -1;
bool measurementDone = false; bool measurementDone = false;
@ -106,6 +108,7 @@ void SessionManager<SessionT>::iteration()
{ {
Serial.println("Maximum time of session reached - stopping"); Serial.println("Maximum time of session reached - stopping");
stopMeasurements(); stopMeasurements();
delay(1); // give control to HTTP server thread
return; return;
} }
if (lastCallTime_ != 0) if (lastCallTime_ != 0)

View File

@ -9,7 +9,7 @@ public:
using ChunkT = SessionChunk<Measurement_T, MAX_SIZE>; using ChunkT = SessionChunk<Measurement_T, MAX_SIZE>;
using MeasurementType = Measurement_T; using MeasurementType = Measurement_T;
// save interval in number of measurements (by default every minute) // save interval in number of measurements (by default every minute)
SimpleMeasurementSession(uint32_t saveInterval = 10 * 60) SimpleMeasurementSession(uint32_t saveInterval = 10 * 20)
: chunk(nullptr), saveInterval_(saveInterval) : chunk(nullptr), saveInterval_(saveInterval)
{ {
} }
@ -76,22 +76,25 @@ private:
{ {
// todo: check this! free doesn't mean that the file writing actually works ok // todo: check this! free doesn't mean that the file writing actually works ok
// use error codes of write instead? anyway: test it! // use error codes of write instead? anyway: test it!
Serial.printf("%ld saveToFileSystem start()\n", millis());
deleteUntilBytesFree(CONFIG_SESSION_MAX_SIZE); deleteUntilBytesFree(CONFIG_SESSION_MAX_SIZE);
Serial.printf("%ld after deleteUntilBytesFree()\n", millis());
String filename = String(CONFIG_DATA_PATH) + "/" + String(chunk->getStartTime()); String filename = String(CONFIG_DATA_PATH) + "/" + String(chunk->getStartTime());
if (portablefs::exists(filename.c_str())) if (portablefs::exists(filename.c_str()))
{ {
auto file = portablefs::open(filename.c_str(), "a"); auto file = portablefs::open(filename.c_str(), "a");
file.seek(0, SeekSet);
StreamingMsgPackEncoder<portablefs::File> encoder(&file);
chunk->sendHeader(encoder, chunk->getStartTime(), 0);
file.seek(0, SeekEnd); file.seek(0, SeekEnd);
size_t existingMeasurements = (file.size() - ChunkT::valueOffset()) / sizeof(Measurement_T); size_t existingMeasurements = (file.size() - ChunkT::valueOffset()) / sizeof(Measurement_T);
Serial.printf("Incremental save, existing %d\n", existingMeasurements); Serial.printf("Incremental save, existing %d\n", existingMeasurements);
size_t measurementsToWrite = chunk->numMeasurements() - existingMeasurements; size_t measurementsToWrite = chunk->numMeasurements() - existingMeasurements;
Measurement_T *startPtr = chunk->getDataPointer() + existingMeasurements; Measurement_T *startPtr = chunk->getDataPointer() + existingMeasurements;
file.write((uint8_t *)(startPtr), measurementsToWrite * sizeof(Measurement_T)); file.write((uint8_t *)(startPtr), measurementsToWrite * sizeof(Measurement_T));
Serial.printf("%ld Incr Save: before header patch\n", millis());
file.seek(ChunkT::arrayHeaderOffset(), SeekSet);
StreamingMsgPackEncoder<portablefs::File> encoder(&file);
encoder.template sendArrayHeader<Measurement_T>(numMeasurements());
} }
else else
{ {
@ -100,6 +103,7 @@ private:
StreamingMsgPackEncoder<portablefs::File> encoder(&file); StreamingMsgPackEncoder<portablefs::File> encoder(&file);
chunk->serialize(encoder); chunk->serialize(encoder);
} }
Serial.printf("%ld saveToFileSystem done\n", millis());
} }
void deleteUntilBytesFree(size_t requiredSpace) void deleteUntilBytesFree(size_t requiredSpace)

View File

@ -189,33 +189,4 @@ void loop()
{ {
sessionManager.iteration(); sessionManager.iteration();
webSocketServer.iteration(); webSocketServer.iteration();
/*
if (webSocketServer.poll())
{
websocketClients[nextFreeWebsocketClient] = webSocketServer.accept();
websocketClients[nextFreeWebsocketClient].onMessage(onMessage);
Serial.println("Websocket connection");
nextFreeWebsocketClient = (nextFreeWebsocketClient + 1) % MAX_WEBSOCKET_CONNECTIONS;
}
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
//if (websocketClients[i].available()) {
//Serial.printf("Polling client %d\n", i);
websocketClients[i].poll();
//}
auto &session = sessionManager.session();
if (session.numMeasurements() < measurementsSent)
measurementsSent = 0;
else if (session.numMeasurements() > measurementsSent)
{
for (int i = 0; i < MAX_WEBSOCKET_CONNECTIONS; ++i)
if (websocketClients[i].available())
{
auto dataToSend = (const char*)(session.getDataPointer() + measurementsSent);
auto numBytes = (session.numMeasurements() - measurementsSent) * sizeof(MeasurementT);
Serial.printf("Sent %d bytes via websocket\n", numBytes);
websocketClients[i].sendBinary(dataToSend, numBytes);
measurementsSent = session.numMeasurements();
}
}*/
} }