From 51f5932e80a53378455006ff460058948d73507c Mon Sep 17 00:00:00 2001
From: Inna Brodkin
Date: Thu, 2 May 2024 15:17:31 -0500
Subject: [PATCH 1/3] StoryChunk should be created on the heap; also added
 debug logging to track story chunk processing

---
 ChronoGrapher/CSVFileChunkExtractor.cpp |  10 +-
 ChronoGrapher/ChronoGrapher.cpp         |   8 +-
 ChronoGrapher/ChunkIngestionQueue.h     |  13 +-
 ChronoGrapher/GrapherRecordingService.h |  14 +--
 ChronoGrapher/KeeperDataStore.cpp       |  20 +--
 ChronoGrapher/KeeperDataStore.h         |   2 +-
 ChronoGrapher/StoryPipeline.cpp         | 156 ++++++++----------------
 ChronoKeeper/StoryPipeline.cpp          |   2 +-
 chrono_common/ConfigurationManager.cpp  |   2 +-
 chrono_common/ConfigurationManager.h    |   2 +-
 10 files changed, 91 insertions(+), 138 deletions(-)

diff --git a/ChronoGrapher/CSVFileChunkExtractor.cpp b/ChronoGrapher/CSVFileChunkExtractor.cpp
index f3f4c20c..c1993392 100644
--- a/ChronoGrapher/CSVFileChunkExtractor.cpp
+++ b/ChronoGrapher/CSVFileChunkExtractor.cpp
@@ -4,6 +4,7 @@
 #include "chronolog_types.h"
 #include "CSVFileChunkExtractor.h"
+#include "log.h"
 
 namespace tl = thallium;
 
@@ -23,11 +24,12 @@ void chronolog::CSVFileStoryChunkExtractor::processStoryChunk(chronolog::StoryCh
 {
     std::ofstream chunk_fstream;
     std::string chunk_filename(rootDirectory);
-    chunk_filename += chrono_process_id + "." + std::to_string(story_chunk->getStoryId()) + "." +
-                      std::to_string(story_chunk->getStartTime() / 1000000000) + ".csv";
+    chunk_filename += "/" + std::to_string(story_chunk->getStoryId()) + "." +
+                      chrono_process_id.getIPasDottedString(chunk_filename) + "." + std::to_string(chrono_process_id.getPort()) + "." +
+                      std::to_string(story_chunk->getStartTime() / 1000000000) + ".csv";
     tl::xstream es = tl::xstream::self();
-    LOG_INFO("[CSVFileStoryChunkExtractor] Processing StoryChunk: ES={}, ULT={}, StoryID={}, StartTime={}", es.get_rank()
+    LOG_DEBUG("[CSVFileStoryChunkExtractor] Processing StoryChunk: ES={}, ULT={}, StoryID={}, StartTime={}", es.get_rank()
              , tl::thread::self_id(), story_chunk->getStoryId(), story_chunk->getStartTime());
     // the current thread is the only one that has this storyChunk and the only one that's writing to this chunk's csv file,
     // thus no additional locking is needed ...
@@ -38,6 +40,6 @@ void chronolog::CSVFileStoryChunkExtractor::processStoryChunk(chronolog::StoryCh
         chunk_fstream << event << std::endl;
     }
     chunk_fstream.close();
-    LOG_INFO("[CSVFileStoryChunkExtractor] Finished processing StoryChunk. File={}", chunk_filename);
+    LOG_DEBUG("[CSVFileStoryChunkExtractor] Finished processing StoryChunk. File={}", chunk_filename);
 }
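Note on the naming scheme above: the chunk file name now begins with the story id and embeds the extractor's IP and port, so graphers running on different nodes never collide on the same file. A minimal sketch of the intended composition (hypothetical helper name and parameters; the real code builds the name inline and ServiceId's exact accessors may differ):

    // Sketch: <rootDir>/<storyId>.<ip>.<port>.<startTimeSeconds>.csv
    #include <cstdint>
    #include <string>

    std::string makeChunkFileName(std::string const &root_dir, uint64_t story_id,
                                  std::string const &ip, uint16_t port, uint64_t start_time_ns)
    {
        // start time is recorded in nanoseconds; the file name carries whole seconds
        return root_dir + "/" + std::to_string(story_id) + "." + ip + "." + std::to_string(port) +
               "." + std::to_string(start_time_ns / 1000000000) + ".csv";
    }
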
diff --git a/ChronoGrapher/ChronoGrapher.cpp b/ChronoGrapher/ChronoGrapher.cpp
index 5b257d29..a7d185f8 100644
--- a/ChronoGrapher/ChronoGrapher.cpp
+++ b/ChronoGrapher/ChronoGrapher.cpp
@@ -164,13 +164,12 @@ int main(int argc, char**argv)
     catch(tl::exception const &)
     {
         LOG_ERROR("[ChronoGrapher] failed to create DataStoreAdminService");
+        keeperDataAdminService = nullptr;
     }
 
     if(nullptr == keeperDataAdminService)
     {
         LOG_CRITICAL("[ChronoGrapher] failed to create DataStoreAdminService exiting");
-        if(dataAdminEngine)
-        { delete dataAdminEngine; }
         return (-1);
     }
 
@@ -194,6 +193,7 @@ int main(int argc, char**argv)
     catch(tl::exception const &)
     {
         LOG_ERROR("[ChronoGrapher] failed to create RecordingService");
+        grapherRecordingService = nullptr;
     }
 
     if(nullptr == grapherRecordingService)
@@ -225,7 +225,9 @@ int main(int argc, char**argv)
 
     /// Registration with ChronoVisor __________________________________________________________________________________
     // try to register with chronoVisor a few times, then log ERROR and exit...
-    int registration_status = chronolog::CL_ERR_UNKNOWN;
+    int registration_status = grapherRegistryClient->send_register_msg(
+            chronolog::GrapherRegistrationMsg(processIdCard, collectionServiceId));
+    // if the first attempt fails, retry
     int retries = 5;
     while((chronolog::CL_SUCCESS != registration_status) && (retries > 0))
     {
diff --git a/ChronoGrapher/ChunkIngestionQueue.h b/ChronoGrapher/ChunkIngestionQueue.h
index 67c82011..6646dd2e 100644
--- a/ChronoGrapher/ChunkIngestionQueue.h
+++ b/ChronoGrapher/ChunkIngestionQueue.h
@@ -54,7 +54,8 @@ class ChunkIngestionQueue
 
     void ingestStoryChunk(StoryChunk* chunk)
     {
-        LOG_DEBUG("[IngestionQueue] Received chunk for StoryID={}: HandleMapSize={}", chunk->getStoryId(), storyIngestionHandles.size());
+        LOG_DEBUG("[IngestionQueue] has {} StoryHandles; Received chunk for StoryID={} startTime {} eventCount {}", storyIngestionHandles.size(),
+                  chunk->getStoryId(), chunk->getStartTime(), chunk->getEventCount());
         auto ingestionHandle_iter = storyIngestionHandles.find(chunk->getStoryId());
         if(ingestionHandle_iter == storyIngestionHandles.end())
         {
@@ -76,6 +77,13 @@ class ChunkIngestionQueue
             LOG_DEBUG("[IngestionQueue] Orphan chunk queue is empty. 
No actions taken."); return; } + + if (storyIngestionHandles.empty()) + { + LOG_DEBUG("[IngestionQueue] has 0 storyIngestionHandles to place {} orphaned chunks", orphanQueue.size()); + return; + } + std::lock_guard lock(ingestionQueueMutex); for(StoryChunkDeque::iterator iter = orphanQueue.begin(); iter != orphanQueue.end();) { @@ -93,7 +101,8 @@ class ChunkIngestionQueue ++iter; } } - LOG_DEBUG("[IngestionQueue] Drained {} orphan chunks into known handles.", orphanQueue.size()); + + LOG_WARNING("[IngestionQueue] has {} orphaned chunks", orphanQueue.size()); } bool is_empty() const diff --git a/ChronoGrapher/GrapherRecordingService.h b/ChronoGrapher/GrapherRecordingService.h index 5ba71057..7cd1b8f2 100644 --- a/ChronoGrapher/GrapherRecordingService.h +++ b/ChronoGrapher/GrapherRecordingService.h @@ -56,26 +56,26 @@ class GrapherRecordingService: public tl::provider // std::cout << (char)*(char*)(&mem_vec[0]+i) << " "; // } // std::cout << std::endl; - StoryChunk story_chunk; + + StoryChunk * story_chunk= new StoryChunk(); #ifndef NDEBUG start = std::chrono::high_resolution_clock::now(); #endif - deserializedWithCereal(&mem_vec[0], b.size() - 1, story_chunk); + deserializedWithCereal(&mem_vec[0], b.size() - 1, *story_chunk); #ifndef NDEBUG end = std::chrono::high_resolution_clock::now(); LOG_INFO("[GrapherRecordingService] Deserialization took {} us, ThreadID={}", std::chrono::duration_cast (end - start).count() / 1000.0 , tl::thread::self_id()); #endif - LOG_DEBUG("[GrapherRecordingService] StoryChunk received: StoryID: {}, StartTime: {}, ThreadID={}" - , story_chunk.getStoryId(), story_chunk.getStartTime(), tl::thread::self_id()); + LOG_DEBUG("[GrapherRecordingService] StoryChunk received: StoryId {} StartTime {} eventCount {} ThreadID={}" + , story_chunk->getStoryId(), story_chunk->getStartTime(), story_chunk->getEventCount(), tl::thread::self_id()); + request.respond(b.size()); LOG_DEBUG("[GrapherRecordingService] StoryChunk recording RPC responded {}, ThreadID={}" , b.size(), tl::thread::self_id()); - theIngestionQueue.ingestStoryChunk(&story_chunk); - LOG_DEBUG("[GrapherRecordingService] Ingested a StoryChunk, StoryID: {}, StartTime: {}, ThreadID={}" - , story_chunk.getStoryId(), story_chunk.getStartTime(), tl::thread::self_id()); + theIngestionQueue.ingestStoryChunk(story_chunk); } private: diff --git a/ChronoGrapher/KeeperDataStore.cpp b/ChronoGrapher/KeeperDataStore.cpp index 5a020854..56eaa18c 100644 --- a/ChronoGrapher/KeeperDataStore.cpp +++ b/ChronoGrapher/KeeperDataStore.cpp @@ -20,14 +20,14 @@ int chronolog::KeeperDataStore::startStoryRecording(std::string const &chronicle , chronolog::StoryId const &story_id, uint64_t start_time , uint32_t time_chunk_duration, uint32_t access_window) { - LOG_INFO("[KeeperDataStore] Start recording story: Chronicle={}, Story={}, StoryID={}", chronicle, story, story_id); + LOG_INFO("[KeeperDataStore] Start recording story: Chronicle={}, Story={}, StoryId={}", chronicle, story, story_id); // Get dataStoreMutex, check for story_id_presense & add new StoryPipeline if needed std::lock_guard storeLock(dataStoreMutex); auto pipeline_iter = theMapOfStoryPipelines.find(story_id); if(pipeline_iter != theMapOfStoryPipelines.end()) { - LOG_INFO("[KeeperDataStore] Story already being recorded. StoryID: {}", story_id); + LOG_INFO("[KeeperDataStore] Story already being recorded. 
StoryId: {}", story_id); //check it the pipeline was put on the waitingForExit list by the previous acquisition // and remove it from there auto waiting_iter = pipelinesWaitingForExit.find(story_id); @@ -42,11 +42,11 @@ int chronolog::KeeperDataStore::startStoryRecording(std::string const &chronicle auto result = theMapOfStoryPipelines.emplace( std::pair (story_id, new chl::StoryPipeline(theExtractionQueue, chronicle , story, story_id, start_time - , time_chunk_duration))); + , time_chunk_duration, access_window))); if(result.second) { - LOG_INFO("[KeeperDataStore] New StoryPipeline created successfully. StoryID: {}", story_id); + LOG_INFO("[KeeperDataStore] New StoryPipeline created successfully. StoryId {}", story_id); pipeline_iter = result.first; //engage StoryPipeline with the IngestionQueue chl::StoryChunkIngestionHandle*ingestionHandle = (*pipeline_iter).second->getActiveIngestionHandle(); @@ -55,7 +55,7 @@ int chronolog::KeeperDataStore::startStoryRecording(std::string const &chronicle } else { - LOG_ERROR("[KeeperDataStore] Failed to create StoryPipeline for StoryID: {}. Possible memory or resource issue." + LOG_ERROR("[KeeperDataStore] Failed to create StoryPipeline for StoryId: {}. Possible memory or resource issue." , story_id); return CL_ERR_UNKNOWN; } @@ -64,7 +64,7 @@ int chronolog::KeeperDataStore::startStoryRecording(std::string const &chronicle int chronolog::KeeperDataStore::stopStoryRecording(chronolog::StoryId const &story_id) { - LOG_DEBUG("[KeeperDataStore] Initiating stop recording for StoryID={}", story_id); + LOG_DEBUG("[KeeperDataStore] Initiating stop recording for StoryId={}", story_id); // we do not yet disengage the StoryPipeline from the IngestionQueue right away // but put it on the WaitingForExit list to be finalized, persisted to disk , and // removed from memory at exit_time = now+acceptance_window... @@ -74,15 +74,15 @@ int chronolog::KeeperDataStore::stopStoryRecording(chronolog::StoryId const &sto if(pipeline_iter != theMapOfStoryPipelines.end()) { uint64_t exit_time = std::chrono::high_resolution_clock::now().time_since_epoch().count() + - (*pipeline_iter).second->getAcceptanceWindow(); + (*pipeline_iter).second->getAcceptanceWindow()*5; pipelinesWaitingForExit[(*pipeline_iter).first] = (std::pair ( (*pipeline_iter).second, exit_time)); - LOG_INFO("[KeeperDataStore] Added StoryPipeline to waiting list for finalization. StoryID={}, ExitTime={}", story_id + LOG_INFO("[KeeperDataStore] Added StoryPipeline to waiting list for finalization. StoryId={}, ExitTime={}", story_id , exit_time); } else { - LOG_WARNING("[KeeperDataStore] Attempted to stop recording for non-existent StoryID={}", story_id); + LOG_WARNING("[KeeperDataStore] Attempted to stop recording for non-existent StoryId={}", story_id); } return chronolog::CL_SUCCESS; } @@ -148,7 +148,7 @@ void chronolog::KeeperDataStore::retireDecayedPipelines() } } - //swipe through pipelineswaiting and remove all those with nullptr + LOG_DEBUG("[KeeperDataStore] Completed retirement of decayed pipelines. 
Current state={}, Active StoryPipelines={}, PipelinesWaitingForExit={}, ThreadID={}" , state, theMapOfStoryPipelines.size(), pipelinesWaitingForExit.size(), tl::thread::self_id()); } diff --git a/ChronoGrapher/KeeperDataStore.h b/ChronoGrapher/KeeperDataStore.h index f8836fe1..6ef99e8b 100644 --- a/ChronoGrapher/KeeperDataStore.h +++ b/ChronoGrapher/KeeperDataStore.h @@ -42,7 +42,7 @@ class KeeperDataStore { return (SHUTTING_DOWN == state); } int startStoryRecording(ChronicleName const &, StoryName const &, StoryId const &, uint64_t start_time - , uint32_t time_chunk_ranularity = 30, uint32_t access_window = 60); + , uint32_t time_chunk_ranularity = 30, uint32_t access_window = 300); int stopStoryRecording(StoryId const &); diff --git a/ChronoGrapher/StoryPipeline.cpp b/ChronoGrapher/StoryPipeline.cpp index 3b5fb3e4..f14353eb 100644 --- a/ChronoGrapher/StoryPipeline.cpp +++ b/ChronoGrapher/StoryPipeline.cpp @@ -32,9 +32,8 @@ chronolog::StoryPipeline::StoryPipeline(StoryChunkExtractionQueue &extractionQue auto story_start_point = std::chrono::time_point {} + std::chrono::nanoseconds(timelineStart); std::time_t time_t_story_start = std::chrono::high_resolution_clock::to_time_t(story_start_point); - LOG_INFO("[StoryPipeline] Initialized with StoryID={}, StoryStartTime={}, Chronology={}, ChunkGranularity={} seconds, AcceptanceWindow={} seconds" - , storyId, std::ctime(&time_t_story_start), chronicleName, chunkGranularity / 1000000000, - acceptanceWindow / 1000000000); + LOG_INFO("[StoryPipeline] Initialized : Chronicle {} Story {} StoryId {} start {} Granularity {} AcceptanceWindow {}" + , chronicleName, storyName, storyId, std::ctime(&time_t_story_start), chunkGranularity, acceptanceWindow ); chunkGranularity *= 1000000000; // seconds =>nanoseconds acceptanceWindow *= 1000000000; // seconds =>nanoseconds @@ -43,7 +42,7 @@ chronolog::StoryPipeline::StoryPipeline(StoryChunkExtractionQueue &extractionQue timelineStart -= (timelineStart % chunkGranularity); timelineEnd = timelineStart; - for(uint64_t start = timelineStart; timelineEnd < (timelineStart + chunkGranularity * 3);) + while( timelineEnd < (timelineStart + chunkGranularity * 3)) { appendStoryChunk(); } @@ -55,7 +54,7 @@ chronolog::StoryPipeline::StoryPipeline(StoryChunkExtractionQueue &extractionQue auto chunk_end_point = std::chrono::time_point{} + std::chrono::nanoseconds(timelineEnd); std::time_t time_t_chunk_end = std::chrono::high_resolution_clock::to_time_t(chunk_end_point); - LOG_DEBUG("[StoryPipeline] Created StoryPipeline with StoryID={}, StartChunk={}, EndChunk={}", + LOG_TRACE("[StoryPipeline] Created StoryPipeline with StoryId {}, StartChunk {}, EndChunk {}", storyId, std::ctime(&time_t_chunk_start), std::ctime(&time_t_chunk_end)); #endif @@ -70,7 +69,7 @@ chl::StoryChunkIngestionHandle*chl::StoryPipeline::getActiveIngestionHandle() /////////////////////// chronolog::StoryPipeline::~StoryPipeline() { - LOG_DEBUG("[StoryPipeline] Destructor called for StoryID={}", storyId); + LOG_DEBUG("[StoryPipeline] Destructor called for StoryId {}", storyId); finalize(); } /////////////////////// @@ -83,18 +82,20 @@ void chronolog::StoryPipeline::finalize() { while(!activeIngestionHandle->getPassiveDeque().empty()) { - //INNA: consider adding mergeEvents(StoryChunk*) - mergeEvents(*(activeIngestionHandle->getPassiveDeque().front())); + StoryChunk* next_chunk = activeIngestionHandle->getPassiveDeque().front(); activeIngestionHandle->getPassiveDeque().pop_front(); + mergeEvents(*next_chunk); + delete next_chunk; } - 
while(!activeIngestionHandle->getPassiveDeque().empty()) + while(!activeIngestionHandle->getActiveDeque().empty()) { - //INNA: consider adding mergeEvents(StoryChunk*) - mergeEvents(*(activeIngestionHandle->getPassiveDeque().front())); - activeIngestionHandle->getPassiveDeque().pop_front(); + StoryChunk* next_chunk = activeIngestionHandle->getActiveDeque().front(); + activeIngestionHandle->getActiveDeque().pop_front(); + mergeEvents(*next_chunk); + delete next_chunk; } delete activeIngestionHandle; - LOG_INFO("[StoryPipeline] Finalized ingestion handle for storyId: {}", storyId); + LOG_INFO("[StoryPipeline] Finalized ingestion handle for storyId {}", storyId); } //extract any remianing non-empty StoryChunks regardless of decay_time @@ -108,10 +109,9 @@ void chronolog::StoryPipeline::finalize() extractedChunk = (*storyTimelineMap.begin()).second; storyTimelineMap.erase(storyTimelineMap.begin()); -#ifdef TRACE_CHUNK_EXTRACTION - LOG_TRACE("[StoryPipeline] Finalized chunk for StoryID={}. Is empty: {}", storyId, extractedChunk->empty() - ? "Yes" : "No"); -#endif + LOG_DEBUG("[StoryPipeline] Finalized chunk for StoryId {} startTime {} eventCount {}", storyId, + extractedChunk->getStartTime(), extractedChunk->getEventCount()); + if(extractedChunk->empty()) { // no need to carry an empty chunk any further... delete extractedChunk; @@ -136,7 +136,7 @@ std::map ::iterator chronolog::StoryPipeline:: std::time_t time_t_chunk_start = std::chrono::high_resolution_clock::to_time_t(chunk_start_point); auto chunk_end_point = epoch_time_point + std::chrono::nanoseconds(timelineStart-chunkGranularity); std::time_t time_t_chunk_end = std::chrono::high_resolution_clock::to_time_t(chunk_end_point); - LOG_TRACE("[StoryPipeline] Prepending new chunk for StoryID={} starting at: {}", + LOG_TRACE("[StoryPipeline] Prepending new chunk for StoryId {} starting at {}", storyId, timelineStart); #endif auto result = storyTimelineMap.insert( @@ -163,7 +163,7 @@ std::map ::iterator chronolog::StoryPipeline:: std::time_t time_t_chunk_start = std::chrono::high_resolution_clock::to_time_t(chunk_start_point); auto chunk_end_point = epoch_time_point + std::chrono::nanoseconds(timelineEnd+chunkGranularity); std::time_t time_t_chunk_end = std::chrono::high_resolution_clock::to_time_t(chunk_end_point); - LOG_TRACE("[StoryPipeline] Appending new chunk for StoryID={} starting at: {}", + LOG_TRACE("[StoryPipeline] Appending new chunk for StoryId {} starting at {}", storyId, timelineEnd); #endif auto result = storyTimelineMap.insert( @@ -184,13 +184,17 @@ std::map ::iterator chronolog::StoryPipeline:: void chronolog::StoryPipeline::collectIngestedEvents() { activeIngestionHandle->swapActiveDeque(); + StoryChunk * next_chunk = nullptr; while(!activeIngestionHandle->getPassiveDeque().empty()) { - //INNA: consider adding mergeEvents(StoryChunk*) - mergeEvents(*(activeIngestionHandle->getPassiveDeque().front())); + next_chunk = activeIngestionHandle->getPassiveDeque().front(); activeIngestionHandle->getPassiveDeque().pop_front(); + if( next_chunk != nullptr) + { + mergeEvents(*next_chunk); + delete next_chunk; + } } - LOG_DEBUG("[StoryPipeline] Collected ingested events for StoryID={}", storyId); } void chronolog::StoryPipeline::extractDecayedStoryChunks(uint64_t current_time) @@ -205,7 +209,7 @@ void chronolog::StoryPipeline::extractDecayedStoryChunks(uint64_t current_time) std::chrono::time_point {} // epoch_time_point{}; + std::chrono::nanoseconds(head_chunk_end_time + acceptanceWindow); std::time_t time_t_decay = 
std::chrono::high_resolution_clock::to_time_t(decay_point); - LOG_TRACE("[StoryPipeline] StoryID: {} - Current time: {} - Timeline size: {} - Head chunk decay time: {}", storyId + LOG_TRACE("[StoryPipeline] StoryId: {} - Current time: {} - Timeline size: {} - Head chunk decay time: {}", storyId , std::ctime(&time_t_current_time), storyTimelineMap.size(), std::ctime(&time_t_decay)); #endif @@ -228,10 +232,9 @@ void chronolog::StoryPipeline::extractDecayedStoryChunks(uint64_t current_time) if(extractedChunk != nullptr) { -#ifdef TRACE_CHUNK_EXTRACTION - LOG_TRACE("[StoryPipeline] StoryID: {} - Extracted chunk with start time {} is empty: {}", storyId - , extractedChunk->getStartTime(), (extractedChunk->empty() ? "Yes" : "No")); -#endif + LOG_TRACE("[StoryPipeline] StoryId: {} - Extracted chunk with start time {} eventCount {}", storyId + , extractedChunk->getStartTime(), extractedChunk->getEventCount()); + if(extractedChunk->empty()) { // there's no need to carry an empty chunk any further... delete extractedChunk; @@ -242,81 +245,12 @@ void chronolog::StoryPipeline::extractDecayedStoryChunks(uint64_t current_time) } } } -#ifdef TRACE_CHUNK_EXTRACTION - LOG_TRACE("[StoryPipeline] Extracting decayed chunks for StoryID={}. Queue size: {}", storyId - , theExtractionQueue.size()); -#endif -} -//////////////////// -/* -void chronolog::StoryPipeline::mergeEvents(chronolog::EventDeque &event_deque) -{ - if(event_deque.empty()) - { return; } + LOG_TRACE("[StoryPipeline] Extracting decayed chunks for StoryId {} ExtractionQueue size {}", storyId + , theExtractionQueue.size()); - std::lock_guard lock(sequencingMutex); - chl::LogEvent event; - // the last chunk is most likely the one that would get the events, so we'd start with the last - // chunk and do the lookup only if it's not the one - // NOTE: we should never have less than 2 chunks in the active storyTimelineMap !!! 
- std::map ::iterator chunk_to_merge_iter = --storyTimelineMap.end(); - while(!event_deque.empty()) - { - event = event_deque.front(); - LOG_DEBUG("[StoryPipeline] StoryID: {} [Start: {}, End: {}]: Merging event time: {}", storyId, timelineStart - , timelineEnd, event.time()); - if(timelineStart <= event.time() && event.time() < timelineEnd) - { - // we expect the events in the deque to be mostly monotonous - // so we'd try the most recently used chunk first and only look for the new chunk - // if the event does not belong to the recently used chunk - if(!(*chunk_to_merge_iter).second->insertEvent(event)) - { - // find the new chunk_to_merge the event into : we are lookingt for - // the chunk preceeding the first chunk with the startTime > event.time() - chunk_to_merge_iter = storyTimelineMap.upper_bound(event.time()); - //merge into the preceeding chunk - if(!(*(--chunk_to_merge_iter)).second->insertEvent(event)) - { - LOG_ERROR("[StoryPipeline] StoryID: {} - Discarded event with timestamp: {}", storyId, event.time()); - } - } - } - else if(event.time() >= timelineEnd) - { //extend timeline forward - while(event.time() >= timelineEnd) - { - chunk_to_merge_iter = appendStoryChunk(); - if(chunk_to_merge_iter == storyTimelineMap.end()) - { break; } - } - if(chunk_to_merge_iter != storyTimelineMap.end()) - { (*chunk_to_merge_iter).second->insertEvent(event); } - else - { - LOG_ERROR("[StoryPipeline] StoryID: {} - Discarding event with timestamp: {}", storyId, event.time()); - } - } - else - { //extend timeline backward - while(event.time() < timelineStart) - { - chunk_to_merge_iter = chl::StoryPipeline::prependStoryChunk(); - if(chunk_to_merge_iter == storyTimelineMap.end()) - { break; } - } - if(chunk_to_merge_iter != storyTimelineMap.end()) - { (*chunk_to_merge_iter).second->insertEvent(event); } - else - { - LOG_ERROR("[StoryPipeline] StoryID: {} - Discarding event with timestamp: {}", storyId, event.time()); - } - } - event_deque.pop_front(); - } } -*/ + ////////////////////// // Merge the StoryChunk obtained from external source into the StoryPipeline // Note that the granularity of the StoryChunk being merged may be @@ -331,8 +265,8 @@ void chronolog::StoryPipeline::mergeEvents(chronolog::StoryChunk &other_chunk) std::lock_guard lock(sequencingMutex); - LOG_DEBUG("[StoryPipeline] StoryID: {} - Merging StoryChunk from {} to {}", storyId, other_chunk.getStartTime() - , other_chunk.getEndTime()); + LOG_DEBUG("[StoryPipeline] StoryId {} timeline {}-{} : Merging StoryChunk {}-{} eventCount {}", storyId,timelineStart, timelineEnd, other_chunk.getStartTime() + , other_chunk.getEndTime(), other_chunk.getEventCount()); // locate the storyChunk in the StoryPipeline with the time Key not less than // other_chunk.startTime and start merging @@ -343,30 +277,34 @@ void chronolog::StoryPipeline::mergeEvents(chronolog::StoryChunk &other_chunk) // find the chunk_to_merge into : we are lookingt for // the chunk preceeding the one with the startTime > other_chunk.getStartTime() chunk_to_merge_iter = --storyTimelineMap.upper_bound(other_chunk.getStartTime()); + LOG_DEBUG("[StoryPipeline] StoryId {} timeline {}-{} : Merging StoryChunk {} starts with chunk {}", storyId, timelineStart, timelineEnd, + other_chunk.getStartTime(), (*chunk_to_merge_iter).second->getStartTime()); } else { // unlikely but possible that we get some delayed events and need to prepend some chunks // extending the timeline back to the past - LOG_DEBUG("[StoryPipeline] StoryID: {} - Prepending merge starting at timestamp {}", storyId - 
, other_chunk.getStartTime());
+        LOG_DEBUG("[StoryPipeline] StoryId {} timeline {}-{} : Merging StoryChunk {} below timelineStart {} - prepending chunks", storyId, timelineStart, timelineEnd, other_chunk.getStartTime(), timelineStart);
+
         while(timelineStart > other_chunk.getStartTime())
         {
             chunk_to_merge_iter = chl::StoryPipeline::prependStoryChunk();
             if(chunk_to_merge_iter == storyTimelineMap.end())
             {
-                //INNA:: if prepend fails we have no choice but to discard the events we can't merge !!
-                LOG_ERROR("[StoryPipeline] StoryID: {} - Merge operation discards events between timestamps: {} and {}"
-                          , storyId, other_chunk.getStartTime(), timelineStart);
+                // if prepend fails we have no choice but to discard the events we can't merge !!
+                LOG_ERROR("[StoryPipeline] StoryId {} timeline {}-{} : Merge operation discards events between timestamps: {} and {}"
+                          , storyId, timelineStart, timelineEnd, other_chunk.getStartTime(), timelineStart);
                 other_chunk.eraseEvents(other_chunk.getStartTime(), timelineStart);
                 chunk_to_merge_iter = storyTimelineMap.begin();
             }
         }
     }
 
-    //iterate through the storyTimelineMap draining the other_chunk events
+    // iterate through the storyTimelineMap draining the other_chunk events
     while(chunk_to_merge_iter != storyTimelineMap.end() && !other_chunk.empty())
     {
+        LOG_DEBUG("[StoryPipeline] StoryId {} timeline {}-{} : Merging StoryChunk {} into chunk {}", storyId, timelineStart, timelineEnd,
+                  other_chunk.getStartTime(), (*chunk_to_merge_iter).second->getStartTime());
         (*chunk_to_merge_iter).second->mergeEvents(other_chunk);
         chunk_to_merge_iter++;
     }
@@ -376,10 +314,12 @@ void chronolog::StoryPipeline::mergeEvents(chronolog::StoryChunk &other_chunk)
 
     while(!other_chunk.empty())
     {
+        LOG_DEBUG("[StoryPipeline] StoryId {} timeline {}-{} : Merging StoryChunk {} eventCount {} - appending chunks", storyId, timelineStart, timelineEnd, other_chunk.getStartTime(), other_chunk.getEventCount());
         chunk_to_merge_iter = appendStoryChunk();
         if(chunk_to_merge_iter == storyTimelineMap.end())
         { break; }
-
+        LOG_DEBUG("[StoryPipeline] StoryId {} timeline {}-{} : Merging StoryChunk {} into chunk {}", storyId, timelineStart, timelineEnd,
+                  other_chunk.getStartTime(), (*chunk_to_merge_iter).second->getStartTime());
         (*chunk_to_merge_iter).second->mergeEvents(other_chunk);
     }
 
diff --git a/ChronoKeeper/StoryPipeline.cpp b/ChronoKeeper/StoryPipeline.cpp
index 1115b1b1..169925a5 100644
--- a/ChronoKeeper/StoryPipeline.cpp
+++ b/ChronoKeeper/StoryPipeline.cpp
@@ -43,7 +43,7 @@ chronolog::StoryPipeline::StoryPipeline(StoryChunkExtractionQueue &extractionQue
     timelineStart -= (timelineStart % chunkGranularity);
     timelineEnd = timelineStart;
 
-    for(uint64_t start = timelineStart; timelineEnd < (timelineStart + chunkGranularity * 3);)
+    while(timelineEnd < timelineStart + chunkGranularity * 3)
     {
         appendStoryChunk();
     }
diff --git a/chrono_common/ConfigurationManager.cpp b/chrono_common/ConfigurationManager.cpp
index 90ffc87d..66f2ddae 100644
--- a/chrono_common/ConfigurationManager.cpp
+++ b/chrono_common/ConfigurationManager.cpp
@@ -7,7 +7,7 @@ void ChronoLog::ConfigurationManager::parseGrapherConf(json_object*json_conf)
 {
         if(strcmp(key, "RecordingGroup") == 0)
         {
-            assert(json_object_is_type(val, json_type_object));
+            assert(json_object_is_type(val, json_type_int));
             int value = json_object_get_int(val);
             GRAPHER_CONF.RECORDING_GROUP = (value >= 0 ? value : 0);
         }
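This assertion fix appears twice: above for the grapher configuration and below for the keeper. Both now expect RecordingGroup to be a plain JSON integer rather than an object. A standalone json-c sketch of the input shape this implies (hypothetical function name, shown only to illustrate the asserted type):

    #include <json-c/json.h>
    #include <cassert>

    // Expects a configuration fragment such as {"RecordingGroup": 7}
    int parseRecordingGroup(json_object *conf)
    {
        json_object *val = json_object_object_get(conf, "RecordingGroup");
        assert(json_object_is_type(val, json_type_int));  // previously asserted json_type_object
        int value = json_object_get_int(val);
        return (value >= 0 ? value : 0);                  // negative values clamp to 0, as in the patch
    }
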
diff --git a/chrono_common/ConfigurationManager.h b/chrono_common/ConfigurationManager.h
index 5b861e62..00c36810 100644
--- a/chrono_common/ConfigurationManager.h
+++ b/chrono_common/ConfigurationManager.h
@@ -809,7 +809,7 @@ class ConfigurationManager
 {
         if(strcmp(key, "RecordingGroup") == 0)
         {
-            assert(json_object_is_type(val, json_type_object));
+            assert(json_object_is_type(val, json_type_int));
             int value = json_object_get_int(val);
             KEEPER_CONF.RECORDING_GROUP = (value >= 0 ? value : 0);
         }
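The thread running through patch 1: the recording service now allocates each incoming StoryChunk with new, the ingestion queue passes the raw pointer along, and whoever merges the chunk deletes it. A simplified sketch of that ownership hand-off (illustrative types only, not the real class layout):

    #include <deque>
    #include <mutex>

    struct StoryChunk { /* events elided */ };

    // Producer allocates; the queue stores raw pointers; the consumer merges, then deletes.
    // Mirrors the deque draining that patch 1 adds to StoryPipeline::finalize().
    struct ChunkQueue
    {
        std::deque<StoryChunk *> chunks;
        std::mutex m;

        void push(StoryChunk *c)
        {
            std::lock_guard<std::mutex> lock(m);
            chunks.push_back(c);
        }

        template <typename MergeFn>
        void drain(MergeFn merge)
        {
            std::lock_guard<std::mutex> lock(m);
            while(!chunks.empty())
            {
                StoryChunk *next = chunks.front();
                chunks.pop_front();
                merge(*next);  // fold the chunk's events into the timeline
                delete next;   // consumer owns the chunk once it is popped
            }
        }
    };
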
From fb027ef383a5e0ad364a2a08c6d3998b7f1a4797 Mon Sep 17 00:00:00 2001
From: Inna Brodkin
Date: Thu, 2 May 2024 15:18:36 -0500
Subject: [PATCH 2/3] story merging logic, plus const correctness

---
 chrono_common/StoryChunk.cpp | 166 +++++++++++++++++++++++++++++------
 chrono_common/StoryChunk.h   |  28 +++---
 2 files changed, 155 insertions(+), 39 deletions(-)

diff --git a/chrono_common/StoryChunk.cpp b/chrono_common/StoryChunk.cpp
index 767f9336..921fff73 100644
--- a/chrono_common/StoryChunk.cpp
+++ b/chrono_common/StoryChunk.cpp
@@ -11,13 +11,19 @@ chl::StoryChunk::StoryChunk(chl::StoryId const &story_id, uint64_t start_time,
         : storyId(story_id)
         , startTime(start_time)
         , endTime(end_time)
-        , revisionTime(start_time)
-    {
-    }
+        , revisionTime(end_time)
+{
+
+}
+
+/////
 
 chl::StoryChunk::~StoryChunk()
-    {
-    }
+{
+    logEvents.clear();
+}
+
+//////
 
 int chl::StoryChunk::insertEvent(chl::LogEvent const &event)
 {
@@ -30,67 +36,173 @@ int chl::StoryChunk::insertEvent(chl::LogEvent const &event)
     { return 0; }
 }
 
-///////////
+//
+// merge into this master chunk all the events from the events map starting at iterator position merge_start
+// return the merged event count
 
-uint32_t chl::StoryChunk::mergeEvents(std::map<EventSequence, LogEvent> &events
-                                      , std::map<EventSequence, LogEvent>::iterator &merge_start)
+uint32_t chl::StoryChunk::mergeEvents(std::map<EventSequence, LogEvent> & events,
+                                      std::map<EventSequence, LogEvent>::const_iterator & merge_start)
 {
+    LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {}-{} : merging map eventCount {}",
+              storyId, startTime, endTime, events.size());
+
     uint32_t merged_event_count = 0;
-    std::map<EventSequence, LogEvent>::iterator first_merged, last_merged;
+    std::map<EventSequence, LogEvent>::const_iterator first_merged, last_merged;
+
+    if(events.empty())
+    { return merged_event_count; }
+
     if((*merge_start).second.time() < startTime)
     {
         merge_start = events.lower_bound(chl::EventSequence{startTime, 0, 0});
-        LOG_DEBUG("[StoryChunk] Adjusted merge start time to align with StoryChunk's start time: {}", startTime);
+        LOG_DEBUG("[StoryChunk] merge StoryId {} master chunk {} : Adjusted merge_start to align with master chunk startTime",
+                  storyId, startTime);
     }
 
-    for(auto iter = merge_start; iter != events.end(); ++iter)
+    for(auto iter = merge_start; (iter != events.end()) && ((*iter).second.time() < endTime); ++iter)
     {
+        LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {} : merging event {}, master endTime {}", storyId,
+                  startTime, (*iter).second.time(), endTime);
         if(insertEvent((*iter).second) > 0)
         {
             if(merged_event_count == 0)
             { first_merged = iter; }
             last_merged = iter;
             merged_event_count++;
         }
         else
         {
-            LOG_DEBUG("[StoryChunk] Stopped merging due to a record that couldn't be inserted.");
+            // stop at the first record that can't be merged
+            LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {} : stopped merging due to event that couldn't be "
+                      "inserted. {}",
+                      storyId, startTime, (*iter).second.time());
             break;
-        } //stop at the first record that can't be merged
+        }
     }
 
     if(merged_event_count > 0)
     {
-        //remove the merged records from the original map
-        events.erase(first_merged, last_merged);
-        LOG_DEBUG("[StoryChunk] Removed {} merged records from the original event map.", merged_event_count);
+        // remove the merged records from the original map:
+        // removing records in range [first_merged, last_merged]
+        events.erase(first_merged, ++last_merged);
+        LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {} : merged {} records, remaining map eventCount {}",
+                  storyId, startTime, merged_event_count, events.size());
     }
     else
     {
-        LOG_DEBUG("[StoryChunk] No events merged during the operation.");
+        LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {} : No events merged during the operation.", storyId,
+                  startTime);
     }
 
     return merged_event_count;
 }
 
-uint32_t chl::StoryChunk::mergeEvents(chl::StoryChunk &other_chunk, uint64_t start_time, uint64_t end_time)
-    { return 0; }
+//
+// merge into this master chunk all the events from the other_chunk with timestamps starting at merge_start_time
+// return the merged event count
+
+uint32_t chl::StoryChunk::mergeEvents(chl::StoryChunk & other_chunk, uint64_t merge_start_time)
+{
+    LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {}-{} : merging chunk {}-{} eventCount {}", storyId, startTime,
+              endTime, other_chunk.getStartTime(), other_chunk.getEndTime(), other_chunk.getEventCount());
+
+    uint32_t merged_event_count = 0;
+
+    if(other_chunk.empty())
+    { return merged_event_count; }
+
+    if(merge_start_time == 0 || merge_start_time >= other_chunk.getEndTime())
+    { merge_start_time = other_chunk.getStartTime(); }
+
+    std::map<EventSequence, LogEvent>::const_iterator first_merged, last_merged;
+
+    std::map<EventSequence, LogEvent>::const_iterator merge_start =
+            (merge_start_time < startTime ? other_chunk.lower_bound(startTime)
+                                          : other_chunk.lower_bound(merge_start_time));
+    LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {} : adjusted merge_start : other_chunk {} merge_start {}",
+              storyId, startTime, other_chunk.getStartTime(),
+              (merge_start != other_chunk.end() ? (*merge_start).second.time() : (uint64_t)0));
+
+    for(auto iter = merge_start; (iter != other_chunk.end()) && ((*iter).second.time() < endTime); ++iter)
+    {
+        LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {} : merging event {}, master endTime {}", storyId,
+                  startTime, (*iter).second.time(), endTime);
+        if(insertEvent((*iter).second) > 0)
+        {
+            if(merged_event_count == 0)
+            { first_merged = iter; }
+            last_merged = iter;
+            merged_event_count++;
+        }
+        else
+        {
+            // stop at the first record that can't be merged
+            LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {} : stopped merging due to event that couldn't be "
+                      "inserted. {}",
+                      storyId, startTime, (*iter).second.time());
+            break;
+        }
+    }
+
+    if(merged_event_count > 0)
+    {
+        // remove the merged records from the original chunk:
+        // removing records in range [first_merged, last_merged]
+        other_chunk.eraseEvents(first_merged, ++last_merged);
+        LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {} : merged {} records from chunk {} remaining "
+                  "eventCount {}",
+                  storyId, startTime, merged_event_count, other_chunk.getStartTime(), other_chunk.getEventCount());
+    }
+    else
+    {
+        LOG_TRACE("[StoryChunk] merge StoryId {} master chunk {} : No events merged during the operation.", storyId,
+                  startTime);
+    }
+
+    return merged_event_count;
+}
+
+/*
 uint32_t chl::StoryChunk::extractEvents(std::map<EventSequence, LogEvent> &target_map, std::map<EventSequence, LogEvent>::iterator first_pos
                                         , std::map<EventSequence, LogEvent>::iterator last_pos)
     { return 0; }
 
-uint32_t chl::StoryChunk::extractEvents(std::map<EventSequence, LogEvent> &target_map, uint64_t start_time, uint64_t end_time)
-    { return 0; }
-
 uint32_t chl::StoryChunk::extractEvents(chl::StoryChunk & target_chunk, uint64_t start_time, uint64_t end_time)
     { return 0; }
 
-uint32_t chl::StoryChunk::split(chl::StoryChunk & split_chunk, uint64_t time_boundary)
-    { return 0; }
+*/
+//
+// remove events falling into range [ range_start, range_end )
+// return iterator to the first element following the last removed one
 
-uint32_t chl::StoryChunk::eraseEvents(uint64_t start_time, uint64_t end_time)
-    { return 0; }
-
+std::map<chl::EventSequence, chl::LogEvent>::iterator
+chl::StoryChunk::eraseEvents(std::map<EventSequence, LogEvent>::const_iterator & range_start,
+                             std::map<EventSequence, LogEvent>::const_iterator & range_end)
+{
+    return logEvents.erase(range_start, range_end);
+}
+
+//
+// remove events falling into range [ start_time, end_time )
+// return iterator to the first element following the last removed one
+
+std::map<chl::EventSequence, chl::LogEvent>::iterator chl::StoryChunk::eraseEvents(uint64_t start_time, uint64_t end_time)
+{
+    if(logEvents.empty() || start_time == 0 || start_time >= end_time || start_time >= endTime || end_time < startTime)
+    { return logEvents.end(); }
+
+    std::map<EventSequence, LogEvent>::const_iterator range_start =
+            (start_time < startTime ? logEvents.lower_bound(chl::EventSequence{startTime, 0, 0})
+                                    : logEvents.lower_bound(chl::EventSequence{start_time, 0, 0}));
+
+    std::map<EventSequence, LogEvent>::const_iterator range_end =
+            (end_time > endTime ? logEvents.upper_bound(chl::EventSequence{endTime, 0, 0})
+                                : logEvents.upper_bound(chl::EventSequence{end_time, 0, 0}));
+
+    return logEvents.erase(range_start, range_end);
+}
+
+///////////////////
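Both mergeEvents overloads above follow the same loop shape: align the start iterator with lower_bound, stop at the master chunk's endTime, and erase the merged range using a past-the-end iterator (hence the ++last_merged before erase). A generic sketch of that pattern over a plain std::map, simplified in one respect: every insert succeeds here, whereas insertEvent can reject an event and end the loop early:

    #include <cstdint>
    #include <map>

    uint32_t mergeRange(std::map<uint64_t, int> &target, std::map<uint64_t, int> &source,
                        uint64_t start_time, uint64_t end_time)
    {
        uint32_t merged = 0;
        auto first = source.lower_bound(start_time);  // first event at or after start_time
        auto iter = first;
        for(; iter != source.end() && iter->first < end_time; ++iter)
        {
            target.insert(*iter);
            ++merged;
        }
        if(merged > 0)
        { source.erase(first, iter); }                // iter is already one past the merged range
        return merged;
    }
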
diff --git a/chrono_common/StoryChunk.h b/chrono_common/StoryChunk.h
index 75dad64b..fc4b5cad 100644
--- a/chrono_common/StoryChunk.h
+++ b/chrono_common/StoryChunk.h
@@ -41,6 +41,9 @@ class StoryChunk
     uint64_t getEndTime() const
     { return endTime; }
 
+    int getEventCount() const
+    { return logEvents.size(); }
+
     bool empty() const
     { return (logEvents.empty() ? true : false); }
 
@@ -54,29 +57,31 @@ class StoryChunk
     { return logEvents.lower_bound(EventSequence{chrono_time, 0, 0}); }
 
     uint64_t firstEventTime() const
-    { return (*logEvents.begin()).second.time(); }
+    { return (logEvents.empty() ? 0 : (*logEvents.begin()).second.time()); }
 
     uint64_t lastEventTime() const
-    { return (*logEvents.begin()).second.time(); }
+    { return (logEvents.empty() ? 0 : (*(--logEvents.end())).second.time()); }
 
     int insertEvent(LogEvent const &);
 
-    uint32_t
-    mergeEvents(std::map<EventSequence, LogEvent> &events, std::map<EventSequence, LogEvent>::iterator &merge_start);
-
-    uint32_t mergeEvents(StoryChunk &other_chunk, uint64_t start_time =0, uint64_t end_time=0);
+    uint32_t mergeEvents(std::map<EventSequence, LogEvent> & events,
+                         std::map<EventSequence, LogEvent>::const_iterator & merge_start);
+
+    uint32_t mergeEvents(StoryChunk & other_chunk, uint64_t start_time = 0);
 
-    uint32_t
+    /* uint32_t
     extractEvents(std::map<EventSequence, LogEvent> &target_map, std::map<EventSequence, LogEvent>::iterator first_pos
                   , std::map<EventSequence, LogEvent>::iterator last_pos);
 
     uint32_t extractEvents(std::map<EventSequence, LogEvent> &target_map, uint64_t start_time, uint64_t end_time);
 
     uint32_t extractEvents(StoryChunk & target_chunk, uint64_t start_time, uint64_t end_time);
+*/
+    std::map<EventSequence, LogEvent>::iterator
+    eraseEvents(std::map<EventSequence, LogEvent>::const_iterator & first_pos,
+                std::map<EventSequence, LogEvent>::const_iterator & last_pos);
 
-    uint32_t split(StoryChunk & split_chunk, uint64_t time_boundary);
-
-    uint32_t eraseEvents(uint64_t start_time, uint64_t end_time);
+    std::map<EventSequence, LogEvent>::iterator eraseEvents(uint64_t start_time, uint64_t end_time);
 
     // serialization function used by thallium RPC providers
     template <typename SerArchiveT>
     void serialize(SerArchiveT &serT)
     {
         serT&storyId;
         serT&startTime;
         serT&endTime;
         serT&revisionTime;
         serT&logEvents;
     }
 
-
 private:
     StoryId storyId;
     uint64_t startTime;
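A note on the EventSequence{time, 0, 0} keys used with lower_bound throughout patch 2: with a lexicographically ordered key, zeros in the trailing fields make lower_bound land on the first event at a given timestamp regardless of which client produced it. A small sketch of that ordering (the (time, client, index) field layout is an assumption for illustration; the real EventSequence may differ):

    #include <cstdint>
    #include <map>
    #include <tuple>

    // Assumed shape: ordered lexicographically by (time, client, index). C++17.
    using EventSequence = std::tuple<uint64_t, uint32_t, uint32_t>;

    int main()
    {
        std::map<EventSequence, const char *> events;
        events[{100, 7, 0}] = "clientA";
        events[{100, 9, 2}] = "clientB";
        events[{200, 1, 0}] = "clientC";

        // {100, 0, 0} sorts before every real event stamped 100,
        // so lower_bound finds the first event at t=100: {100, 7, 0} -> "clientA".
        auto iter = events.lower_bound({100, 0, 0});
        (void)iter;
        return 0;
    }
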
From 3e201e45ba3a48acbcda038d095aa37fa598d33a Mon Sep 17 00:00:00 2001
From: Inna Brodkin
Date: Fri, 3 May 2024 18:30:04 -0500
Subject: [PATCH 3/3] corrected getAcceptanceWindow() return type

---
 ChronoGrapher/KeeperDataStore.cpp | 17 ++++++++++-------
 ChronoGrapher/StoryPipeline.cpp   | 11 +++++++----
 ChronoGrapher/StoryPipeline.h     |  8 +++++++-
 3 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/ChronoGrapher/KeeperDataStore.cpp b/ChronoGrapher/KeeperDataStore.cpp
index 56eaa18c..22f2e0b1 100644
--- a/ChronoGrapher/KeeperDataStore.cpp
+++ b/ChronoGrapher/KeeperDataStore.cpp
@@ -74,15 +74,16 @@ int chronolog::KeeperDataStore::stopStoryRecording(chronolog::StoryId const &sto
     if(pipeline_iter != theMapOfStoryPipelines.end())
     {
         uint64_t exit_time = std::chrono::high_resolution_clock::now().time_since_epoch().count() +
-                             (*pipeline_iter).second->getAcceptanceWindow()*5;
+                             (*pipeline_iter).second->getAcceptanceWindow();
         pipelinesWaitingForExit[(*pipeline_iter).first] = (std::pair<chl::StoryPipeline*, uint64_t>(
                 (*pipeline_iter).second, exit_time));
-        LOG_INFO("[KeeperDataStore] Added StoryPipeline to waiting list for finalization. StoryId={}, ExitTime={}", story_id
-                 , exit_time);
+        LOG_INFO("[KeeperDataStore] Scheduled pipeline to retire: StoryId {} timeline {}-{} acceptanceWindow {} retirementTime {}",
+                 (*pipeline_iter).second->getStoryId(), (*pipeline_iter).second->getTimelineStart(), (*pipeline_iter).second->getTimelineEnd(),
+                 (*pipeline_iter).second->getAcceptanceWindow(), exit_time);
     }
     else
     {
-        LOG_WARNING("[KeeperDataStore] Attempted to stop recording for non-existent StoryId={}", story_id);
+        LOG_WARNING("[KeeperDataStore] Attempt to stop recording for non-existent StoryId={}", story_id);
     }
     return chronolog::CL_SUCCESS;
 }
@@ -124,7 +125,7 @@ void chronolog::KeeperDataStore::extractDecayedStoryChunks()
 
 void chronolog::KeeperDataStore::retireDecayedPipelines()
 {
-    LOG_DEBUG("[KeeperDataStore] Initiating retirement of decayed pipelines. Current state={}, Active StoryPipelines={}, PipelinesWaitingForExit={}, ThreadID={}"
+    LOG_TRACE("[KeeperDataStore] Initiating retirement of decayed pipelines. Current state={}, Active StoryPipelines={}, PipelinesWaitingForExit={}, ThreadID={}"
              , state, theMapOfStoryPipelines.size(), pipelinesWaitingForExit.size(), tl::thread::self_id());
 
     if(!theMapOfStoryPipelines.empty())
@@ -137,7 +138,9 @@ void chronolog::KeeperDataStore::retireDecayedPipelines()
         if(current_time >= (*pipeline_iter).second.second)
         {
             //current_time >= pipeline exit_time
-            StoryPipeline*pipeline = (*pipeline_iter).second.first;
+            StoryPipeline * pipeline = (*pipeline_iter).second.first;
+            LOG_DEBUG("[KeeperDataStore] retiring pipeline StoryId {} timeline {}-{} acceptanceWindow {} current_time {} retirementTime {}",
+                      pipeline->getStoryId(), pipeline->getTimelineStart(), pipeline->getTimelineEnd(), pipeline->getAcceptanceWindow(), current_time, (*pipeline_iter).second.second);
             theMapOfStoryPipelines.erase(pipeline->getStoryId());
             theIngestionQueue.removeStoryIngestionHandle(pipeline->getStoryId());
             pipeline_iter = pipelinesWaitingForExit.erase(pipeline_iter); //pipeline->getStoryId());
@@ -149,7 +152,7 @@ void chronolog::KeeperDataStore::retireDecayedPipelines()
         }
     }
 
-    LOG_DEBUG("[KeeperDataStore] Completed retirement of decayed pipelines. Current state={}, Active StoryPipelines={}, PipelinesWaitingForExit={}, ThreadID={}"
+    LOG_TRACE("[KeeperDataStore] Completed retirement of decayed pipelines. 
Current state={}, Active StoryPipelines={}, PipelinesWaitingForExit={}, ThreadID={}" , state, theMapOfStoryPipelines.size(), pipelinesWaitingForExit.size(), tl::thread::self_id()); } diff --git a/ChronoGrapher/StoryPipeline.cpp b/ChronoGrapher/StoryPipeline.cpp index f14353eb..04a01e2d 100644 --- a/ChronoGrapher/StoryPipeline.cpp +++ b/ChronoGrapher/StoryPipeline.cpp @@ -32,8 +32,8 @@ chronolog::StoryPipeline::StoryPipeline(StoryChunkExtractionQueue &extractionQue auto story_start_point = std::chrono::time_point {} + std::chrono::nanoseconds(timelineStart); std::time_t time_t_story_start = std::chrono::high_resolution_clock::to_time_t(story_start_point); - LOG_INFO("[StoryPipeline] Initialized : Chronicle {} Story {} StoryId {} start {} Granularity {} AcceptanceWindow {}" - , chronicleName, storyName, storyId, std::ctime(&time_t_story_start), chunkGranularity, acceptanceWindow ); + LOG_INFO("[StoryPipeline] Initialized : Chronicle {} Story {} StoryId {} starting at {} " + , chronicleName, storyName, storyId, std::ctime(&time_t_story_start)); chunkGranularity *= 1000000000; // seconds =>nanoseconds acceptanceWindow *= 1000000000; // seconds =>nanoseconds @@ -47,6 +47,9 @@ chronolog::StoryPipeline::StoryPipeline(StoryChunkExtractionQueue &extractionQue appendStoryChunk(); } + LOG_DEBUG("[StoryPipeline] Initialized pipeline : Chronicle {} Story {} StoryId {} timeline {}-{} Granularity {} AcceptanceWindow {}" + , chronicleName, storyName, storyId, timelineStart, timelineEnd, chunkGranularity, acceptanceWindow ); + #ifdef TRACE_CHUNKING auto chunk_start_point = std::chrono::time_point{} // epoch_time_point{}; + std::chrono::nanoseconds(timelineStart); @@ -232,8 +235,8 @@ void chronolog::StoryPipeline::extractDecayedStoryChunks(uint64_t current_time) if(extractedChunk != nullptr) { - LOG_TRACE("[StoryPipeline] StoryId: {} - Extracted chunk with start time {} eventCount {}", storyId - , extractedChunk->getStartTime(), extractedChunk->getEventCount()); + LOG_TRACE("[StoryPipeline] StoryId: {} - Extracted chunk {}-{} eventCount {}", storyId + , extractedChunk->getStartTime(), extractedChunk->getEndTime(), extractedChunk->getEventCount()); if(extractedChunk->empty()) { // there's no need to carry an empty chunk any further... diff --git a/ChronoGrapher/StoryPipeline.h b/ChronoGrapher/StoryPipeline.h index be198573..7b2eea9f 100644 --- a/ChronoGrapher/StoryPipeline.h +++ b/ChronoGrapher/StoryPipeline.h @@ -46,9 +46,15 @@ class StoryPipeline StoryId const &getStoryId() const { return storyId; } - uint16_t getAcceptanceWindow() const + uint64_t getAcceptanceWindow() const { return acceptanceWindow; } + uint64_t getTimelineStart() const + { return timelineStart; } + + uint64_t getTimelineEnd() const + { return timelineEnd; } + private: StoryChunkExtractionQueue &theExtractionQueue;
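
Why patch 3's one-line return-type change matters: StoryPipeline stores acceptanceWindow in nanoseconds (its constructor multiplies the configured seconds by 1000000000), so returning it through the old uint16_t silently truncated the value that stopStoryRecording uses to compute the retirement time. A minimal illustration of the truncation (values chosen for the example):

    #include <cstdint>
    #include <iostream>

    int main()
    {
        uint64_t acceptance_window_ns = 60ULL * 1000000000ULL;  // 60 s, as stored after the ctor's conversion
        uint16_t old_return_type = static_cast<uint16_t>(acceptance_window_ns);
        std::cout << "stored: " << acceptance_window_ns << " ns\n"
                  << "through uint16_t: " << old_return_type << " ns\n";  // a meaningless small remainder
        return 0;
    }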