Skip to content

Commit

Permalink
DPL: improve signposts for adding parts
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed May 25, 2024
1 parent bf62a54 commit 5b42afc
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions Framework/Core/src/DataAllocator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "Framework/DeviceSpec.h"
#include "Framework/StreamContext.h"
#include "Framework/Signpost.h"
#include "Headers/DataHeader.h"
#include "Headers/DataHeaderHelpers.h"
#include "Headers/Stack.h"

#include <fairmq/Device.h>
Expand Down Expand Up @@ -131,13 +133,13 @@ void DataAllocator::addPartToContext(RouteIndex routeIndex, fair::mq::MessagePtr
{
auto headerMessage = headerMessageFromOutput(spec, routeIndex, serializationMethod, 0);
O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerMessage->GetData());
O2_SIGNPOST_START(parts, pid, "parts", "addPartToContext %p", headerMessage->GetData());

// FIXME: this is kind of ugly, we know that we can change the content of the
// header message because we have just created it, but the API declares it const
const DataHeader* cdh = o2::header::get<DataHeader*>(headerMessage->GetData());
auto* dh = const_cast<DataHeader*>(cdh);
dh->payloadSize = payloadMessage->GetSize();
O2_SIGNPOST_START(parts, pid, "parts", "addPartToContext %{public}s@%p %" PRIu64,
cdh ? fmt::format("{}/{}/{}", cdh->dataOrigin, cdh->dataDescription, cdh->subSpecification).c_str() : "unknown", headerMessage->GetData(), dh->payloadSize);
auto& context = mRegistry.get<MessageContext>();
// make_scoped creates the context object inside of a scope handler, since it goes out of
// scope immediately, the created object is scheduled and can be directly sent if the context
Expand All @@ -153,8 +155,10 @@ void DataAllocator::adopt(const Output& spec, std::string* ptr)
// the correct payload size is set later when sending the
// StringContext, see DataProcessor::doSend
auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
const DataHeader* cdh = o2::header::get<DataHeader*>(header->GetData());
O2_SIGNPOST_ID_FROM_POINTER(pid, parts, header->GetData());
O2_SIGNPOST_START(parts, pid, "parts", "addPartToContext %p", header->GetData());
O2_SIGNPOST_START(parts, pid, "parts", "addPartToContext %{public}s@%p %" PRIu64,
cdh ? fmt::format("{}/{}/{}", cdh->dataOrigin, cdh->dataDescription, cdh->subSpecification).c_str() : "unknown", header->GetData(), cdh->payloadSize);
mRegistry.get<StringContext>().addString(std::move(header), std::move(payload), routeIndex);
assert(payload.get() == nullptr);
}
Expand Down Expand Up @@ -211,8 +215,10 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder<TableBuilder>& tb)
auto& timingInfo = mRegistry.get<TimingInfo>();
RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0);
const DataHeader* cdh = o2::header::get<DataHeader*>(header->GetData());
O2_SIGNPOST_ID_FROM_POINTER(pid, parts, header->GetData());
O2_SIGNPOST_START(parts, pid, "parts", "adopt %p", header->GetData());
O2_SIGNPOST_START(parts, pid, "parts", "adopt %{public}s@%p %" PRIu64,
cdh ? fmt::format("{}/{}/{}", cdh->dataOrigin, cdh->dataDescription, cdh->subSpecification).c_str() : "unknown", header->GetData(), cdh->payloadSize);
auto& context = mRegistry.get<ArrowContext>();

auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
Expand Down

0 comments on commit 5b42afc

Please sign in to comment.