Skip to content

Commit

Permalink
Use WaitingTaskHolder to signal doneWaiting() instead of WaitingTaskW…
Browse files Browse the repository at this point in the history
…ithArenalHolder in framework

In the framework the doneWaiting() is always called from within the
main arena in the TBB thread pool, and therefore using
WaitingTaskHolder is safe (WaitingTaskWithArenaHolder is needed only
when doneWaiting() is called outside of the TBB arena).

Avoiding WaitingTaskWithArenaHolder allows to avoid enqueue()
operation when the doneWaiting() calls in the framework are the ones
that decrease the task reference count to 0.
  • Loading branch information
makortel committed Dec 27, 2024
1 parent ab76956 commit 16a69d8
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 40 deletions.
2 changes: 1 addition & 1 deletion FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace edm {

// Takes ownership of the underlying task and uses the current
// arena.
explicit WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask);
explicit WaitingTaskWithArenaHolder(WaitingTaskHolder iTask);

~WaitingTaskWithArenaHolder();

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Concurrency/src/WaitingTaskWithArenaHolder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace edm {
m_task->increment_ref_count();
}

WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask)
WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder iTask)
: m_task(iTask.release_no_decrement()),
m_group(iTask.group()),
m_arena(std::make_shared<oneapi::tbb::task_arena>(oneapi::tbb::task_arena::attach())) {}
Expand Down
48 changes: 23 additions & 25 deletions FWCore/Framework/interface/CallbackExternalWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,9 @@ namespace edm {
WaitingTaskHolder produceTask =
Base::makeProduceTask(group, token, record, es, emitPostPrefetchingSignal, std::move(produceFunctor));

WaitingTaskWithArenaHolder waitingTaskWithArenaHolder =
makeExceptionHandlerTask(std::move(produceTask), group);
WaitingTaskHolder waitingTaskHolder = makeExceptionHandlerTask(std::move(produceTask), group);

return makeAcquireTask(std::move(waitingTaskWithArenaHolder), group, token, record, es);
return makeAcquireTask(std::move(waitingTaskHolder), group, token, record, es);
},
std::move(iTask),
iRecord,
Expand All @@ -134,15 +133,15 @@ namespace edm {
const TDecorator& iDec = TDecorator())
: Base(iProd, std::move(iProduceFunc), iID, iDec), acquireFunction_(std::move(iAcquireFunc)) {}

WaitingTaskHolder makeAcquireTask(WaitingTaskWithArenaHolder waitingTaskWithArenaHolder,
WaitingTaskHolder makeAcquireTask(WaitingTaskHolder waitingTaskHolder,
oneapi::tbb::task_group* group,
ServiceWeakToken const& serviceToken,
EventSetupRecordImpl const* record,
EventSetupImpl const* eventSetupImpl) {
return WaitingTaskHolder(
*group,
make_waiting_task(
[this, holder = std::move(waitingTaskWithArenaHolder), group, serviceToken, record, eventSetupImpl](
[this, holder = std::move(waitingTaskHolder), group, serviceToken, record, eventSetupImpl](
std::exception_ptr const* iException) mutable {
std::exception_ptr excptr;
if (iException) {
Expand Down Expand Up @@ -191,7 +190,7 @@ namespace edm {
ESModuleCallingContext const& context_;
};
EndGuard guard(record, context);
acquireCache_ = (*acquireFunction_)(rec, holder);
acquireCache_ = (*acquireFunction_)(rec, WaitingTaskWithArenaHolder(holder));
});
} catch (cms::Exception& iException) {
iException.addContext("Running acquire");
Expand All @@ -202,25 +201,24 @@ namespace edm {
}));
}

WaitingTaskWithArenaHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask,
oneapi::tbb::task_group* group) {
return WaitingTaskWithArenaHolder(*group,
make_waiting_task([this, produceTask = std::move(produceTask)](
std::exception_ptr const* iException) mutable {
std::exception_ptr excptr;
if (iException) {
excptr = *iException;
}
if (excptr) {
try {
convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
} catch (cms::Exception& exception) {
exception.addContext("Running acquire and external work");
edm::exceptionContext(exception, Base::callingContext());
produceTask.doneWaiting(std::current_exception());
}
}
}));
WaitingTaskHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask, oneapi::tbb::task_group* group) {
return WaitingTaskHolder(*group,
make_waiting_task([this, produceTask = std::move(produceTask)](
std::exception_ptr const* iException) mutable {
std::exception_ptr excptr;
if (iException) {
excptr = *iException;
}
if (excptr) {
try {
convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
} catch (cms::Exception& exception) {
exception.addContext("Running acquire and external work");
edm::exceptionContext(exception, Base::callingContext());
produceTask.doneWaiting(std::current_exception());
}
}
}));
}

std::shared_ptr<TAcquireFunc> acquireFunction_;
Expand Down
14 changes: 7 additions & 7 deletions FWCore/Framework/interface/maker/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,12 @@ namespace edm {
ParentContext const&,
typename T::Context const*) noexcept;

void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskHolder&);

void runAcquireAfterAsyncPrefetch(std::exception_ptr,
EventTransitionInfo const&,
ParentContext const&,
WaitingTaskWithArenaHolder) noexcept;
WaitingTaskHolder) noexcept;

std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
ParentContext const& parentContext) noexcept;
Expand Down Expand Up @@ -519,7 +519,7 @@ namespace edm {
typename T::TransitionInfoType const&,
ServiceToken const&,
ParentContext const&,
WaitingTaskWithArenaHolder) noexcept {}
WaitingTaskHolder) noexcept {}
void execute() final {}
};

Expand All @@ -530,7 +530,7 @@ namespace edm {
EventTransitionInfo const& eventTransitionInfo,
ServiceToken const& token,
ParentContext const& parentContext,
WaitingTaskWithArenaHolder holder) noexcept
WaitingTaskHolder holder) noexcept
: m_worker(worker),
m_eventTransitionInfo(eventTransitionInfo),
m_parentContext(parentContext),
Expand Down Expand Up @@ -581,7 +581,7 @@ namespace edm {
Worker* m_worker;
EventTransitionInfo m_eventTransitionInfo;
ParentContext const m_parentContext;
WaitingTaskWithArenaHolder m_holder;
WaitingTaskHolder m_holder;
ServiceWeakToken m_serviceToken;
};

Expand Down Expand Up @@ -1127,7 +1127,7 @@ namespace edm {
auto* group = task.group();
moduleTask = make_waiting_task(
[this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
WaitingTaskWithArenaHolder runTaskHolder(
WaitingTaskHolder runTaskHolder(
*group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
t.execute();
Expand All @@ -1154,7 +1154,7 @@ namespace edm {
auto group = task.group();
if constexpr (T::isEvent_) {
if (hasAcquire()) {
WaitingTaskWithArenaHolder runTaskHolder(
WaitingTaskHolder runTaskHolder(
*group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
}
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Framework/src/TransformerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ namespace edm {
handle);
}
});
WaitingTaskWithArenaHolder wta(*iHolder.group(), nextTask);
WaitingTaskHolder wta(*iHolder.group(), nextTask);
CMS_SA_ALLOW try {
*cache = transformInfo_.get<kPreTransform>(iIndex)(*(handle->wrapper()), wta);
*cache = transformInfo_.get<kPreTransform>(iIndex)(*(handle->wrapper()), WaitingTaskWithArenaHolder(wta));
} catch (...) {
wta.doneWaiting(std::current_exception());
}
Expand Down
9 changes: 5 additions & 4 deletions FWCore/Framework/src/Worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,11 @@ namespace edm {

void Worker::runAcquire(EventTransitionInfo const& info,
ParentContext const& parentContext,
WaitingTaskWithArenaHolder& holder) {
WaitingTaskHolder& holder) {
ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
try {
convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
WaitingTaskWithArenaHolder holderWithArena{holder};
convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holderWithArena); });
} catch (cms::Exception& ex) {
edm::exceptionContext(ex, moduleCallingContext_);
if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
Expand All @@ -411,7 +412,7 @@ namespace edm {
void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
EventTransitionInfo const& eventTransitionInfo,
ParentContext const& parentContext,
WaitingTaskWithArenaHolder holder) noexcept {
WaitingTaskHolder holder) noexcept {
ranAcquireWithoutException_ = false;
std::exception_ptr exceptionPtr;
if (iEPtr) {
Expand All @@ -420,7 +421,7 @@ namespace edm {
}
moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
} else {
// Caught exception is propagated via WaitingTaskWithArenaHolder
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
runAcquire(eventTransitionInfo, parentContext, holder);
ranAcquireWithoutException_ = true;
Expand Down

0 comments on commit 16a69d8

Please sign in to comment.