diff --git a/Framework/Core/include/Framework/DataProcessingContext.h b/Framework/Core/include/Framework/DataProcessingContext.h index d71ad203b1580..9b7cbc238c942 100644 --- a/Framework/Core/include/Framework/DataProcessingContext.h +++ b/Framework/Core/include/Framework/DataProcessingContext.h @@ -26,9 +26,7 @@ struct DataProcessorSpec; struct DataProcessorContext { DataProcessorContext(DataProcessorContext const&) = delete; DataProcessorContext() = default; - // These are specific of a given context and therefore - // not shared by threads. - bool* wasActive = nullptr; + bool allDone = false; /// Latest run number we processed globally for this DataProcessor. int64_t lastRunNumberProcessed = -1; diff --git a/Framework/Core/include/Framework/DataProcessingDevice.h b/Framework/Core/include/Framework/DataProcessingDevice.h index d9565ebef84a2..67edaa99e532b 100644 --- a/Framework/Core/include/Framework/DataProcessingDevice.h +++ b/Framework/Core/include/Framework/DataProcessingDevice.h @@ -113,7 +113,6 @@ class DataProcessingDevice : public fair::mq::Device std::vector mPendingRegionInfos; /// A list of the region infos not yet notified. std::mutex mRegionInfoMutex; ProcessingPolicies mProcessingPolicies; /// User policies related to data processing - bool mWasActive = false; /// Whether or not the device was active at last iteration. std::vector mHandles; /// Handles to use to schedule work. std::vector mStreams; /// Information about the task running in the associated mHandle. 
/// Handle to wake up the main loop from other threads diff --git a/Framework/Core/include/Framework/DeviceState.h b/Framework/Core/include/Framework/DeviceState.h index 89961b3e92dc7..f6d863064ee66 100644 --- a/Framework/Core/include/Framework/DeviceState.h +++ b/Framework/Core/include/Framework/DeviceState.h @@ -30,6 +30,8 @@ typedef struct uv_async_s uv_async_t; namespace o2::framework { +struct DataProcessorContext; + /// Running state information of a given device struct DeviceState { /// Motivation for the loop being triggered. @@ -108,6 +110,11 @@ struct DeviceState { /// the bits we are interested in. std::vector severityStack; TransitionHandlingState transitionHandling = TransitionHandlingState::NoTransition; + + // The DataProcessorContext which was most recently active. + // We use this to determine if we should trigger the loop without + // waiting for some events. + std::atomic lastActiveDataProcessor = nullptr; }; } // namespace o2::framework diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index f3fe328e78a06..169613b18a2ee 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1061,7 +1061,11 @@ void DataProcessingDevice::InitTask() // Whenever we InitTask, we consider as if the previous iteration // was successful, so that even if there is no timer or receiving // channel, we can still start an enumeration. - mWasActive = true; + DataProcessorContext* initialContext = nullptr; + bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1); + if (!idle) { + LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active"; + } // We should be ready to run here. Therefore we copy all the // required parts in the DataProcessorContext. 
Eventually we should @@ -1093,8 +1097,6 @@ void DataProcessingDevice::InitTask() void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceContext& deviceContext) { - context.wasActive = &mWasActive; - context.isSink = false; // If nothing is a sink, the rate limiting simply does not trigger. bool enableRateLimiting = std::stoi(fConfig->GetValue("timeframes-rate-limit")); @@ -1308,14 +1310,19 @@ void DataProcessingDevice::Run() { ServiceRegistryRef ref{mServiceRegistry}; ref.get().flushPending(mServiceRegistry); - auto shouldNotWait = (mWasActive && + DataProcessorContext* lastActive = state.lastActiveDataProcessor.load(); + // Reset to zero unless some other DataProcessorContext completed in the meantime. + // In that case we will take care of it at the next iteration. + state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr); + + auto shouldNotWait = (lastActive != nullptr && (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) || (state.streaming == StreamingState::EndOfStreaming); if (firstLoop) { shouldNotWait = true; firstLoop = false; } - if (mWasActive) { + if (lastActive != nullptr) { state.loopReason |= DeviceState::LoopReason::PREVIOUSLY_ACTIVE; } if (NewStatePending()) { @@ -1485,10 +1492,7 @@ void DataProcessingDevice::Run() } else { auto ref = ServiceRegistryRef{mServiceRegistry}; ref.get().handleExpired(reportExpiredOffer); - mWasActive = false; } - } else { - mWasActive = false; } } @@ -1510,7 +1514,6 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare."); - *context.wasActive = false; { ref.get().call(); } @@ -1669,7 +1672,10 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) socket.Events(&info.hasPendingEvents); if (info.hasPendingEvents) { info.readPolled = false; - *context.wasActive |= newMessages; + // In case there were 
messages, we consider it as activity + if (newMessages) { + state.lastActiveDataProcessor.store(&context); + } } O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).", channelSpec.name.c_str(), info.id.value); @@ -1693,24 +1699,29 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) auto& spec = ref.get(); if (state.streaming == StreamingState::Idle) { - *context.wasActive = false; return; } context.completed.clear(); context.completed.reserve(16); - *context.wasActive |= DataProcessingDevice::tryDispatchComputation(ref, context.completed); + if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) { + state.lastActiveDataProcessor.store(&context); + } DanglingContext danglingContext{*context.registry}; context.preDanglingCallbacks(danglingContext); - if (*context.wasActive == false) { + if (state.lastActiveDataProcessor.load() == nullptr) { ref.get().call(); } auto activity = ref.get().processDanglingInputs(context.expirationHandlers, *context.registry, true); - *context.wasActive |= activity.expiredSlots > 0; + if (activity.expiredSlots > 0) { + state.lastActiveDataProcessor = &context; + } context.completed.clear(); - *context.wasActive |= DataProcessingDevice::tryDispatchComputation(ref, context.completed); + if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) { + state.lastActiveDataProcessor = &context; + } context.postDanglingCallbacks(danglingContext); @@ -1720,7 +1731,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) // framework itself. if (context.allDone == true && state.streaming == StreamingState::Streaming) { switchState(StreamingState::EndOfStreaming); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; } if (state.streaming == StreamingState::EndOfStreaming) { @@ -1766,7 +1777,10 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) // This is needed because the transport is deleted before the device. 
relayer.clear(); switchState(StreamingState::Idle); - *context.wasActive = shouldProcess; + // In case we should process, note the data processor responsible for it + if (shouldProcess) { + state.lastActiveDataProcessor = &context; + } // On end of stream we shut down all output pollers. O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers."); for (auto& poller : state.activeOutputPollers) { @@ -1834,6 +1848,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info); auto ref = ServiceRegistryRef{*context.registry}; auto& stats = ref.get(); + auto& state = ref.get(); auto& parts = info.parts; stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()}); @@ -1856,14 +1871,14 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state); info.state = sih->state; insertInputInfo(pi, 2, InputType::SourceInfo, info.id); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; continue; } auto dih = o2::header::get(headerData); if (dih) { O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice); insertInputInfo(pi, 2, InputType::DomainInfo, info.id); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; continue; } auto dh = o2::header::get(headerData); @@ -1925,6 +1940,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto handleValidMessages = [&info, ref, &reportError](std::vector const& inputInfos) { auto& relayer = ref.get(); + auto& state = ref.get(); static WaitBackpressurePolicy policy; auto& parts = info.parts; // We relay execution to make sure we have a complete set of parts @@ -2012,7 +2028,7 @@ void 
DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& case InputType::SourceInfo: { LOGP(detail, "Received SourceInfo"); auto& context = ref.get(); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; auto headerIndex = input.position; auto payloadIndex = input.position + 1; assert(payloadIndex < parts.Size()); @@ -2030,7 +2046,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& /// We have back pressure, therefore we do not process DomainInfo anymore. /// until the previous message are processed. auto& context = ref.get(); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; auto headerIndex = input.position; auto payloadIndex = input.position + 1; assert(payloadIndex < parts.Size()); @@ -2058,7 +2074,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto& context = ref.get(); context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id); ref.get().call((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; } auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; }); parts.fParts.erase(it, parts.end());