diff --git a/lib/session/StreamingMsgPackEncoder.h b/lib/session/StreamingMsgPackEncoder.h index 148b354..a9964e6 100644 --- a/lib/session/StreamingMsgPackEncoder.h +++ b/lib/session/StreamingMsgPackEncoder.h @@ -19,6 +19,21 @@ struct DummyWriter { void write(const void*, uint32_t) {} }; +class CopyWriter { +public: + CopyWriter(uint8_t * bufferToWrite) + : bufferToWrite_( bufferToWrite ) + {} + + void write(const void* data, uint32_t length) { + memcpy(bufferToWrite_, data, length); + bufferToWrite_ += length; + } + +private: + uint8_t * bufferToWrite_; +}; + template class StreamingMsgPackEncoder { @@ -141,7 +156,7 @@ class ChunkedStreamingMsgPackEncoder { public: ChunkedStreamingMsgPackEncoder(Writer * writer_, uint32_t offset, uint32_t maxSize) - : encoder_(writer_), offsetToStart_(offset), maxBytes_(maxSize), sentBytes_(0), sendingFinished_(false) + : encoder_(writer_), sentBytes_(0), maxBytes_(maxSize), offsetToStart_(offset), sendingFinished_(false) {} void sendMap16(byte size) { @@ -175,7 +190,6 @@ public: return; } - uint32_t sizeForFullArray = sizeof(T) * length; uint32_t elementsToSkip = 0; if( sentBytes_ < offsetToStart_ ) { elementsToSkip = (offsetToStart_ - sentBytes_) / sizeof(T); diff --git a/platformio.ini b/platformio.ini index a1f3aba..a668b4b 100644 --- a/platformio.ini +++ b/platformio.ini @@ -20,7 +20,7 @@ lib_deps = AsyncTCP NTPClient -[env:native] -platform = native -test_ignore = test_embedded -build_flags = -g \ No newline at end of file +;[env:native] +;platform = native +;test_ignore = test_embedded +;build_flags = -g \ No newline at end of file diff --git a/src/ConfigHardware.h b/src/ConfigHardware.h index 8cfbef7..f9818fb 100644 --- a/src/ConfigHardware.h +++ b/src/ConfigHardware.h @@ -5,6 +5,7 @@ const int CONFIG_SCALE_DOUT_PIN = D2; const int CONFIG_SCALE_SCK_PIN = D3; const uint8_t CONFIG_TARE_AVG_COUNT = 50; // number of measurements in tare-phase (to find 0 ) const int CONFIG_VALUE_DIVIDER = 128; // uint32 measurements are divided by this factor, before stored in uint16_t +const int CONFIG_MEASURE_DELAY = 100; // interval in ms between measurements const uint32_t CONFIG_SESSION_CHUNK_SIZE = 1024*8 - 3 * sizeof(uint32_t); diff --git a/src/firmware_main.cpp b/src/firmware_main.cpp index 46a50ba..f8d0532 100644 --- a/src/firmware_main.cpp +++ b/src/firmware_main.cpp @@ -26,14 +26,60 @@ template class SessionManager { public: - void begin(); + SessionManager() : measuring_(false), lastCallTime_(0) + {} + + void begin() { + scale.begin(CONFIG_SCALE_DOUT_PIN, CONFIG_SCALE_SCK_PIN); + scale.tare( CONFIG_TARE_AVG_COUNT ); + + session.init( timeClient.getEpochTime() ); + } + + void startMeasurements() { + measuring_ = true; + } + + void stopMeasurements() { + measuring_ = false; + } + + void iteration() { + if( ! measuring_ ) { + return; + } + uint16_t measurement; + scale.measure(measurement); + session.addPoint(measurement); + + if( lastCallTime_ != 0) { + const long cycleDuration = millis() - lastCallTime_; + if( cycleDuration <= CONFIG_MEASURE_DELAY) + { + delay(CONFIG_MEASURE_DELAY - cycleDuration); + } + else + { + const long skipped = (cycleDuration / CONFIG_MEASURE_DELAY); + Serial.print("Warning: measurements skipped: "); + Serial.println(skipped); + + for(int i=0; i < skipped; ++i) + session.addPoint(measurement); + + delay(CONFIG_MEASURE_DELAY * (skipped + 1) - cycleDuration); + } + } + lastCallTime_ = millis(); + } + + Session_T & getSession() { return session; } - void startMeasurements(); - void stopMeasurements(); - void iteration(); private: MockScale scale; Session_T session; + bool measuring_; + long lastCallTime_; }; SessionManager sessionManager; @@ -48,43 +94,35 @@ void httpSetup(SessionManager * sessionManager) server.on("/api/session/start", HTTP_POST, [sessionManager](AsyncWebServerRequest * req) { req->send(200, "text/plain", F("OK")); sessionManager->startMeasurements(); + Serial.println("Started measurements"); }); server.on("/api/session/stop", HTTP_POST, [sessionManager](AsyncWebServerRequest * req) { req->send(200, "text/plain", F("OK")); sessionManager->stopMeasurements(); + Serial.println("Stopped measurements"); }); server.on("/api/session/data", HTTP_GET, [sessionManager](AsyncWebServerRequest * req) { - StreamingMsgPackEncoder encoder(&adaptor); - - req->send_P(200, "application/x-msgpack", const uint8_t * content, size_t len) - - }); - - - server.on("/", HTTP_GET, [](AsyncWebServerRequest *request){ - request->send(200, "text/plain", "Hello, world"); - }); - - // Send a GET request to /get?message= - server.on("/get", HTTP_GET, [] (AsyncWebServerRequest *request) { - String message; - if (request->hasParam(PARAM_MESSAGE)) { - message = request->getParam(PARAM_MESSAGE)->value(); - } else { - message = "No message sent"; + uint32_t startIdx = 0; + if( req->hasParam("startIdx") ) { + startIdx = req->getParam("startIdx")->value().toInt(); } - request->send(200, "text/plain", "Hello, GET: " + message); - }); - // Send a POST request to /post with a form field message set to - server.on("/post", HTTP_POST, [](AsyncWebServerRequest *request){ - String message; - if (request->hasParam(PARAM_MESSAGE, true)) { - message = request->getParam(PARAM_MESSAGE, true)->value(); - } else { - message = "No message sent"; - } - request->send(200, "text/plain", "Hello, POST: " + message); + StreamingMsgPackEncoder encoderToDetermineSize(nullptr); + encoderToDetermineSize.setSizeCountMode(true); + sessionManager->getSession().serialize(encoderToDetermineSize, startIdx); + auto totalSize = encoderToDetermineSize.getContentLength(); + Serial.print("Sending started of total size "); + Serial.println(totalSize); + req->send("application/x-msgpack", totalSize, [=](uint8_t *buffer, size_t maxLen, size_t index) -> size_t { + Serial.print("Partial send maxLen "); + Serial.print(maxLen); + Serial.print(" index "); + Serial.println("index"); + CopyWriter copyWriter(buffer); + ChunkedStreamingMsgPackEncoder encoder(©Writer, index, index + maxLen); + sessionManager->getSession().serialize(encoder, startIdx); + return encoder.sentBytes() - index; + }); }); server.onNotFound(onNotFound); @@ -112,15 +150,14 @@ void setup() // NTP timeClient.begin(); timeClient.update(); - session.init( timeClient.getEpochTime() ); - // Scale - scale.begin(CONFIG_SCALE_DOUT_PIN, CONFIG_SCALE_SCK_PIN); - scale.tare( CONFIG_TARE_AVG_COUNT ); + // Session + sessionManager.begin(); // HTTP & Websocket server - httpSetup(); + httpSetup(&sessionManager); } void loop() { + sessionManager.iteration(); }