From 7b75b3fbf45799af08fad3a788bed9e916a0d5a3 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Sat, 20 Jul 2024 10:20:03 +0200 Subject: [PATCH] DPL Data Model: introduce bit to keep track of data created after EoS --- DataFormats/Headers/include/Headers/DataHeader.h | 5 +++-- Framework/Core/include/Framework/DataProcessingHeader.h | 3 +++ Framework/Core/include/Framework/TimingInfo.h | 4 +++- Framework/Core/src/DataAllocator.cxx | 1 + Framework/Core/src/DataProcessingDevice.cxx | 9 ++++++++- 5 files changed, 18 insertions(+), 4 deletions(-) diff --git a/DataFormats/Headers/include/Headers/DataHeader.h b/DataFormats/Headers/include/Headers/DataHeader.h index c37eff9b34f20..e4ddaded20aba 100644 --- a/DataFormats/Headers/include/Headers/DataHeader.h +++ b/DataFormats/Headers/include/Headers/DataHeader.h @@ -372,8 +372,9 @@ struct BaseHeader { union { uint32_t flags; struct { - uint32_t flagsNextHeader : 1, // do we have a next header after this one? - flagsUnused : 31; // currently unused + uint32_t flagsNextHeader : 1, // do we have a next header after this one? + flagsReserved : 15, // reserved for future use + flagsDerivedHeader : 16; // reserved for usage by the derived header }; }; diff --git a/Framework/Core/include/Framework/DataProcessingHeader.h b/Framework/Core/include/Framework/DataProcessingHeader.h index 5c068b4e4179a..484dbb9d51a8e 100644 --- a/Framework/Core/include/Framework/DataProcessingHeader.h +++ b/Framework/Core/include/Framework/DataProcessingHeader.h @@ -42,6 +42,9 @@ namespace o2::framework /// @ingroup aliceo2_dataformats_dataheader struct DataProcessingHeader : public header::BaseHeader { static constexpr uint64_t DUMMY_CREATION_TIME_OFFSET = 0x8000000000000000; + // The following flags are used to indicate the behavior of the data processing + static constexpr int32_t KEEP_AT_EOS_FLAG = 1; + /// We return some number of milliseconds, offsetting int by 0x8000000000000000 /// to make sure we can understand when the dummy constructor of DataProcessingHeader was /// used without overriding it with an actual real time from epoch. diff --git a/Framework/Core/include/Framework/TimingInfo.h b/Framework/Core/include/Framework/TimingInfo.h index 84f3971ad3a4a..1734da8dd3941 100644 --- a/Framework/Core/include/Framework/TimingInfo.h +++ b/Framework/Core/include/Framework/TimingInfo.h @@ -36,9 +36,11 @@ struct TimingInfo { /// from a new run, as being processed by the current stream. /// FIXME: for now this is the same as the above. bool streamRunNumberChanged = false; + /// Wether this kind of data should be flushed during end of stream. + bool keepAtEndOfStream = false; static bool timesliceIsTimer(size_t timeslice) { return timeslice > 1652945069870351; } - bool isTimer() const { return timesliceIsTimer(timeslice); }; + [[nodiscard]] bool isTimer() const { return timesliceIsTimer(timeslice); }; }; } // namespace o2::framework diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index 0bf0f86f812ec..bae40f2b47947 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -121,6 +121,7 @@ fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(Output const& spec, dh.runNumber = timingInfo.runNumber; DataProcessingHeader dph{timingInfo.timeslice, 1, timingInfo.creation}; + static_cast(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0; auto& proxy = mRegistry.get(); auto* transport = proxy.getOutputTransport(routeIndex); diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 93940ef5ec21a..452681a06dfd1 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1298,6 +1298,7 @@ void DataProcessingDevice::Run() state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING; } if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) { + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "A new state is pending so we requested a new transition."); state.transitionHandling = TransitionHandlingState::Requested; auto& deviceContext = ref.get(); auto timeout = deviceContext.exitTransitionTimeout; @@ -1697,7 +1698,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) } if (state.streaming == StreamingState::EndOfStreaming) { - O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues."); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Flushing queues."); // We keep processing data until we are Idle. // FIXME: not sure this is the correct way to drain the queues, but // I guess we will see. @@ -1710,6 +1711,12 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) { relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false); } + + auto& timingInfo = ref.get(); + // We should keep the data generated at end of stream only for those + // which are not sources. + timingInfo.keepAtEndOfStream = shouldProcess; + EndOfStreamContext eosContext{*context.registry, ref.get()}; context.preEOSCallbacks(eosContext);