157 chunk stuck orphan queue #162

Merged: 3 commits, May 6, 2024
Changes from 2 commits

ChronoGrapher/CSVFileChunkExtractor.cpp (10 changes: 6 additions & 4 deletions)
@@ -4,6 +4,7 @@
 
 #include "chronolog_types.h"
 #include "CSVFileChunkExtractor.h"
+#include "log.h"
 
 namespace tl = thallium;
 
@@ -23,11 +24,12 @@ void chronolog::CSVFileStoryChunkExtractor::processStoryChunk(chronolog::StoryCh
 {
     std::ofstream chunk_fstream;
     std::string chunk_filename(rootDirectory);
-    chunk_filename += chrono_process_id + "." + std::to_string(story_chunk->getStoryId()) + "." +
-                      std::to_string(story_chunk->getStartTime() / 1000000000) + ".csv";
+    chunk_filename += "/" + std::to_string(story_chunk->getStoryId()) + "."
+                      //INNA : + chrono_process_id.getIPasDottedString(chunk_filename) + "." + std::to_string(chrono_process_id.getPort()) + "."
+                      + std::to_string(story_chunk->getStartTime() / 1000000000) + ".csv";
 
     tl::xstream es = tl::xstream::self();
-    LOG_INFO("[CSVFileStoryChunkExtractor] Processing StoryChunk: ES={}, ULT={}, StoryID={}, StartTime={}", es.get_rank()
+    LOG_DEBUG("[CSVFileStoryChunkExtractor] Processing StoryChunk: ES={}, ULT={}, StoryID={}, StartTime={}", es.get_rank()
              , tl::thread::self_id(), story_chunk->getStoryId(), story_chunk->getStartTime());
     // current thread if the only one that has this storyChunk and the only one that's writing to this chunk csv file
     // thus no additional locking is needed ...
@@ -38,6 +40,6 @@ void chronolog::CSVFileStoryChunkExtractor::processStoryChunk(chronolog::StoryCh
         chunk_fstream << event << std::endl;
     }
     chunk_fstream.close();
-    LOG_INFO("[CSVFileStoryChunkExtractor] Finished processing StoryChunk. File={}", chunk_filename);
+    LOG_DEBUG("[CSVFileStoryChunkExtractor] Finished processing StoryChunk. File={}", chunk_filename);
 }

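With the extractor change above, a chunk CSV file now lands directly under the configured root directory and is keyed only by story id and start time in seconds (the process-id prefix is commented out). A rough illustration of the resulting path, using made-up values rather than anything from this PR:

    #include <cstdint>
    #include <string>

    // Sketch only: mirrors the concatenation in processStoryChunk() above.
    std::string rootDirectory = "/tmp/chrono_csv";        // hypothetical
    uint64_t    story_id      = 42;                       // hypothetical
    uint64_t    start_time_ns = 1714910000000000000ULL;   // hypothetical, nanoseconds

    std::string chunk_filename(rootDirectory);
    chunk_filename += "/" + std::to_string(story_id) + "."
                      + std::to_string(start_time_ns / 1000000000) + ".csv";
    // chunk_filename == "/tmp/chrono_csv/42.1714910000.csv"
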
ChronoGrapher/ChronoGrapher.cpp (8 changes: 5 additions & 3 deletions)
@@ -164,13 +164,12 @@ int main(int argc, char**argv)
     catch(tl::exception const &)
     {
         LOG_ERROR("[ChronoGrapher] failed to create DataStoreAdminService");
+        keeperDataAdminService = nullptr;
     }
 
     if(nullptr == keeperDataAdminService)
     {
         LOG_CRITICAL("[ChronoGrapher] failed to create DataStoreAdminService exiting");
-        if(dataAdminEngine)
-        { delete dataAdminEngine; }
         return (-1);
     }
 
@@ -194,6 +193,7 @@
     catch(tl::exception const &)
     {
         LOG_ERROR("[ChronoGrapher] failed to create RecordingService");
+        grapherRecordingService = nullptr;
     }
 
     if(nullptr == grapherRecordingService)
@@ -225,7 +225,9 @@
 
     /// Registration with ChronoVisor __________________________________________________________________________________
     // try to register with chronoVisor a few times than log ERROR and exit...
-    int registration_status = chronolog::CL_ERR_UNKNOWN;
+    int registration_status = grapherRegistryClient->send_register_msg(
+            chronolog::GrapherRegistrationMsg(processIdCard, collectionServiceId));
+    //if the first attemp failes retry
     int retries = 5;
     while((chronolog::CL_SUCCESS != registration_status) && (retries > 0))
     {
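Both service-creation fixes above follow the same pattern: construct the service inside try/catch, explicitly reset the pointer to nullptr on failure, and let the nullptr guard right after the block log and exit. A self-contained sketch of that pattern with placeholder types (not the actual Thallium or ChronoLog classes):

    #include <cstdio>
    #include <stdexcept>

    struct DemoService
    {
        DemoService() { throw std::runtime_error("endpoint unavailable"); }   // stand-in failure
    };

    int main()
    {
        DemoService *service = nullptr;
        try
        {
            service = new DemoService();
        }
        catch(std::exception const &)
        {
            std::puts("failed to create service");
            service = nullptr;   // keep the guard below authoritative
        }

        if(nullptr == service)
        {
            std::puts("failed to create service, exiting");
            return -1;
        }

        delete service;
        return 0;
    }
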
ChronoGrapher/ChunkIngestionQueue.h (13 changes: 11 additions & 2 deletions)
@@ -54,7 +54,8 @@ class ChunkIngestionQueue
 
     void ingestStoryChunk(StoryChunk* chunk)
     {
-        LOG_DEBUG("[IngestionQueue] Received chunk for StoryID={}: HandleMapSize={}", chunk->getStoryId(), storyIngestionHandles.size());
+        LOG_DEBUG("[IngestionQueue] has {} StoryHandles; Received chunk for StoryID={} startTime {} eventCount{}", storyIngestionHandles.size(),
+                  chunk->getStoryId(), chunk->getStartTime(), chunk->getEventCount());
         auto ingestionHandle_iter = storyIngestionHandles.find(chunk->getStoryId());
         if(ingestionHandle_iter == storyIngestionHandles.end())
         {
@@ -76,6 +77,13 @@
             LOG_DEBUG("[IngestionQueue] Orphan chunk queue is empty. No actions taken.");
             return;
         }
+
+        if (storyIngestionHandles.empty())
+        {
+            LOG_DEBUG("[IngestionQueue] has 0 storyIngestionHandles to place {} orphaned chunks", orphanQueue.size());
+            return;
+        }
+
         std::lock_guard <std::mutex> lock(ingestionQueueMutex);
         for(StoryChunkDeque::iterator iter = orphanQueue.begin(); iter != orphanQueue.end();)
         {
@@ -93,7 +101,8 @@
                 ++iter;
             }
         }
-        LOG_DEBUG("[IngestionQueue] Drained {} orphan chunks into known handles.", orphanQueue.size());
+
+        LOG_WARNING("[IngestionQueue] has {} orphaned chunks", orphanQueue.size());
     }
 
     bool is_empty() const
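The two changes above are the heart of the stuck-orphan-queue fix: chunks that arrive before a story handle is registered sit in orphanQueue, and the drain pass now returns early when there are no handles at all, then reports whatever is still orphaned with a warning instead of a debug line claiming the queue was drained. A minimal sketch of the drain flow after this PR, assuming the member names shown in the hunks above and a hypothetical ingestChunk() hand-off (the real per-chunk hand-off sits in the collapsed part of the diff):

    // Sketch of ChunkIngestionQueue::drainOrphanChunks() after this change.
    void drainOrphanChunks()
    {
        if(orphanQueue.empty())
        {   return; }                     // nothing parked, nothing to do

        if(storyIngestionHandles.empty())
        {   return; }                     // no handles yet: keep chunks parked, do not touch the queue

        std::lock_guard <std::mutex> lock(ingestionQueueMutex);
        for(StoryChunkDeque::iterator iter = orphanQueue.begin(); iter != orphanQueue.end();)
        {
            auto handle_iter = storyIngestionHandles.find((*iter)->getStoryId());
            if(handle_iter != storyIngestionHandles.end())
            {
                (*handle_iter).second->ingestChunk(*iter);   // hypothetical hand-off call
                iter = orphanQueue.erase(iter);              // chunk found its pipeline
            }
            else
            {
                ++iter;                                      // still orphaned; retried on the next drain pass
            }
        }
        // anything left in orphanQueue is reported by the LOG_WARNING shown above
    }
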
ChronoGrapher/GrapherRecordingService.h (14 changes: 7 additions & 7 deletions)
@@ -56,26 +56,26 @@ class GrapherRecordingService: public tl::provider <GrapherRecordingService>
 //            std::cout << (char)*(char*)(&mem_vec[0]+i) << " ";
 //        }
 //        std::cout << std::endl;
-        StoryChunk story_chunk;
+
+        StoryChunk * story_chunk= new StoryChunk();
 #ifndef NDEBUG
         start = std::chrono::high_resolution_clock::now();
 #endif
-        deserializedWithCereal(&mem_vec[0], b.size() - 1, story_chunk);
+        deserializedWithCereal(&mem_vec[0], b.size() - 1, *story_chunk);
 #ifndef NDEBUG
         end = std::chrono::high_resolution_clock::now();
         LOG_INFO("[GrapherRecordingService] Deserialization took {} us, ThreadID={}",
                  std::chrono::duration_cast <std::chrono::nanoseconds>(end - start).count() / 1000.0
                  , tl::thread::self_id());
 #endif
-        LOG_DEBUG("[GrapherRecordingService] StoryChunk received: StoryID: {}, StartTime: {}, ThreadID={}"
-                  , story_chunk.getStoryId(), story_chunk.getStartTime(), tl::thread::self_id());
+        LOG_DEBUG("[GrapherRecordingService] StoryChunk received: StoryId {} StartTime {} eventCount {} ThreadID={}"
+                  , story_chunk->getStoryId(), story_chunk->getStartTime(), story_chunk->getEventCount(), tl::thread::self_id());
 
         request.respond(b.size());
         LOG_DEBUG("[GrapherRecordingService] StoryChunk recording RPC responded {}, ThreadID={}"
                   , b.size(), tl::thread::self_id());
+
-        theIngestionQueue.ingestStoryChunk(&story_chunk);
-        LOG_DEBUG("[GrapherRecordingService] Ingested a StoryChunk, StoryID: {}, StartTime: {}, ThreadID={}"
-                  , story_chunk.getStoryId(), story_chunk.getStartTime(), tl::thread::self_id());
+        theIngestionQueue.ingestStoryChunk(story_chunk);
     }
 
 private:
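The switch from a stack-allocated StoryChunk to new StoryChunk() is what lets the chunk survive past the RPC handler: the handler now hands a heap pointer to theIngestionQueue, where it may sit in the orphan queue well after the handler returns, whereas the old code enqueued the address of a local that was about to be destroyed. A self-contained sketch of that ownership hand-off with stand-in types (not the real ChronoLog classes):

    #include <cstdint>
    #include <deque>

    struct StoryChunk { uint64_t story_id = 0; /* events, start time, ... */ };   // stand-in type

    // Producer (the RPC handler): allocate on the heap and give the pointer away.
    void record_story_chunk(std::deque<StoryChunk*> &ingestion_queue)
    {
        StoryChunk *chunk = new StoryChunk();   // must outlive this function
        // ... deserialize the RPC payload into *chunk ...
        ingestion_queue.push_back(chunk);       // the queue side owns it from here on
        // no delete here: the chunk may be parked as an orphan for a while,
        // and whoever finally drains and persists it is responsible for freeing it
    }
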
ChronoGrapher/KeeperDataStore.cpp (20 changes: 10 additions & 10 deletions)
@@ -20,14 +20,14 @@ int chronolog::KeeperDataStore::startStoryRecording(std::string const &chronicle
                                                     , chronolog::StoryId const &story_id, uint64_t start_time
                                                     , uint32_t time_chunk_duration, uint32_t access_window)
 {
-    LOG_INFO("[KeeperDataStore] Start recording story: Chronicle={}, Story={}, StoryID={}", chronicle, story, story_id);
+    LOG_INFO("[KeeperDataStore] Start recording story: Chronicle={}, Story={}, StoryId={}", chronicle, story, story_id);
 
     // Get dataStoreMutex, check for story_id_presense & add new StoryPipeline if needed
     std::lock_guard storeLock(dataStoreMutex);
     auto pipeline_iter = theMapOfStoryPipelines.find(story_id);
     if(pipeline_iter != theMapOfStoryPipelines.end())
     {
-        LOG_INFO("[KeeperDataStore] Story already being recorded. StoryID: {}", story_id);
+        LOG_INFO("[KeeperDataStore] Story already being recorded. StoryId: {}", story_id);
         //check it the pipeline was put on the waitingForExit list by the previous acquisition
         // and remove it from there
         auto waiting_iter = pipelinesWaitingForExit.find(story_id);
@@ -42,11 +42,11 @@ int chronolog::KeeperDataStore::startStoryRecording(std::string const &chronicle
     auto result = theMapOfStoryPipelines.emplace(
             std::pair <chl::StoryId, chl::StoryPipeline*>(story_id, new chl::StoryPipeline(theExtractionQueue, chronicle
                                                           , story, story_id, start_time
-                                                          , time_chunk_duration)));
+                                                          , time_chunk_duration, access_window)));
 
     if(result.second)
     {
-        LOG_INFO("[KeeperDataStore] New StoryPipeline created successfully. StoryID: {}", story_id);
+        LOG_INFO("[KeeperDataStore] New StoryPipeline created successfully. StoryId {}", story_id);
         pipeline_iter = result.first;
         //engage StoryPipeline with the IngestionQueue
         chl::StoryChunkIngestionHandle*ingestionHandle = (*pipeline_iter).second->getActiveIngestionHandle();
@@ -55,7 +55,7 @@ int chronolog::KeeperDataStore::startStoryRecording(std::string const &chronicle
     }
     else
     {
-        LOG_ERROR("[KeeperDataStore] Failed to create StoryPipeline for StoryID: {}. Possible memory or resource issue."
+        LOG_ERROR("[KeeperDataStore] Failed to create StoryPipeline for StoryId: {}. Possible memory or resource issue."
                   , story_id);
         return CL_ERR_UNKNOWN;
     }
@@ -64,7 +64,7 @@
 
 int chronolog::KeeperDataStore::stopStoryRecording(chronolog::StoryId const &story_id)
 {
-    LOG_DEBUG("[KeeperDataStore] Initiating stop recording for StoryID={}", story_id);
+    LOG_DEBUG("[KeeperDataStore] Initiating stop recording for StoryId={}", story_id);
     // we do not yet disengage the StoryPipeline from the IngestionQueue right away
     // but put it on the WaitingForExit list to be finalized, persisted to disk , and
     // removed from memory at exit_time = now+acceptance_window...
@@ -74,15 +74,15 @@ int chronolog::KeeperDataStore::stopStoryRecording(chronolog::StoryId const &sto
     if(pipeline_iter != theMapOfStoryPipelines.end())
     {
         uint64_t exit_time = std::chrono::high_resolution_clock::now().time_since_epoch().count() +
-                             (*pipeline_iter).second->getAcceptanceWindow();
+                             (*pipeline_iter).second->getAcceptanceWindow()*5;
         pipelinesWaitingForExit[(*pipeline_iter).first] = (std::pair <chl::StoryPipeline*, uint64_t>(
                 (*pipeline_iter).second, exit_time));
-        LOG_INFO("[KeeperDataStore] Added StoryPipeline to waiting list for finalization. StoryID={}, ExitTime={}", story_id
+        LOG_INFO("[KeeperDataStore] Added StoryPipeline to waiting list for finalization. StoryId={}, ExitTime={}", story_id
                  , exit_time);
     }
     else
     {
-        LOG_WARNING("[KeeperDataStore] Attempted to stop recording for non-existent StoryID={}", story_id);
+        LOG_WARNING("[KeeperDataStore] Attempted to stop recording for non-existent StoryId={}", story_id);
     }
     return chronolog::CL_SUCCESS;
 }
@@ -148,7 +148,7 @@ void chronolog::KeeperDataStore::retireDecayedPipelines()
 
         }
     }
-    //swipe through pipelineswaiting and remove all those with nullptr
+
    LOG_DEBUG("[KeeperDataStore] Completed retirement of decayed pipelines. Current state={}, Active StoryPipelines={}, PipelinesWaitingForExit={}, ThreadID={}"
              , state, theMapOfStoryPipelines.size(), pipelinesWaitingForExit.size(), tl::thread::self_id());
 }
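Two related changes sit in KeeperDataStore: the access_window argument is now actually forwarded into each new StoryPipeline, and a stopped story's pipeline is kept on the waiting-for-exit list five times longer before it is finalized, which gives late or re-queued chunks (including ones recovered from the orphan queue) more time to reach their pipeline. A rough illustration of the new grace-period arithmetic, assuming getAcceptanceWindow() is expressed in the same units as the high_resolution_clock tick count it is added to, as the code above implies:

    #include <chrono>
    #include <cstdint>

    // Sketch of the extended exit time computed in stopStoryRecording() above.
    uint64_t compute_exit_time(uint64_t acceptance_window)
    {
        uint64_t now = std::chrono::high_resolution_clock::now().time_since_epoch().count();
        return now + acceptance_window * 5;   // previously: now + acceptance_window
    }
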
ChronoGrapher/KeeperDataStore.h (2 changes: 1 addition & 1 deletion)
@@ -42,7 +42,7 @@ class KeeperDataStore
     { return (SHUTTING_DOWN == state); }
 
     int startStoryRecording(ChronicleName const &, StoryName const &, StoryId const &, uint64_t start_time
-                            , uint32_t time_chunk_ranularity = 30, uint32_t access_window = 60);
+                            , uint32_t time_chunk_ranularity = 30, uint32_t access_window = 300);
 
     int stopStoryRecording(StoryId const &);
 