Firmware: chunked sending

This commit is contained in:
Martin Bauer 2019-09-08 18:00:09 +02:00
parent ce4fa96771
commit d8732643f3
8 changed files with 156 additions and 46 deletions

View File

@ -1,3 +1,5 @@
#pragma once
inline void _assert(const char* expression, const char* message, const char* file, int line) inline void _assert(const char* expression, const char* message, const char* file, int line)
{ {
Serial.print("Assert "); Serial.print("Assert ");
@ -7,6 +9,7 @@ inline void _assert(const char* expression, const char* message, const char* fil
Serial.print(" '"); Serial.print(" '");
Serial.print(expression); Serial.print(expression);
Serial.println("' failed."); Serial.println("' failed.");
Serial.println(message);
} }
template< typename T> template< typename T>

View File

@ -1,3 +1,5 @@
#pragma once
#include <string> #include <string>
#include <cstdint> #include <cstdint>
#include <cstring> #include <cstring>

View File

@ -36,8 +36,8 @@ public:
otherChunk->init(0, 0); otherChunk->init(0, 0);
} }
template<typename T> template<typename Encoder_T>
void serialize(StreamingMsgPackEncoder<T> & encoder, uint32_t startIdx) const void serialize(Encoder_T & encoder, uint32_t startIdx) const
{ {
const uint32_t lastIdx = currentChunk->getStartIndex() + currentChunk->numMeasurements(); const uint32_t lastIdx = currentChunk->getStartIndex() + currentChunk->numMeasurements();
if( lastIdx <= startIdx) { if( lastIdx <= startIdx) {
@ -78,8 +78,8 @@ private:
chunk->serialize(writer.encoder()); chunk->serialize(writer.encoder());
}; };
template< typename T> template< typename Encoder_T>
uint32_t serializeChunk(StreamingMsgPackEncoder<T> & encoder, uint32_t startIdx) const { uint32_t serializeChunk(Encoder_T & encoder, uint32_t startIdx) const {
assert( startIdx < currentChunk->getStartIndex() + currentChunk->numMeasurements(), assert( startIdx < currentChunk->getStartIndex() + currentChunk->numMeasurements(),
"serializeChunk: invalid startIdx" ); "serializeChunk: invalid startIdx" );
@ -97,7 +97,7 @@ private:
const uint32_t chunkNr = startIdx / CHUNK_SIZE; const uint32_t chunkNr = startIdx / CHUNK_SIZE;
const auto chunkFileNameStr = chunkFileName(chunkNr, currentChunk->getStartTime()); const auto chunkFileNameStr = chunkFileName(chunkNr, currentChunk->getStartTime());
Reader reader(chunkFileNameStr); Reader reader(chunkFileNameStr);
reader.seek(Chunk_T::template valueOffset<T>()); reader.seek(Chunk_T::valueOffset());
const uint32_t PART_SIZE = 32; const uint32_t PART_SIZE = 32;
#ifndef ARDUINO #ifndef ARDUINO

View File

@ -2,6 +2,8 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <vector> #include <vector>
@ -24,6 +26,10 @@ public:
static const String baseDirectory("."); static const String baseDirectory(".");
auto fullFileName = baseDirectory + fileName; auto fullFileName = baseDirectory + fileName;
fptr = fopen(fullFileName.c_str(), "wb"); fptr = fopen(fullFileName.c_str(), "wb");
if (fptr == NULL) {
printf("fopen of %s failed, errno = %d - %s\n", fullFileName.c_str(), errno, strerror(errno));
exit(1);
}
} }
~FilePtrAdaptor() { ~FilePtrAdaptor() {

View File

@ -46,35 +46,9 @@ public:
encoder.sendArray(values, nextFree); encoder.sendArray(values, nextFree);
} }
/*
template<typename T>
void serialize(StreamingMsgPackEncoder<T> & encoder, uint32_t start, uint32_t end) const
{
if( start < this->startIndex )
start = this->startIndex;
if( end == 0 ) {
end = start + this->nextFree;
}
sendHeader(encoder, sessionStartTime, start);
bool sendEmpty =
(start >= end) ||
(end <= this->startIndex) ||
(start >= (this->startIndex + this->nextFree));
if( sendEmpty ) {
encoder.sendArray(nullptr, 0);
} else {
const uint32_t idxStart = (start - this->startIndex);
const uint32_t length = min(nextFree, end - start);
encoder.sendArray(values + idxStart, length);
}
}*/
template<typename T>
static uint32_t valueOffset() static uint32_t valueOffset()
{ {
StreamingMsgPackEncoder<T> encoder(nullptr); StreamingMsgPackEncoder<DummyWriter> encoder(nullptr);
encoder.setSizeCountMode(true); encoder.setSizeCountMode(true);
sendHeader(encoder, 0, 0); sendHeader(encoder, 0, 0);
encoder.template sendArrayHeader<Measurement_T>(0); encoder.template sendArrayHeader<Measurement_T>(0);
@ -89,8 +63,8 @@ public:
return values; return values;
} }
template<typename T> template<typename Encoder_T>
static void sendHeader(StreamingMsgPackEncoder<T> & encoder, uint32_t sessionStartTime, uint32_t startIndex) static void sendHeader(Encoder_T & encoder, uint32_t sessionStartTime, uint32_t startIndex)
{ {
encoder.sendMap16(3); encoder.sendMap16(3);

View File

@ -15,6 +15,10 @@ const char TypeToMsgPackCode<int8_t>::CODE = '\xd0';
const char TypeToMsgPackCode<int16_t>::CODE = '\xd1'; const char TypeToMsgPackCode<int16_t>::CODE = '\xd1';
const char TypeToMsgPackCode<int32_t>::CODE = '\xd2'; const char TypeToMsgPackCode<int32_t>::CODE = '\xd2';
struct DummyWriter {
void write(const void*, uint32_t) {}
};
template<typename Writer> template<typename Writer>
class StreamingMsgPackEncoder class StreamingMsgPackEncoder
{ {
@ -87,7 +91,7 @@ public:
{ {
if( sizeCountMode ) if( sizeCountMode )
{ {
contentLength += 1 + sizeof(uint32_t) + 1 + length * sizeof(T); contentLength += 1 + sizeof(uint32_t) + 1;
} }
else else
{ {
@ -103,6 +107,8 @@ public:
{ {
if( !sizeCountMode ) { if( !sizeCountMode ) {
writer->write((char*)(data), length * sizeof(T)); writer->write((char*)(data), length * sizeof(T));
} else {
contentLength += sizeof(T) * length;
} }
} }
@ -119,6 +125,9 @@ public:
return contentLength; return contentLength;
} }
void resetContentLength() {
contentLength = 0;
}
private: private:
Writer * writer; Writer * writer;
uint32_t contentLength; uint32_t contentLength;
@ -132,28 +141,100 @@ class ChunkedStreamingMsgPackEncoder
{ {
public: public:
ChunkedStreamingMsgPackEncoder(Writer * writer_, uint32_t offset, uint32_t maxSize) ChunkedStreamingMsgPackEncoder(Writer * writer_, uint32_t offset, uint32_t maxSize)
: encoder_(writer_), offset_(offset), maxSize_(maxSize), sent_(0) : encoder_(writer_), offsetToStart_(offset), maxBytes_(maxSize), sentBytes_(0), sendingFinished_(false)
{} {}
void sendMap16(byte size) { void sendMap16(byte size) {
// check if it fits, using separate object sendIfSpaceLeft([&]() { encoder_.sendMap16(size); });
encoder_.setSizeCountMode(true);
} }
void sendString255(PGM_P s) { void sendString255(PGM_P s) {
sendIfSpaceLeft([&]() { encoder_.sendString255(s); });
} }
template<typename T> template<typename T>
void sendInt(T value) { void sendInt(T value) {
sendIfSpaceLeft([&]() { encoder_.template sendInt<T>(value); });
} }
template<typename T>
void sendArray(const T * data, uint32_t length)
{
sendArrayHeader<T>(length);
sendArrayPartialContents(data, length);
}
template<typename T>
void sendArrayHeader(uint32_t length) {
sendIfSpaceLeft([&]() { encoder_.template sendArrayHeader<T>(length); });
}
template<typename T>
void sendArrayPartialContents(T * data, uint32_t length) {
if( sendingFinished_ ) {
return;
}
uint32_t sizeForFullArray = sizeof(T) * length;
uint32_t elementsToSkip = 0;
if( sentBytes_ < offsetToStart_ ) {
elementsToSkip = (offsetToStart_ - sentBytes_) / sizeof(T);
assert((offsetToStart_ - sentBytes_) % sizeof(T) == 0,
"Looks like previous sent operation send fraction of an element.");
}
if( elementsToSkip >= length) {
sentBytes_ += sizeof(T) * length;
return;
} else {
const uint32_t elementsToSend = length - elementsToSkip;
if( elementsToSend == 0 ) {
sendingFinished_ = true;
return;
} else {
encoder_.sendArrayPartialContents(data + elementsToSkip, elementsToSend);
}
}
}
uint32_t sentBytes() const {
return sentBytes_;
}
bool getSizeCountMode() const {
return false;
}
private: private:
template<typename T, typename... Args>
void sendIfSpaceLeft(T sendFunction) {
if( sendingFinished_ ) {
return;
}
encoder_.setSizeCountMode(true);
encoder_.resetContentLength();
sendFunction();
auto sizeRequired = encoder_.getContentLength();
encoder_.setSizeCountMode(false);
if( sentBytes_ < offsetToStart_ ) {
// already sent
sentBytes_ += sizeRequired;
assert( sentBytes_ <= offsetToStart_, "Partial sending not supported by this function" );
return;
}
if( sentBytes_ + sizeRequired < maxBytes_ ) {
sendFunction();
sentBytes_ += sizeRequired;
} else {
sendingFinished_ = true;
}
}
StreamingMsgPackEncoder<Writer> encoder_; StreamingMsgPackEncoder<Writer> encoder_;
uint32_t offset_; uint32_t sentBytes_;
uint32_t maxSize_; uint32_t maxBytes_;
uint32_t sent_; uint32_t offsetToStart_;
bool sendingFinished_;
}; };

View File

@ -20,6 +20,7 @@ lib_deps =
AsyncTCP AsyncTCP
NTPClient NTPClient
;[env:native] [env:native]
;platform = native platform = native
;test_ignore = test_embedded test_ignore = test_embedded
build_flags = -g

View File

@ -117,6 +117,7 @@ void testSessionChunkSerialization()
} }
} }
void testSession() { void testSession() {
const uint32_t SESSION_SIZE = 128; const uint32_t SESSION_SIZE = 128;
typedef MeasurementSession<uint16_t, MockStorageReader, MockStorageWriter, SESSION_SIZE> MockSession; typedef MeasurementSession<uint16_t, MockStorageReader, MockStorageWriter, SESSION_SIZE> MockSession;
@ -144,6 +145,47 @@ void testSession() {
} }
} }
void testPartialSessionSerialization() {
const uint32_t SESSION_SIZE = 128;
typedef MeasurementSession<uint16_t, MockStorageReader, MockStorageWriter, SESSION_SIZE> MockSession;
const uint32_t startTime = 194842;
const uint_t fillSize = SESSION_SIZE * 4 + 7;
MockSession session;
session.init(startTime);
for (uint16_t i = 0; i < fillSize; ++i) {
session.addPoint(i);
}
std::vector<uint8_t> data;
VectorAdaptor adaptor( &data );
StreamingMsgPackEncoder<VectorAdaptor> encoder(&adaptor);
encoder.setSizeCountMode(true);
session.serialize(encoder, 0);
auto totalSize = encoder.getContentLength();
std::vector<uint32_t> splits = {16, 32, 128, 256, 512, 721, 1024, totalSize};
uint32_t written = 0;
data.clear();
for(auto & split : splits) {
ChunkedStreamingMsgPackEncoder<VectorAdaptor> encoder(&adaptor, written, split);
session.serialize(encoder, 0);
written = encoder.sentBytes();
}
uint32_t readStartTime=0;
uint32_t readStartIndex=0;
auto result = parseMessagePack<uint16_t>(&data[0], readStartTime, readStartIndex);
TEST_ASSERT_MESSAGE(readStartIndex == 0 && startTime == readStartTime, "");
TEST_ASSERT_MESSAGE(result.size() == fillSize, "Wrong result array size");
for( uint16_t i=0; i < fillSize; ++i) {
TEST_ASSERT_MESSAGE(result[i] == i, "Wrong array contents");
}
}
void allTests() void allTests()
{ {
UNITY_BEGIN(); UNITY_BEGIN();
@ -151,6 +193,7 @@ void allTests()
RUN_TEST(testSessionChunkGetterSetter); RUN_TEST(testSessionChunkGetterSetter);
RUN_TEST(testSessionChunkSerialization); RUN_TEST(testSessionChunkSerialization);
RUN_TEST(testSession); RUN_TEST(testSession);
RUN_TEST(testPartialSessionSerialization);
UNITY_END(); UNITY_END();
} }