#pragma once template struct TypeToMsgPackCode{}; template<> struct TypeToMsgPackCode { static const char CODE; }; template<> struct TypeToMsgPackCode{ static const char CODE; }; template<> struct TypeToMsgPackCode{ static const char CODE; }; template<> struct TypeToMsgPackCode { static const char CODE; }; template<> struct TypeToMsgPackCode { static const char CODE; }; template<> struct TypeToMsgPackCode { static const char CODE; }; const char TypeToMsgPackCode::CODE = '\xcc'; const char TypeToMsgPackCode::CODE = '\xcd'; const char TypeToMsgPackCode::CODE = '\xce'; const char TypeToMsgPackCode::CODE = '\xd0'; const char TypeToMsgPackCode::CODE = '\xd1'; const char TypeToMsgPackCode::CODE = '\xd2'; struct DummyWriter { void write(const void*, uint32_t) {} }; class CopyWriter { public: CopyWriter(uint8_t * bufferToWrite) : bufferToWrite_( bufferToWrite ) {} void write(const void* data, uint32_t length) { memcpy(bufferToWrite_, data, length); bufferToWrite_ += length; } private: uint8_t * bufferToWrite_; }; template class StreamingMsgPackEncoder { public: StreamingMsgPackEncoder(Writer * writer_) : writer(writer_), contentLength(0), sizeCountMode(false) {} void sendMap16(byte size) { if( sizeCountMode ) { contentLength += 1; } else { size |= 0b10000000; writer->write((byte*)(&size), 1); } } void sendString255(PGM_P s) { auto len = strlen_P(s); if( len >= 255 ) { LOG_WARNING("ERROR: StreamingMsgPackEncoder::string255 - string too long"); return; } byte castedLen = (byte)(len); if( sizeCountMode ) { contentLength += 2 + castedLen; } else { writer->write((uint8_t*)"\xd9", 1); writer->write((uint8_t*)&castedLen, 1); writer->write((uint8_t*)s, len); } } template void sendInt(T value) { if( sizeCountMode ) { contentLength += 1 + sizeof(T); } else { if( sizeof(T) == 4 ) value = htonl(value); else if( sizeof(T) == 2) value = htons(value); writer->write((uint8_t*)(&TypeToMsgPackCode::CODE), 1); writer->write((uint8_t*)&value, sizeof(T)); } } template void sendArray(const T * data, uint32_t length) { sendArrayHeader(length); sendArrayPartialContents(data, length); } template void sendArrayHeader(uint32_t length) { if( sizeCountMode ) { contentLength += 1 + sizeof(uint32_t) + 1; } else { uint32_t nlength = htonl(length * sizeof(T)); writer->write((uint8_t*)("\xc9"), 1); // ext dtype since typed arrays are not supported by msgpack writer->write((uint8_t*)(&nlength), sizeof(uint32_t) ); writer->write((uint8_t*)(&TypeToMsgPackCode::CODE), 1); // put code for type here, this is not part of msgpack but custom } } template void sendArrayPartialContents(T * data, uint32_t length) { if( !sizeCountMode ) { writer->write((uint8_t*)(data), length * sizeof(T)); } else { contentLength += sizeof(T) * length; } } void setSizeCountMode(bool sizeCountMode_=true) { sizeCountMode = sizeCountMode_; } bool getSizeCountMode() const { return sizeCountMode; } uint32_t getContentLength() const { return contentLength; } void resetContentLength() { contentLength = 0; } private: Writer * writer; uint32_t contentLength; bool sizeCountMode; }; template class ChunkedStreamingMsgPackEncoder { public: ChunkedStreamingMsgPackEncoder(Writer * writer_, uint32_t offset, uint32_t maxSize) : encoder_(writer_), sentBytes_(0), maxBytes_(maxSize), offsetToStart_(offset), sendingFinished_(false) {} void sendMap16(byte size) { sendIfSpaceLeft([&]() { encoder_.sendMap16(size); }); } void sendString255(PGM_P s) { sendIfSpaceLeft([&]() { encoder_.sendString255(s); }); } template void sendInt(T value) { sendIfSpaceLeft([&]() { encoder_.template sendInt(value); }); } template void sendArray(const T * data, uint32_t length) { sendArrayHeader(length); sendArrayPartialContents(data, length); } template void sendArrayHeader(uint32_t length) { sendIfSpaceLeft([&]() { encoder_.template sendArrayHeader(length); }); } template void sendArrayPartialContents(T * data, uint32_t length) { if( sendingFinished_ ) { return; } uint32_t elementsToSkip = 0; if( sentBytes_ < offsetToStart_ ) { elementsToSkip = (offsetToStart_ - sentBytes_) / sizeof(T); assert_msg((offsetToStart_ - sentBytes_) % sizeof(T) == 0, "Looks like previous sent operation send fraction of an element."); } if( elementsToSkip >= length) { sentBytes_ += sizeof(T) * length; return; } else { sentBytes_ += sizeof(T) * elementsToSkip; const uint32_t elementsRemaining = length - elementsToSkip; const uint32_t maxElementsToSend = (maxBytes_ - sentBytes_) / sizeof(T); const uint32_t elementsToSend = min(elementsRemaining, maxElementsToSend); if( elementsToSend == 0 ) { sendingFinished_ = true; return; } else { encoder_.sendArrayPartialContents(data + elementsToSkip, elementsToSend); sentBytes_ += sizeof(T) * elementsToSend; if( elementsRemaining > elementsToSend ) { sendingFinished_ = true; } } } } uint32_t sentBytes() const { return sentBytes_; } bool getSizeCountMode() const { return false; } private: template void sendIfSpaceLeft(T sendFunction) { if( sendingFinished_ ) { return; } encoder_.setSizeCountMode(true); encoder_.resetContentLength(); sendFunction(); auto sizeRequired = encoder_.getContentLength(); encoder_.setSizeCountMode(false); if( sentBytes_ < offsetToStart_ ) { // already sent sentBytes_ += sizeRequired; assert_msg( sentBytes_ <= offsetToStart_, "Partial sending not supported by this function" ); return; } if( sentBytes_ + sizeRequired <= maxBytes_ ) { sendFunction(); sentBytes_ += sizeRequired; } else { sendingFinished_ = true; } } StreamingMsgPackEncoder encoder_; uint32_t sentBytes_; uint32_t maxBytes_; uint32_t offsetToStart_; bool sendingFinished_; };