Skip to content

Commit

Permalink
GPU SA Multi-Threading: Make sure we do not try to receive while not …
Browse files Browse the repository at this point in the history
…in Running or Stopping
  • Loading branch information
davidrohr committed Oct 23, 2023
1 parent ced8b44 commit 0f125fe
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
6 changes: 3 additions & 3 deletions GPU/Workflow/src/GPUWorkflowInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ struct GPURecoWorkflowSpec_PipelineInternals {

fair::mq::Device* fmqDevice = nullptr;

fair::mq::State fmqState = fair::mq::State::Undefined;
volatile fair::mq::State fmqState = fair::mq::State::Undefined, fmqPreviousState = fair::mq::State::Undefined;
volatile bool endOfStreamAsyncReceived = false;
volatile bool endOfStreamDplReceived = false;
volatile bool runStarted = false;
Expand Down Expand Up @@ -91,8 +91,8 @@ struct GPURecoWorkflowSpec_PipelineInternals {

unsigned long mNTFReceived = 0;

bool mayInject = true;
unsigned long mayInjectTFId = 0;
volatile bool mayInject = true;
volatile unsigned long mayInjectTFId = 0;
std::mutex mayInjectMutex;
std::condition_variable mayInjectCondition;
};
Expand Down
10 changes: 8 additions & 2 deletions GPU/Workflow/src/GPUWorkflowPipeline.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ void GPURecoWorkflowSpec::receiveFMQStateCallback(fair::mq::State newState)
mPipeline->endOfStreamAsyncReceived = false;
mPipeline->endOfStreamDplReceived = false;
}
mPipeline->fmqPreviousState = mPipeline->fmqState;
mPipeline->fmqState = newState;
if (newState == fair::mq::State::Exiting) {
mPipeline->fmqDevice->UnsubscribeFromStateChange(GPURecoWorkflowSpec_FMQCallbackKey);
Expand All @@ -281,17 +282,22 @@ void GPURecoWorkflowSpec::RunReceiveThread()
int recvTimeot = 1000;
fair::mq::MessagePtr msg;
LOG(debug) << "Waiting for out of band message";
auto shouldReceive = [this]() { return ((mPipeline->fmqState == fair::mq::State::Running || (mPipeline->fmqState == fair::mq::State::Ready && mPipeline->fmqPreviousState == fair::mq::State::Running)) && !mPipeline->endOfStreamAsyncReceived); };
do {
{
std::unique_lock lk(mPipeline->stateMutex);
mPipeline->stateNotify.wait(lk, [this]() { return (mPipeline->fmqState == fair::mq::State::Running && !mPipeline->endOfStreamAsyncReceived) || mPipeline->shouldTerminate; }); // Do not check mPipeline->fmqDevice->NewStatePending() since we wait for EndOfStream!
mPipeline->stateNotify.wait(lk, [this, shouldReceive]() { return shouldReceive() || mPipeline->shouldTerminate; }); // Do not check mPipeline->fmqDevice->NewStatePending() since we wait for EndOfStream!
}
if (mPipeline->shouldTerminate) {
break;
}
try {
msg = device->NewMessageFor("gpu-prepare-channel", 0, 0);
do {
std::unique_lock lk(mPipeline->stateMutex);
if (!shouldReceive()) {
break;
}
msg = device->NewMessageFor("gpu-prepare-channel", 0, 0);
received = device->Receive(msg, "gpu-prepare-channel", 0, recvTimeot) > 0;
} while (!received && !mPipeline->shouldTerminate);
} catch (...) {
Expand Down

0 comments on commit 0f125fe

Please sign in to comment.