Skip to content

Commit

Permalink
DPL: wait as long as possible for Sporadic inputs
Browse files Browse the repository at this point in the history
Right now if we have the standard consumeWhenAll policy and we have a sporadic
input, it will wait indefinitely until all the inputs arrive or it will drop
timeframes without that sporadic input.

This changes the behavior and waits only until the oldest possible timeframe
does not allow the Sporadic input to be there. At which point, it schedules the
processing in any case, under the assumption that a task declaring a sporadic
input knows what to do in case it's not there.
  • Loading branch information
ktf committed Dec 12, 2023
1 parent 022b9cc commit bdecace
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion Framework/Core/src/CompletionPolicyHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "Framework/DeviceSpec.h"
#include "Framework/CompilerBuiltins.h"
#include "Framework/Logger.h"
#include "Framework/TimesliceIndex.h"
#include "Framework/TimingInfo.h"
#include "DecongestionService.h"

Expand Down Expand Up @@ -107,10 +108,32 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
{
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
assert(inputs.size() == spec.size());

size_t si = 0;
bool missingSporadic = false;
size_t currentTimeslice = -1;
for (auto& input : inputs) {
if (input.header == nullptr) {
assert(si < specs.size());
auto& spec = specs[si++];
if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) {
return CompletionPolicy::CompletionOp::Wait;
}
if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) {
missingSporadic |= true;
}
if (input.header != nullptr) {
auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
currentTimeslice = dph->startTime;
break;
}
}
}
// If some sporadic inputs are missing, we wait for them util we are sure they will not come,
// i.e. until the oldest possible timeslice is beyond the timeslice of the input.
auto& timesliceIndex = ref.get<TimesliceIndex>();
if (missingSporadic && currentTimeslice >= timesliceIndex.getOldestPossibleInput().timeslice.value) {
return CompletionPolicy::CompletionOp::Wait;
}
return CompletionPolicy::CompletionOp::Consume;
};
Expand Down

0 comments on commit bdecace

Please sign in to comment.