Skip to content

Commit

Permalink
DPL Data Model: introduce bit to keep track of data created after EoS
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Jul 20, 2024
1 parent 7458f67 commit 7b75b3f
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 4 deletions.
5 changes: 3 additions & 2 deletions DataFormats/Headers/include/Headers/DataHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,9 @@ struct BaseHeader {
union {
uint32_t flags;
struct {
uint32_t flagsNextHeader : 1, // do we have a next header after this one?
flagsUnused : 31; // currently unused
uint32_t flagsNextHeader : 1, // do we have a next header after this one?
flagsReserved : 15, // reserved for future use
flagsDerivedHeader : 16; // reserved for usage by the derived header
};
};

Expand Down
3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/DataProcessingHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ namespace o2::framework
/// @ingroup aliceo2_dataformats_dataheader
struct DataProcessingHeader : public header::BaseHeader {
static constexpr uint64_t DUMMY_CREATION_TIME_OFFSET = 0x8000000000000000;
// The following flags are used to indicate the behavior of the data processing
static constexpr int32_t KEEP_AT_EOS_FLAG = 1;

/// We return some number of milliseconds, offsetting int by 0x8000000000000000
/// to make sure we can understand when the dummy constructor of DataProcessingHeader was
/// used without overriding it with an actual real time from epoch.
Expand Down
4 changes: 3 additions & 1 deletion Framework/Core/include/Framework/TimingInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ struct TimingInfo {
/// from a new run, as being processed by the current stream.
/// FIXME: for now this is the same as the above.
bool streamRunNumberChanged = false;
/// Wether this kind of data should be flushed during end of stream.
bool keepAtEndOfStream = false;

static bool timesliceIsTimer(size_t timeslice) { return timeslice > 1652945069870351; }
bool isTimer() const { return timesliceIsTimer(timeslice); };
[[nodiscard]] bool isTimer() const { return timesliceIsTimer(timeslice); };
};

} // namespace o2::framework
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/DataAllocator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(Output const& spec,
dh.runNumber = timingInfo.runNumber;

DataProcessingHeader dph{timingInfo.timeslice, 1, timingInfo.creation};
static_cast<o2::header::BaseHeader&>(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0;
auto& proxy = mRegistry.get<FairMQDeviceProxy>();
auto* transport = proxy.getOutputTransport(routeIndex);

Expand Down
9 changes: 8 additions & 1 deletion Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,7 @@ void DataProcessingDevice::Run()
state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING;
}
if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "A new state is pending so we requested a new transition.");
state.transitionHandling = TransitionHandlingState::Requested;
auto& deviceContext = ref.get<DeviceContext>();
auto timeout = deviceContext.exitTransitionTimeout;
Expand Down Expand Up @@ -1697,7 +1698,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
}

if (state.streaming == StreamingState::EndOfStreaming) {
O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Flushing queues.");
// We keep processing data until we are Idle.
// FIXME: not sure this is the correct way to drain the queues, but
// I guess we will see.
Expand All @@ -1710,6 +1711,12 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
}

auto& timingInfo = ref.get<TimingInfo>();
// We should keep the data generated at end of stream only for those
// which are not sources.
timingInfo.keepAtEndOfStream = shouldProcess;

EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};

context.preEOSCallbacks(eosContext);
Expand Down

0 comments on commit 7b75b3f

Please sign in to comment.