From d8732643f326b1bebd19d189f40eed98b4a59376 Mon Sep 17 00:00:00 2001 From: Martin Bauer Date: Sun, 8 Sep 2019 18:00:09 +0200 Subject: [PATCH] Firmware: chunked sending --- lib/basic/Dtypes.h | 3 + lib/basic/MockDtypes.h | 2 + lib/session/MeasurementSession.h | 10 +-- lib/session/MockStorage.h | 6 ++ lib/session/SessionChunk.h | 32 +-------- lib/session/StreamingMsgPackEncoder.h | 99 ++++++++++++++++++++++++--- platformio.ini | 7 +- test/test_common/sessiontest.cpp | 43 ++++++++++++ 8 files changed, 156 insertions(+), 46 deletions(-) diff --git a/lib/basic/Dtypes.h b/lib/basic/Dtypes.h index 512fe62..577af81 100644 --- a/lib/basic/Dtypes.h +++ b/lib/basic/Dtypes.h @@ -1,3 +1,5 @@ +#pragma once + inline void _assert(const char* expression, const char* message, const char* file, int line) { Serial.print("Assert "); @@ -7,6 +9,7 @@ inline void _assert(const char* expression, const char* message, const char* fil Serial.print(" '"); Serial.print(expression); Serial.println("' failed."); + Serial.println(message); } template< typename T> diff --git a/lib/basic/MockDtypes.h b/lib/basic/MockDtypes.h index 3b57802..5596886 100644 --- a/lib/basic/MockDtypes.h +++ b/lib/basic/MockDtypes.h @@ -1,3 +1,5 @@ +#pragma once + #include #include #include diff --git a/lib/session/MeasurementSession.h b/lib/session/MeasurementSession.h index 30e4f51..300a899 100644 --- a/lib/session/MeasurementSession.h +++ b/lib/session/MeasurementSession.h @@ -36,8 +36,8 @@ public: otherChunk->init(0, 0); } - template - void serialize(StreamingMsgPackEncoder & encoder, uint32_t startIdx) const + template + void serialize(Encoder_T & encoder, uint32_t startIdx) const { const uint32_t lastIdx = currentChunk->getStartIndex() + currentChunk->numMeasurements(); if( lastIdx <= startIdx) { @@ -78,8 +78,8 @@ private: chunk->serialize(writer.encoder()); }; - template< typename T> - uint32_t serializeChunk(StreamingMsgPackEncoder & encoder, uint32_t startIdx) const { + template< typename Encoder_T> + uint32_t serializeChunk(Encoder_T & encoder, uint32_t startIdx) const { assert( startIdx < currentChunk->getStartIndex() + currentChunk->numMeasurements(), "serializeChunk: invalid startIdx" ); @@ -97,7 +97,7 @@ private: const uint32_t chunkNr = startIdx / CHUNK_SIZE; const auto chunkFileNameStr = chunkFileName(chunkNr, currentChunk->getStartTime()); Reader reader(chunkFileNameStr); - reader.seek(Chunk_T::template valueOffset()); + reader.seek(Chunk_T::valueOffset()); const uint32_t PART_SIZE = 32; #ifndef ARDUINO diff --git a/lib/session/MockStorage.h b/lib/session/MockStorage.h index 9320a36..9b59164 100644 --- a/lib/session/MockStorage.h +++ b/lib/session/MockStorage.h @@ -2,6 +2,8 @@ #include #include +#include +#include #include @@ -24,6 +26,10 @@ public: static const String baseDirectory("."); auto fullFileName = baseDirectory + fileName; fptr = fopen(fullFileName.c_str(), "wb"); + if (fptr == NULL) { + printf("fopen of %s failed, errno = %d - %s\n", fullFileName.c_str(), errno, strerror(errno)); + exit(1); + } } ~FilePtrAdaptor() { diff --git a/lib/session/SessionChunk.h b/lib/session/SessionChunk.h index ed16f40..a7db596 100644 --- a/lib/session/SessionChunk.h +++ b/lib/session/SessionChunk.h @@ -46,35 +46,9 @@ public: encoder.sendArray(values, nextFree); } - /* - template - void serialize(StreamingMsgPackEncoder & encoder, uint32_t start, uint32_t end) const - { - if( start < this->startIndex ) - start = this->startIndex; - if( end == 0 ) { - end = start + this->nextFree; - } - - sendHeader(encoder, sessionStartTime, start); - - bool sendEmpty = - (start >= end) || - (end <= this->startIndex) || - (start >= (this->startIndex + this->nextFree)); - if( sendEmpty ) { - encoder.sendArray(nullptr, 0); - } else { - const uint32_t idxStart = (start - this->startIndex); - const uint32_t length = min(nextFree, end - start); - encoder.sendArray(values + idxStart, length); - } - }*/ - - template static uint32_t valueOffset() { - StreamingMsgPackEncoder encoder(nullptr); + StreamingMsgPackEncoder encoder(nullptr); encoder.setSizeCountMode(true); sendHeader(encoder, 0, 0); encoder.template sendArrayHeader(0); @@ -89,8 +63,8 @@ public: return values; } - template - static void sendHeader(StreamingMsgPackEncoder & encoder, uint32_t sessionStartTime, uint32_t startIndex) + template + static void sendHeader(Encoder_T & encoder, uint32_t sessionStartTime, uint32_t startIndex) { encoder.sendMap16(3); diff --git a/lib/session/StreamingMsgPackEncoder.h b/lib/session/StreamingMsgPackEncoder.h index 9b42421..148b354 100644 --- a/lib/session/StreamingMsgPackEncoder.h +++ b/lib/session/StreamingMsgPackEncoder.h @@ -15,6 +15,10 @@ const char TypeToMsgPackCode::CODE = '\xd0'; const char TypeToMsgPackCode::CODE = '\xd1'; const char TypeToMsgPackCode::CODE = '\xd2'; +struct DummyWriter { + void write(const void*, uint32_t) {} +}; + template class StreamingMsgPackEncoder { @@ -87,7 +91,7 @@ public: { if( sizeCountMode ) { - contentLength += 1 + sizeof(uint32_t) + 1 + length * sizeof(T); + contentLength += 1 + sizeof(uint32_t) + 1; } else { @@ -103,6 +107,8 @@ public: { if( !sizeCountMode ) { writer->write((char*)(data), length * sizeof(T)); + } else { + contentLength += sizeof(T) * length; } } @@ -119,6 +125,9 @@ public: return contentLength; } + void resetContentLength() { + contentLength = 0; + } private: Writer * writer; uint32_t contentLength; @@ -132,28 +141,100 @@ class ChunkedStreamingMsgPackEncoder { public: ChunkedStreamingMsgPackEncoder(Writer * writer_, uint32_t offset, uint32_t maxSize) - : encoder_(writer_), offset_(offset), maxSize_(maxSize), sent_(0) + : encoder_(writer_), offsetToStart_(offset), maxBytes_(maxSize), sentBytes_(0), sendingFinished_(false) {} void sendMap16(byte size) { - // check if it fits, using separate object - encoder_.setSizeCountMode(true); + sendIfSpaceLeft([&]() { encoder_.sendMap16(size); }); } void sendString255(PGM_P s) { - + sendIfSpaceLeft([&]() { encoder_.sendString255(s); }); } template void sendInt(T value) { - + sendIfSpaceLeft([&]() { encoder_.template sendInt(value); }); } + template + void sendArray(const T * data, uint32_t length) + { + sendArrayHeader(length); + sendArrayPartialContents(data, length); + } + template + void sendArrayHeader(uint32_t length) { + sendIfSpaceLeft([&]() { encoder_.template sendArrayHeader(length); }); + } + + template + void sendArrayPartialContents(T * data, uint32_t length) { + if( sendingFinished_ ) { + return; + } + + uint32_t sizeForFullArray = sizeof(T) * length; + uint32_t elementsToSkip = 0; + if( sentBytes_ < offsetToStart_ ) { + elementsToSkip = (offsetToStart_ - sentBytes_) / sizeof(T); + assert((offsetToStart_ - sentBytes_) % sizeof(T) == 0, + "Looks like previous sent operation send fraction of an element."); + } + if( elementsToSkip >= length) { + sentBytes_ += sizeof(T) * length; + return; + } else { + const uint32_t elementsToSend = length - elementsToSkip; + if( elementsToSend == 0 ) { + sendingFinished_ = true; + return; + } else { + encoder_.sendArrayPartialContents(data + elementsToSkip, elementsToSend); + } + } + } + + uint32_t sentBytes() const { + return sentBytes_; + } + + bool getSizeCountMode() const { + return false; + } private: + template + void sendIfSpaceLeft(T sendFunction) { + if( sendingFinished_ ) { + return; + } + + encoder_.setSizeCountMode(true); + encoder_.resetContentLength(); + sendFunction(); + auto sizeRequired = encoder_.getContentLength(); + encoder_.setSizeCountMode(false); + + if( sentBytes_ < offsetToStart_ ) { + // already sent + sentBytes_ += sizeRequired; + assert( sentBytes_ <= offsetToStart_, "Partial sending not supported by this function" ); + return; + } + + if( sentBytes_ + sizeRequired < maxBytes_ ) { + sendFunction(); + sentBytes_ += sizeRequired; + } else { + sendingFinished_ = true; + } + } + StreamingMsgPackEncoder encoder_; - uint32_t offset_; - uint32_t maxSize_; - uint32_t sent_; + uint32_t sentBytes_; + uint32_t maxBytes_; + uint32_t offsetToStart_; + bool sendingFinished_; }; diff --git a/platformio.ini b/platformio.ini index 2e85943..a1f3aba 100644 --- a/platformio.ini +++ b/platformio.ini @@ -20,6 +20,7 @@ lib_deps = AsyncTCP NTPClient -;[env:native] -;platform = native -;test_ignore = test_embedded +[env:native] +platform = native +test_ignore = test_embedded +build_flags = -g \ No newline at end of file diff --git a/test/test_common/sessiontest.cpp b/test/test_common/sessiontest.cpp index 2c9cbb0..c64d27b 100644 --- a/test/test_common/sessiontest.cpp +++ b/test/test_common/sessiontest.cpp @@ -117,6 +117,7 @@ void testSessionChunkSerialization() } } + void testSession() { const uint32_t SESSION_SIZE = 128; typedef MeasurementSession MockSession; @@ -144,6 +145,47 @@ void testSession() { } } + +void testPartialSessionSerialization() { + const uint32_t SESSION_SIZE = 128; + typedef MeasurementSession MockSession; + + const uint32_t startTime = 194842; + const uint_t fillSize = SESSION_SIZE * 4 + 7; + + MockSession session; + session.init(startTime); + for (uint16_t i = 0; i < fillSize; ++i) { + session.addPoint(i); + } + + std::vector data; + VectorAdaptor adaptor( &data ); + + StreamingMsgPackEncoder encoder(&adaptor); + encoder.setSizeCountMode(true); + session.serialize(encoder, 0); + auto totalSize = encoder.getContentLength(); + + std::vector splits = {16, 32, 128, 256, 512, 721, 1024, totalSize}; + uint32_t written = 0; + data.clear(); + for(auto & split : splits) { + ChunkedStreamingMsgPackEncoder encoder(&adaptor, written, split); + session.serialize(encoder, 0); + written = encoder.sentBytes(); + } + + uint32_t readStartTime=0; + uint32_t readStartIndex=0; + auto result = parseMessagePack(&data[0], readStartTime, readStartIndex); + TEST_ASSERT_MESSAGE(readStartIndex == 0 && startTime == readStartTime, ""); + TEST_ASSERT_MESSAGE(result.size() == fillSize, "Wrong result array size"); + for( uint16_t i=0; i < fillSize; ++i) { + TEST_ASSERT_MESSAGE(result[i] == i, "Wrong array contents"); + } +} + void allTests() { UNITY_BEGIN(); @@ -151,6 +193,7 @@ void allTests() RUN_TEST(testSessionChunkGetterSetter); RUN_TEST(testSessionChunkSerialization); RUN_TEST(testSession); + RUN_TEST(testPartialSessionSerialization); UNITY_END(); }