Skip to content

Commit

Permalink
GPU SA Multi-Threading: Distinguish between EoS from DPL and from asy…
Browse files Browse the repository at this point in the history
…nc out-of-band channel
  • Loading branch information
davidrohr committed Oct 23, 2023
1 parent 9892a91 commit ced8b44
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
3 changes: 2 additions & 1 deletion GPU/Workflow/src/GPUWorkflowInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ struct GPURecoWorkflowSpec_PipelineInternals {
fair::mq::Device* fmqDevice = nullptr;

fair::mq::State fmqState = fair::mq::State::Undefined;
volatile bool endOfStreamReceived = false;
volatile bool endOfStreamAsyncReceived = false;
volatile bool endOfStreamDplReceived = false;
volatile bool runStarted = false;
volatile bool shouldTerminate = false;
std::mutex stateMutex;
Expand Down
11 changes: 6 additions & 5 deletions GPU/Workflow/src/GPUWorkflowPipeline.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ int GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingInOutP
void GPURecoWorkflowSpec::handlePipelineEndOfStream(EndOfStreamContext& ec)
{
if (mSpecConfig.enableDoublePipeline == 1) {
mPipeline->endOfStreamReceived = true;
mPipeline->endOfStreamDplReceived = true;
mPipeline->stateNotify.notify_all();
}
if (mSpecConfig.enableDoublePipeline == 2) {
Expand All @@ -262,7 +262,8 @@ void GPURecoWorkflowSpec::receiveFMQStateCallback(fair::mq::State newState)
{
std::lock_guard lk(mPipeline->stateMutex);
if (mPipeline->fmqState != fair::mq::State::Running && newState == fair::mq::State::Running) {
mPipeline->endOfStreamReceived = false;
mPipeline->endOfStreamAsyncReceived = false;
mPipeline->endOfStreamDplReceived = false;
}
mPipeline->fmqState = newState;
if (newState == fair::mq::State::Exiting) {
Expand All @@ -283,7 +284,7 @@ void GPURecoWorkflowSpec::RunReceiveThread()
do {
{
std::unique_lock lk(mPipeline->stateMutex);
mPipeline->stateNotify.wait(lk, [this]() { return (mPipeline->fmqState == fair::mq::State::Running && !mPipeline->endOfStreamReceived) || mPipeline->shouldTerminate; }); // Do not check mPipeline->fmqDevice->NewStatePending() since we wait for EndOfStream!
mPipeline->stateNotify.wait(lk, [this]() { return (mPipeline->fmqState == fair::mq::State::Running && !mPipeline->endOfStreamAsyncReceived) || mPipeline->shouldTerminate; }); // Do not check mPipeline->fmqDevice->NewStatePending() since we wait for EndOfStream!
}
if (mPipeline->shouldTerminate) {
break;
Expand All @@ -310,7 +311,7 @@ void GPURecoWorkflowSpec::RunReceiveThread()
if (m->flagEndOfStream) {
LOG(info) << "Received end-of-stream from out-of-band channel";
std::lock_guard lk(mPipeline->stateMutex);
mPipeline->endOfStreamReceived = true;
mPipeline->endOfStreamAsyncReceived = true;
mPipeline->mNTFReceived = 0;
mPipeline->runStarted = false;
continue;
Expand All @@ -324,7 +325,7 @@ void GPURecoWorkflowSpec::RunReceiveThread()

{
std::unique_lock lk(mPipeline->stateMutex);
mPipeline->stateNotify.wait(lk, [this]() { return (mPipeline->runStarted && !mPipeline->endOfStreamReceived) || mPipeline->shouldTerminate; });
mPipeline->stateNotify.wait(lk, [this]() { return (mPipeline->runStarted && !mPipeline->endOfStreamAsyncReceived) || mPipeline->shouldTerminate; });
if (!mPipeline->runStarted) {
continue;
}
Expand Down

0 comments on commit ced8b44

Please sign in to comment.