From 2f7add519aa4495f3ab2f6005d5e6b90acdc3c02 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Sun, 22 Sep 2024 09:38:27 +0200 Subject: [PATCH] DPL: thread safe activity detection This makes sure that the detection of active data processors in a device is threadsafe by recording the last active one. The main loop will then short circuit libuv in case the last active dataprocessor was reported and it will reset the last active pointer to nullptr if it has not changed in the meanwhile (meaning that another data processor was actually able to process something). --- .../include/Framework/DataProcessingContext.h | 4 +- .../include/Framework/DataProcessingDevice.h | 1 - .../Core/include/Framework/DeviceState.h | 7 +++ Framework/Core/src/DataProcessingDevice.cxx | 60 ++++++++++++------- 4 files changed, 46 insertions(+), 26 deletions(-) diff --git a/Framework/Core/include/Framework/DataProcessingContext.h b/Framework/Core/include/Framework/DataProcessingContext.h index d71ad203b1580..9b7cbc238c942 100644 --- a/Framework/Core/include/Framework/DataProcessingContext.h +++ b/Framework/Core/include/Framework/DataProcessingContext.h @@ -26,9 +26,7 @@ struct DataProcessorSpec; struct DataProcessorContext { DataProcessorContext(DataProcessorContext const&) = delete; DataProcessorContext() = default; - // These are specific of a given context and therefore - // not shared by threads. - bool* wasActive = nullptr; + bool allDone = false; /// Latest run number we processed globally for this DataProcessor. int64_t lastRunNumberProcessed = -1; diff --git a/Framework/Core/include/Framework/DataProcessingDevice.h b/Framework/Core/include/Framework/DataProcessingDevice.h index d9565ebef84a2..67edaa99e532b 100644 --- a/Framework/Core/include/Framework/DataProcessingDevice.h +++ b/Framework/Core/include/Framework/DataProcessingDevice.h @@ -113,7 +113,6 @@ class DataProcessingDevice : public fair::mq::Device std::vector mPendingRegionInfos; /// A list of the region infos not yet notified. std::mutex mRegionInfoMutex; ProcessingPolicies mProcessingPolicies; /// User policies related to data processing - bool mWasActive = false; /// Whether or not the device was active at last iteration. std::vector mHandles; /// Handles to use to schedule work. std::vector mStreams; /// Information about the task running in the associated mHandle. /// Handle to wake up the main loop from other threads diff --git a/Framework/Core/include/Framework/DeviceState.h b/Framework/Core/include/Framework/DeviceState.h index 89961b3e92dc7..f6d863064ee66 100644 --- a/Framework/Core/include/Framework/DeviceState.h +++ b/Framework/Core/include/Framework/DeviceState.h @@ -30,6 +30,8 @@ typedef struct uv_async_s uv_async_t; namespace o2::framework { +struct DataProcessorContext; + /// Running state information of a given device struct DeviceState { /// Motivation for the loop being triggered. @@ -108,6 +110,11 @@ struct DeviceState { /// the bits we are interested in. std::vector severityStack; TransitionHandlingState transitionHandling = TransitionHandlingState::NoTransition; + + // The DataProcessorContext which was most recently active. + // We use this to determine if we should trigger the loop without + // waiting for some events. + std::atomic lastActiveDataProcessor = nullptr; }; } // namespace o2::framework diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index f3fe328e78a06..169613b18a2ee 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1061,7 +1061,11 @@ void DataProcessingDevice::InitTask() // Whenever we InitTask, we consider as if the previous iteration // was successful, so that even if there is no timer or receiving // channel, we can still start an enumeration. - mWasActive = true; + DataProcessorContext* initialContext = nullptr; + bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1); + if (!idle) { + LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active"; + } // We should be ready to run here. Therefore we copy all the // required parts in the DataProcessorContext. Eventually we should @@ -1093,8 +1097,6 @@ void DataProcessingDevice::InitTask() void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceContext& deviceContext) { - context.wasActive = &mWasActive; - context.isSink = false; // If nothing is a sink, the rate limiting simply does not trigger. bool enableRateLimiting = std::stoi(fConfig->GetValue("timeframes-rate-limit")); @@ -1308,14 +1310,19 @@ void DataProcessingDevice::Run() { ServiceRegistryRef ref{mServiceRegistry}; ref.get().flushPending(mServiceRegistry); - auto shouldNotWait = (mWasActive && + DataProcessorContext* lastActive = state.lastActiveDataProcessor.load(); + // Reset to zero unless some other DataPorcessorContext completed in the meanwhile. + // In such case we will take care of it at next iteration. + state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr); + + auto shouldNotWait = (lastActive != nullptr && (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) || (state.streaming == StreamingState::EndOfStreaming); if (firstLoop) { shouldNotWait = true; firstLoop = false; } - if (mWasActive) { + if (lastActive != nullptr) { state.loopReason |= DeviceState::LoopReason::PREVIOUSLY_ACTIVE; } if (NewStatePending()) { @@ -1485,10 +1492,7 @@ void DataProcessingDevice::Run() } else { auto ref = ServiceRegistryRef{mServiceRegistry}; ref.get().handleExpired(reportExpiredOffer); - mWasActive = false; } - } else { - mWasActive = false; } } @@ -1510,7 +1514,6 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare."); - *context.wasActive = false; { ref.get().call(); } @@ -1669,7 +1672,10 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) socket.Events(&info.hasPendingEvents); if (info.hasPendingEvents) { info.readPolled = false; - *context.wasActive |= newMessages; + // In case there were messages, we consider it as activity + if (newMessages) { + state.lastActiveDataProcessor.store(&context); + } } O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).", channelSpec.name.c_str(), info.id.value); @@ -1693,24 +1699,29 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) auto& spec = ref.get(); if (state.streaming == StreamingState::Idle) { - *context.wasActive = false; return; } context.completed.clear(); context.completed.reserve(16); - *context.wasActive |= DataProcessingDevice::tryDispatchComputation(ref, context.completed); + if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) { + state.lastActiveDataProcessor.store(&context); + } DanglingContext danglingContext{*context.registry}; context.preDanglingCallbacks(danglingContext); - if (*context.wasActive == false) { + if (state.lastActiveDataProcessor.load() == nullptr) { ref.get().call(); } auto activity = ref.get().processDanglingInputs(context.expirationHandlers, *context.registry, true); - *context.wasActive |= activity.expiredSlots > 0; + if (activity.expiredSlots > 0) { + state.lastActiveDataProcessor = &context; + } context.completed.clear(); - *context.wasActive |= DataProcessingDevice::tryDispatchComputation(ref, context.completed); + if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) { + state.lastActiveDataProcessor = &context; + } context.postDanglingCallbacks(danglingContext); @@ -1720,7 +1731,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) // framework itself. if (context.allDone == true && state.streaming == StreamingState::Streaming) { switchState(StreamingState::EndOfStreaming); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; } if (state.streaming == StreamingState::EndOfStreaming) { @@ -1766,7 +1777,10 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) // This is needed because the transport is deleted before the device. relayer.clear(); switchState(StreamingState::Idle); - *context.wasActive = shouldProcess; + // In case we should process, note the data processor responsible for it + if (shouldProcess) { + state.lastActiveDataProcessor = &context; + } // On end of stream we shut down all output pollers. O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers."); for (auto& poller : state.activeOutputPollers) { @@ -1834,6 +1848,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info); auto ref = ServiceRegistryRef{*context.registry}; auto& stats = ref.get(); + auto& state = ref.get(); auto& parts = info.parts; stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()}); @@ -1856,14 +1871,14 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state); info.state = sih->state; insertInputInfo(pi, 2, InputType::SourceInfo, info.id); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; continue; } auto dih = o2::header::get(headerData); if (dih) { O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice); insertInputInfo(pi, 2, InputType::DomainInfo, info.id); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; continue; } auto dh = o2::header::get(headerData); @@ -1925,6 +1940,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto handleValidMessages = [&info, ref, &reportError](std::vector const& inputInfos) { auto& relayer = ref.get(); + auto& state = ref.get(); static WaitBackpressurePolicy policy; auto& parts = info.parts; // We relay execution to make sure we have a complete set of parts @@ -2012,7 +2028,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& case InputType::SourceInfo: { LOGP(detail, "Received SourceInfo"); auto& context = ref.get(); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; auto headerIndex = input.position; auto payloadIndex = input.position + 1; assert(payloadIndex < parts.Size()); @@ -2030,7 +2046,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& /// We have back pressure, therefore we do not process DomainInfo anymore. /// until the previous message are processed. auto& context = ref.get(); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; auto headerIndex = input.position; auto payloadIndex = input.position + 1; assert(payloadIndex < parts.Size()); @@ -2058,7 +2074,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto& context = ref.get(); context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id); ref.get().call((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; } auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; }); parts.fParts.erase(it, parts.end());