114 lines
4.1 KiB
C++
114 lines
4.1 KiB
C++
#include "SessionChunk.h"
|
|
|
|
|
|
template<typename Measurement_T, typename Reader, typename Writer, uint_t CHUNK_SIZE>
|
|
class Session {
|
|
public:
|
|
typedef SessionChunk<Measurement_T, CHUNK_SIZE> Chunk_T;
|
|
|
|
Session()
|
|
: currentChunk(&chunks[0]),
|
|
otherChunk(&chunks[1]) {}
|
|
|
|
void init(uint32_t epochStartTime) {
|
|
currentChunk = &chunks[0];
|
|
currentChunk->init(epochStartTime, 0);
|
|
}
|
|
|
|
bool addPoint(Measurement_T measurement) {
|
|
const bool successful = currentChunk->addPoint(measurement);
|
|
if (!successful) {
|
|
rotate();
|
|
const bool secondInsertSuccess = currentChunk->addPoint(measurement);
|
|
assert(secondInsertSuccess, "Session: insertion after rotation failed");
|
|
//TODO check that there is place for file - remove old files
|
|
}
|
|
return true;
|
|
}
|
|
|
|
template<typename T>
|
|
void serialize(StreamingMsgPackEncoder<T> & encoder, uint32_t startIdx) const
|
|
{
|
|
const uint32_t lastIdx = currentChunk->getStartIndex() + currentChunk->numMeasurements();
|
|
if( lastIdx <= startIdx) {
|
|
encoder.sendArray(nullptr, 0);
|
|
return;
|
|
}
|
|
|
|
Chunk_T::sendHeader(encoder, currentChunk->getStartTime(), startIdx);
|
|
encoder.sendArrayHeader(lastIdx - startIdx);
|
|
while(startIdx < lastIdx)
|
|
startIdx = serializeChunk(encoder, startIdx);
|
|
assert(startIdx == lastIdx, "Not all data was sent");
|
|
}
|
|
|
|
private:
|
|
void rotate() {
|
|
if( otherChunkFilled() )
|
|
saveChunkToFile(otherChunk);
|
|
swapChunks();
|
|
|
|
currentChunk->init(otherChunk->getStartTime(), otherChunk->getStartIndex() + CHUNK_SIZE);
|
|
}
|
|
|
|
bool otherChunkFilled() const {
|
|
return otherChunk->numMeasurements() > 0;
|
|
}
|
|
|
|
void swapChunks() {
|
|
Chunk_T *tmp = currentChunk;
|
|
currentChunk = otherChunk;
|
|
otherChunk = tmp;
|
|
}
|
|
|
|
void saveChunkToFile(Chunk_T *chunk) const {
|
|
const uint32_t chunkNr = chunk->getStartIndex() / CHUNK_SIZE;
|
|
const auto fileName = chunkFileName(chunkNr, chunk->getStartTime());
|
|
Writer writer( fileName );
|
|
chunk->serialize(writer.encoder());
|
|
};
|
|
|
|
template< typename T>
|
|
uint32_t serializeChunk(StreamingMsgPackEncoder<T> & encoder, uint32_t startIdx) {
|
|
assert( startIdx < currentChunk->getStartIndex() + currentChunk->numMeasurements(),
|
|
"serializeChunk: invalid startIdx" );
|
|
|
|
if( startIdx >= currentChunk->getStartIndex() ) {
|
|
encoder.sendArrayPartialContents( currentChunk->getDataPointer(), currentChunk->numMeasurements() );
|
|
return currentChunk->getStartIndex() + currentChunk->numMeasurements();
|
|
} else if( startIdx >= otherChunk->getStartIndex() && otherChunkFilled() ) {
|
|
encoder.sendArrayPartialContents( otherChunk->getDataPointer(), otherChunk->numMeasurements() );
|
|
assert( otherChunk->numMeasurements(), CHUNK_SIZE );
|
|
return otherChunk->getStartIndex() + otherChunk->numMeasurements();
|
|
} else {
|
|
if( encoder.getSizeCountMode() ) {
|
|
encoder.sendArrayPartialContents(nullptr, CHUNK_SIZE);
|
|
} else {
|
|
const uint32_t chunkNr = startIdx / CHUNK_SIZE;
|
|
const auto chunkFileName = (chunkNr, currentChunk->getStartTime());
|
|
Reader reader(chunkFileName);
|
|
reader.seek(Chunk_T::valueOffset());
|
|
|
|
const uint32_t PART_SIZE = 32;
|
|
static_assert( PART_SIZE < CHUNK_SIZE && CHUNK_SIZE % PART_SIZE == 0);
|
|
|
|
Measurement_T buffer[PART_SIZE];
|
|
for(uint32_t i = 0; i < CHUNK_SIZE; i += PART_SIZE)
|
|
{
|
|
reader.readBytes((char*) buffer, sizeof(Measurement_T) * PART_SIZE);
|
|
encoder.sendArrayPartialContents(buffer, PART_SIZE);
|
|
}
|
|
}
|
|
return startIdx + CHUNK_SIZE;
|
|
}
|
|
}
|
|
|
|
static String chunkFileName(uint32_t chunkNr, uint32_t startTime) {
|
|
return("/dat/" + toString(startTime) + "_" + toString(chunkNr));
|
|
}
|
|
|
|
Chunk_T chunks[2];
|
|
Chunk_T *currentChunk;
|
|
Chunk_T *otherChunk;
|
|
};
|