Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Please consider the following formatting changes to #11606 #212

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/FairMQDeviceProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class FairMQDeviceProxy
/// Retrieve the channel index from a given OutputSpec and the associated timeslice
[[nodiscard]] ChannelIndex getOutputChannelIndex(OutputSpec const& spec, size_t timeslice) const;
/// Retrieve the channel index from a given OutputSpec and the associated timeslice
[[nodiscard]] ChannelIndex getForwardChannelIndex(header::DataHeader const& header, size_t timeslice) const;
void getMatchingForwardChannelIndexes(std::vector<ChannelIndex>& result, header::DataHeader const& header, size_t timeslice) const;
/// ChannelIndex from a RouteIndex
[[nodiscard]] ChannelIndex getOutputChannelIndex(RouteIndex routeIndex) const;
[[nodiscard]] ChannelIndex getInputChannelIndex(RouteIndex routeIndex) const;
Expand Down
121 changes: 72 additions & 49 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ static auto toBeForwardedHeader = [](void* header) -> bool {
return true;
};

static auto toBeforwardedMessageSet = [](ChannelIndex& cachedForwardingChoice,
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
FairMQDeviceProxy& proxy,
std::unique_ptr<fair::mq::Message>& header,
std::unique_ptr<fair::mq::Message>& payload,
Expand Down Expand Up @@ -589,13 +589,9 @@ static auto toBeforwardedMessageSet = [](ChannelIndex& cachedForwardingChoice,
// part of a split payload. All the others will use the same.
// but always check if we have a sequence of multiple payloads
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
cachedForwardingChoice = proxy.getForwardChannelIndex(*fdh, fdph->startTime);
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
}
/// We did not find a match. Skip it.
if (cachedForwardingChoice.value == ChannelIndex::INVALID) {
return false;
}
return true;
return cachedForwardingChoices.empty() == false;
};

// This is how we do the forwarding, i.e. we push
Expand All @@ -609,6 +605,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
// we collect all messages per forward in a map and send them together
std::vector<fair::mq::Parts> forwardedParts;
forwardedParts.resize(proxy.getNumForwards());
std::vector<ChannelIndex> cachedForwardingChoices{};

for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto& messageSet = currentSetOfInputs[ii];
Expand All @@ -619,32 +616,49 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
continue;
}
ChannelIndex cachedForwardingChoice{ChannelIndex::INVALID};
cachedForwardingChoices.clear();

for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
auto& messageSet = currentSetOfInputs[ii];
auto& header = messageSet.header(pi);
auto& payload = messageSet.payload(pi);
auto total = messageSet.getNumberOfPayloads(pi);

if (!toBeforwardedMessageSet(cachedForwardingChoice, proxy, header, payload, total, consume)) {
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
continue;
}

// In case of more than one forward route, we need to copy the message.
// This will eventually use the same mamory if running with the same backend.
if (cachedForwardingChoices.size() > 1) {
copy = true;
}
if (copy) {
auto&& newHeader = header->GetTransport()->CreateMessage();
newHeader->Copy(*header);
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
auto&& newHeader = header->GetTransport()->CreateMessage();
auto* dh = o2::header::get<DataHeader*>(header->GetData());
if (dh) {
LOGP(info, "Forwarding {} to {}", DataSpecUtils::describe({dh->dataOrigin, dh->dataDescription, dh->subSpecification}), cachedForwardingChoice.value);
} else {
auto dih = o2::header::get<DomainInfoHeader*>(newHeader->GetData());
if (dih) {
LOGP(info, "DomainInfoHeader being forwarded");
}
LOGP(error, "Data is missing DataHeader");
}
newHeader->Copy(*header);
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));

for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
auto&& newPayload = header->GetTransport()->CreateMessage();
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
auto&& newPayload = header->GetTransport()->CreateMessage();
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
}
}
} else {
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(messageSet.header(pi)));
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
}
}
}
Expand Down Expand Up @@ -1065,38 +1079,47 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
};
}

/// We must make sure there is no optional
/// if we want to optimize the forwarding
context.canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
bool onlyConditions = true;
bool overriddenEarlyForward = false;
for (auto& forwarded : spec.forwards) {
if (forwarded.matcher.lifetime != Lifetime::Condition) {
onlyConditions = false;
}
if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
context.canForwardEarly = false;
overriddenEarlyForward = true;
LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
break;
}
if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
context.canForwardEarly = false;
overriddenEarlyForward = true;
LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
break;
auto decideEarlyForward = [&context, &spec, this]() -> bool {
// There is nothing produced by this device, so we can forward early
// because this is a proxy.
if (spec.forwards.empty() == false && spec.outputs.empty() == true) {
return true;
}
/// We must make sure there is no optional
/// if we want to optimize the forwarding
bool canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
bool onlyConditions = true;
bool overriddenEarlyForward = false;
for (auto& forwarded : spec.forwards) {
if (forwarded.matcher.lifetime != Lifetime::Condition) {
onlyConditions = false;
}
if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
context.canForwardEarly = false;
overriddenEarlyForward = true;
LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
break;
}
if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
context.canForwardEarly = false;
overriddenEarlyForward = true;
LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
break;
}
if (forwarded.matcher.lifetime == Lifetime::Optional) {
context.canForwardEarly = false;
overriddenEarlyForward = true;
LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
break;
}
}
if (forwarded.matcher.lifetime == Lifetime::Optional) {
context.canForwardEarly = false;
overriddenEarlyForward = true;
LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
break;
if (!overriddenEarlyForward && onlyConditions) {
context.canForwardEarly = true;
LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
}
}
if (!overriddenEarlyForward && onlyConditions) {
context.canForwardEarly = true;
LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
}
return canForwardEarly;
};
context.canForwardEarly = decideEarlyForward();
}

void DataProcessingDevice::PreRun()
Expand Down Expand Up @@ -1342,7 +1365,7 @@ void DataProcessingDevice::Run()
auto& dpStats = ref.get<DataProcessingStats>();
dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
dpStats.processCommandQueue();
dpStats.processCommandQueue();
};
auto ref = ServiceRegistryRef{mServiceRegistry};

Expand Down
12 changes: 0 additions & 12 deletions Framework/Core/src/ExternalFairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -711,12 +711,6 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name,
callbacks.set<CallbackService::Id::Start>(channelConfigurationChecker);
auto lastDataProcessingHeader = std::make_shared<DataProcessingHeader>(0, 0);

if (deviceSpec.forwards.size() > 0) {
// check that no internal forwards are existing, i.e. that proxy is at the end of the workflow
// in principle we can be less strict here if we check only for the defined input specs that there
// are no internal forwards
throw std::runtime_error("can not add forward targets outside DPL if internal forwards are existing, the proxy must be at the end of the workflow");
}
auto& spec = const_cast<DeviceSpec&>(deviceSpec);
for (auto const& inputSpec : inputSpecs) {
// this is a prototype, in principle we want to have all spec objects const
Expand Down Expand Up @@ -798,12 +792,6 @@ DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const* name,
// also we set forwards for all input specs and keep a list of all channels so we can send EOS on them
auto channelNames = std::make_shared<std::vector<std::string>>();
auto channelConfigurationInitializer = [&proxy, inputSpecs = std::move(inputSpecs), device, channelSelector, &deviceSpec, channelNames]() {
if (deviceSpec.forwards.size() > 0) {
// check that no internal forwards are existing, i.e. that proxy is at the end of the workflow
// in principle we can be less strict here if we check only for the defined input specs that there
// are no internal forwards
throw std::runtime_error("can not add forward targets outside DPL if internal forwards are existing, the proxy must be at the end of the workflow");
}
channelNames->clear();
auto& mutableDeviceSpec = const_cast<DeviceSpec&>(deviceSpec);
for (auto const& spec : inputSpecs) {
Expand Down
24 changes: 21 additions & 3 deletions Framework/Core/src/FairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <fairmq/Message.h>
#include <fairmq/TransportFactory.h>

#include <unordered_set>

namespace o2::framework
{

Expand Down Expand Up @@ -117,21 +119,37 @@ ChannelIndex FairMQDeviceProxy::getOutputChannelIndex(OutputSpec const& query, s
return ChannelIndex{ChannelIndex::INVALID};
}

ChannelIndex FairMQDeviceProxy::getForwardChannelIndex(header::DataHeader const& dh, size_t timeslice) const
void FairMQDeviceProxy::getMatchingForwardChannelIndexes(std::vector<ChannelIndex>& result, header::DataHeader const& dh, size_t timeslice) const
{
assert(mForwardRoutes.size() == mForwards.size());
// Notice we need to match against a data header and not against
// the InputMatcher, because an input might match something which
// is then rerouted to two different output routes, depending on the content.
// Also notice that we need to match against all the routes, because we
// might have multiple outputs routes (e.g. in the output proxy) with the same matcher.
bool dplChannelMatched = false;
for (size_t ri = 0; ri < mForwards.size(); ++ri) {
auto& route = mForwards[ri];

LOGP(debug, "matching: {} to route {}", dh, DataSpecUtils::describe(route.matcher));
if (DataSpecUtils::match(route.matcher, dh.dataOrigin, dh.dataDescription, dh.subSpecification) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
return mForwardRoutes[ri].channel;
auto channelInfoIndex = mForwardRoutes[ri].channel;
auto& info = mForwardChannelInfos[channelInfoIndex.value];
// We need to make sure that we forward the same payload only once per channel.
if (info.channelType == ChannelAccountingType::DPL) {
if (dplChannelMatched) {
continue;
}
dplChannelMatched = true;
}
result.emplace_back(channelInfoIndex);
}
}
return ChannelIndex{ChannelIndex::INVALID};
// Remove duplicates, keeping the order of the channels.
std::unordered_set<int> numSet;
auto iter = std::stable_partition(result.begin(), result.end(),
[&](ChannelIndex n) { bool ret = !numSet.count(n.value); numSet.insert(n.value); return ret; }); // returns true if the item has not been "seen"
result.erase(iter, result.end());
}

ChannelIndex FairMQDeviceProxy::getOutputChannelIndexByName(std::string const& name) const
Expand Down