diff --git a/Framework/Core/include/Framework/FairMQDeviceProxy.h b/Framework/Core/include/Framework/FairMQDeviceProxy.h index 49ccd529d5d2e..46b35f54f21ba 100644 --- a/Framework/Core/include/Framework/FairMQDeviceProxy.h +++ b/Framework/Core/include/Framework/FairMQDeviceProxy.h @@ -55,7 +55,7 @@ class FairMQDeviceProxy /// Retrieve the channel index from a given OutputSpec and the associated timeslice [[nodiscard]] ChannelIndex getOutputChannelIndex(OutputSpec const& spec, size_t timeslice) const; /// Retrieve the channel index from a given OutputSpec and the associated timeslice - [[nodiscard]] ChannelIndex getForwardChannelIndex(header::DataHeader const& header, size_t timeslice) const; + void getMatchingForwardChannelIndexes(std::vector& result, header::DataHeader const& header, size_t timeslice) const; /// ChannelIndex from a RouteIndex [[nodiscard]] ChannelIndex getOutputChannelIndex(RouteIndex routeIndex) const; [[nodiscard]] ChannelIndex getInputChannelIndex(RouteIndex routeIndex) const; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index da502d016de78..65cac4a83a027 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -553,7 +553,7 @@ static auto toBeForwardedHeader = [](void* header) -> bool { return true; }; -static auto toBeforwardedMessageSet = [](ChannelIndex& cachedForwardingChoice, +static auto toBeforwardedMessageSet = [](std::vector& cachedForwardingChoices, FairMQDeviceProxy& proxy, std::unique_ptr& header, std::unique_ptr& payload, @@ -589,13 +589,9 @@ static auto toBeforwardedMessageSet = [](ChannelIndex& cachedForwardingChoice, // part of a split payload. All the others will use the same. // but always check if we have a sequence of multiple payloads if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) { - cachedForwardingChoice = proxy.getForwardChannelIndex(*fdh, fdph->startTime); + proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime); } - /// We did not find a match. Skip it. - if (cachedForwardingChoice.value == ChannelIndex::INVALID) { - return false; - } - return true; + return cachedForwardingChoices.empty() == false; }; // This is how we do the forwarding, i.e. we push @@ -609,6 +605,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, // we collect all messages per forward in a map and send them together std::vector forwardedParts; forwardedParts.resize(proxy.getNumForwards()); + std::vector cachedForwardingChoices{}; for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { auto& messageSet = currentSetOfInputs[ii]; @@ -619,7 +616,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, if (!toBeForwardedHeader(messageSet.header(0)->GetData())) { continue; } - ChannelIndex cachedForwardingChoice{ChannelIndex::INVALID}; + cachedForwardingChoices.clear(); for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) { auto& messageSet = currentSetOfInputs[ii]; @@ -627,24 +624,41 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, auto& payload = messageSet.payload(pi); auto total = messageSet.getNumberOfPayloads(pi); - if (!toBeforwardedMessageSet(cachedForwardingChoice, proxy, header, payload, total, consume)) { + if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) { continue; } + // In case of more than one forward route, we need to copy the message. + // This will eventually use the same mamory if running with the same backend. + if (cachedForwardingChoices.size() > 1) { + copy = true; + } if (copy) { - auto&& newHeader = header->GetTransport()->CreateMessage(); - newHeader->Copy(*header); - forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader)); + for (auto& cachedForwardingChoice : cachedForwardingChoices) { + auto&& newHeader = header->GetTransport()->CreateMessage(); + auto* dh = o2::header::get(header->GetData()); + if (dh) { + LOGP(info, "Forwarding {} to {}", DataSpecUtils::describe({dh->dataOrigin, dh->dataDescription, dh->subSpecification}), cachedForwardingChoice.value); + } else { + auto dih = o2::header::get(newHeader->GetData()); + if (dih) { + LOGP(info, "DomainInfoHeader being forwarded"); + } + LOGP(error, "Data is missing DataHeader"); + } + newHeader->Copy(*header); + forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader)); - for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { - auto&& newPayload = header->GetTransport()->CreateMessage(); - newPayload->Copy(*messageSet.payload(pi, payloadIndex)); - forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload)); + for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { + auto&& newPayload = header->GetTransport()->CreateMessage(); + newPayload->Copy(*messageSet.payload(pi, payloadIndex)); + forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload)); + } } } else { - forwardedParts[cachedForwardingChoice.value].AddPart(std::move(messageSet.header(pi))); + forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi))); for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { - forwardedParts[cachedForwardingChoice.value].AddPart(std::move(messageSet.payload(pi, payloadIndex))); + forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex))); } } } @@ -1065,38 +1079,47 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont }; } - /// We must make sure there is no optional - /// if we want to optimize the forwarding - context.canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER; - bool onlyConditions = true; - bool overriddenEarlyForward = false; - for (auto& forwarded : spec.forwards) { - if (forwarded.matcher.lifetime != Lifetime::Condition) { - onlyConditions = false; - } - if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) { - context.canForwardEarly = false; - overriddenEarlyForward = true; - LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher); - break; - } - if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) { - context.canForwardEarly = false; - overriddenEarlyForward = true; - LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher); - break; + auto decideEarlyForward = [&context, &spec, this]() -> bool { + // There is nothing produced by this device, so we can forward early + // because this is a proxy. + if (spec.forwards.empty() == false && spec.outputs.empty() == true) { + return true; + } + /// We must make sure there is no optional + /// if we want to optimize the forwarding + bool canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER; + bool onlyConditions = true; + bool overriddenEarlyForward = false; + for (auto& forwarded : spec.forwards) { + if (forwarded.matcher.lifetime != Lifetime::Condition) { + onlyConditions = false; + } + if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) { + context.canForwardEarly = false; + overriddenEarlyForward = true; + LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher); + break; + } + if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) { + context.canForwardEarly = false; + overriddenEarlyForward = true; + LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher); + break; + } + if (forwarded.matcher.lifetime == Lifetime::Optional) { + context.canForwardEarly = false; + overriddenEarlyForward = true; + LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher); + break; + } } - if (forwarded.matcher.lifetime == Lifetime::Optional) { - context.canForwardEarly = false; - overriddenEarlyForward = true; - LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher); - break; + if (!overriddenEarlyForward && onlyConditions) { + context.canForwardEarly = true; + LOG(detail) << "Enabling early forwarding because only conditions to be forwarded"; } - } - if (!overriddenEarlyForward && onlyConditions) { - context.canForwardEarly = true; - LOG(detail) << "Enabling early forwarding because only conditions to be forwarded"; - } + return canForwardEarly; + }; + context.canForwardEarly = decideEarlyForward(); } void DataProcessingDevice::PreRun() @@ -1342,7 +1365,7 @@ void DataProcessingDevice::Run() auto& dpStats = ref.get(); dpStats.updateStats({static_cast(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers}); dpStats.updateStats({static_cast(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes}); - dpStats.processCommandQueue(); + dpStats.processCommandQueue(); }; auto ref = ServiceRegistryRef{mServiceRegistry}; diff --git a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx index 085aec61d4e55..dcf3eec2452de 100644 --- a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx +++ b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx @@ -711,12 +711,6 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name, callbacks.set(channelConfigurationChecker); auto lastDataProcessingHeader = std::make_shared(0, 0); - if (deviceSpec.forwards.size() > 0) { - // check that no internal forwards are existing, i.e. that proxy is at the end of the workflow - // in principle we can be less strict here if we check only for the defined input specs that there - // are no internal forwards - throw std::runtime_error("can not add forward targets outside DPL if internal forwards are existing, the proxy must be at the end of the workflow"); - } auto& spec = const_cast(deviceSpec); for (auto const& inputSpec : inputSpecs) { // this is a prototype, in principle we want to have all spec objects const @@ -798,12 +792,6 @@ DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const* name, // also we set forwards for all input specs and keep a list of all channels so we can send EOS on them auto channelNames = std::make_shared>(); auto channelConfigurationInitializer = [&proxy, inputSpecs = std::move(inputSpecs), device, channelSelector, &deviceSpec, channelNames]() { - if (deviceSpec.forwards.size() > 0) { - // check that no internal forwards are existing, i.e. that proxy is at the end of the workflow - // in principle we can be less strict here if we check only for the defined input specs that there - // are no internal forwards - throw std::runtime_error("can not add forward targets outside DPL if internal forwards are existing, the proxy must be at the end of the workflow"); - } channelNames->clear(); auto& mutableDeviceSpec = const_cast(deviceSpec); for (auto const& spec : inputSpecs) { diff --git a/Framework/Core/src/FairMQDeviceProxy.cxx b/Framework/Core/src/FairMQDeviceProxy.cxx index 47056558c9b22..a83057e9578be 100644 --- a/Framework/Core/src/FairMQDeviceProxy.cxx +++ b/Framework/Core/src/FairMQDeviceProxy.cxx @@ -21,6 +21,8 @@ #include #include +#include + namespace o2::framework { @@ -117,21 +119,37 @@ ChannelIndex FairMQDeviceProxy::getOutputChannelIndex(OutputSpec const& query, s return ChannelIndex{ChannelIndex::INVALID}; } -ChannelIndex FairMQDeviceProxy::getForwardChannelIndex(header::DataHeader const& dh, size_t timeslice) const +void FairMQDeviceProxy::getMatchingForwardChannelIndexes(std::vector& result, header::DataHeader const& dh, size_t timeslice) const { assert(mForwardRoutes.size() == mForwards.size()); // Notice we need to match against a data header and not against // the InputMatcher, because an input might match something which // is then rerouted to two different output routes, depending on the content. + // Also notice that we need to match against all the routes, because we + // might have multiple outputs routes (e.g. in the output proxy) with the same matcher. + bool dplChannelMatched = false; for (size_t ri = 0; ri < mForwards.size(); ++ri) { auto& route = mForwards[ri]; LOGP(debug, "matching: {} to route {}", dh, DataSpecUtils::describe(route.matcher)); if (DataSpecUtils::match(route.matcher, dh.dataOrigin, dh.dataDescription, dh.subSpecification) && ((timeslice % route.maxTimeslices) == route.timeslice)) { - return mForwardRoutes[ri].channel; + auto channelInfoIndex = mForwardRoutes[ri].channel; + auto& info = mForwardChannelInfos[channelInfoIndex.value]; + // We need to make sure that we forward the same payload only once per channel. + if (info.channelType == ChannelAccountingType::DPL) { + if (dplChannelMatched) { + continue; + } + dplChannelMatched = true; + } + result.emplace_back(channelInfoIndex); } } - return ChannelIndex{ChannelIndex::INVALID}; + // Remove duplicates, keeping the order of the channels. + std::unordered_set numSet; + auto iter = std::stable_partition(result.begin(), result.end(), + [&](ChannelIndex n) { bool ret = !numSet.count(n.value); numSet.insert(n.value); return ret; }); // returns true if the item has not been "seen" + result.erase(iter, result.end()); } ChannelIndex FairMQDeviceProxy::getOutputChannelIndexByName(std::string const& name) const