Skip to content

Commit

Permalink
DPL: add Signposts for some of the DataProcessing / Stream callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Jan 18, 2024
1 parent 4e22db3 commit 77dac53
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 67 deletions.
8 changes: 5 additions & 3 deletions Framework/Core/include/Framework/DeviceState.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ struct DeviceState {

enum LogStreams : int {
NO_LOG = 0,
DEVICE_LOG = 1 << 0, // Log for Data Processing Device activities.
COMPLETION_LOG = 1 << 1, // Log for the completion policy of the device.
MONITORING_SERVICE_LOG = 1 << 2, // Log for the monitoring service flushing.
DEVICE_LOG = 1 << 0, // Log for Data Processing Device activities.
COMPLETION_LOG = 1 << 1, // Log for the completion policy of the device.
MONITORING_SERVICE_LOG = 1 << 2, // Log for the monitoring service flushing.
DATA_PROCESSOR_CONTEXT_LOG = 1 << 3, // Log for the DataProcessorContext callbacks
STREAM_CONTEXT_LOG = 1 << 4, // Log for the StreamContext callbacks
};

std::vector<InputChannelInfo> inputChannelInfos;
Expand Down
6 changes: 6 additions & 0 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include <Configuration/ConfigurationInterface.h>
#include <Configuration/ConfigurationFactory.h>
#include <Monitoring/MonitoringFactory.h>
#include "Framework/Signpost.h"

#include <fairmq/Device.h>
#include <fairmq/shmem/Monitor.h>
Expand All @@ -79,6 +80,9 @@ using Value = o2::monitoring::tags::Value;
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"

O2_DECLARE_DYNAMIC_LOG(data_processor_context);
O2_DECLARE_DYNAMIC_LOG(stream_context);

namespace o2::framework
{

Expand Down Expand Up @@ -188,6 +192,8 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec()
}
}
if (didCreate == false && messageContext.didDispatch() == true) {
O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created out of band");
LOGP(debug, "Data created out of band");
return;
}
Expand Down
122 changes: 60 additions & 62 deletions Framework/Core/src/DataProcessingContext.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,145 +10,143 @@
// or submit itself to any jurisdiction.

#include "Framework/DataProcessingContext.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/Signpost.h"

O2_DECLARE_DYNAMIC_LOG(data_processor_context);
namespace o2::framework
{

namespace {
template <typename T, typename... ARGS>
void invokeAll(T& handles, char const* callbackName, o2::framework::DataProcessorSpec *spec, ARGS&... args)
{
assert(callbackName);
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, spec);
// FIXME: for now spec is nullptr because we don't have a list of possible DataProcessorSpecs
// per device.
char const* dataProcessorName = spec ? spec->name.c_str() : "DataProcessorContext";
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting %{public}s::%{public}s", dataProcessorName, callbackName);
for (auto& handle : handles) {
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting %{public}s::%{public}s::%{public}s", dataProcessorName, handle.spec.name.c_str(), callbackName);
handle.callback(args..., handle.service);
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending %{public}s::%{public}s::%{public}s", dataProcessorName, handle.spec.name.c_str(), callbackName);
}
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending %{public}s::%{public}s", dataProcessorName, callbackName);
}
}

/// Invoke callbacks to be executed before every dangling check
void DataProcessorContext::preProcessingCallbacks(ProcessingContext& ctx)
{
for (auto& handle : preProcessingHandlers) {
LOGP(debug, "Invoking preDanglingCallback for service {}", handle.spec.name);
handle.callback(ctx, handle.service);
}
invokeAll(preProcessingHandlers, "preProcessingCallbacks", spec, ctx);
}

void DataProcessorContext::finaliseOutputsCallbacks(ProcessingContext& ctx)
{
for (auto& handle : finaliseOutputsHandles) {
LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name);
handle.callback(ctx, handle.service);
}
invokeAll(finaliseOutputsHandles, "finaliseOutputsCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed before every dangling check
void DataProcessorContext::postProcessingCallbacks(ProcessingContext& ctx)
{
for (auto& handle : postProcessingHandlers) {
LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name);
handle.callback(ctx, handle.service);
}
invokeAll(postProcessingHandlers, "postProcessingCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed before every dangling check
void DataProcessorContext::preDanglingCallbacks(DanglingContext& danglingContext)
void DataProcessorContext::preDanglingCallbacks(DanglingContext& ctx)
{
for (auto& handle : preDanglingHandles) {
LOGP(debug, "Invoking preDanglingCallback for service {}", handle.spec.name);
handle.callback(danglingContext, handle.service);
}
invokeAll(preDanglingHandles, "preDanglingCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed after every dangling check
void DataProcessorContext::postDanglingCallbacks(DanglingContext& danglingContext)
void DataProcessorContext::postDanglingCallbacks(DanglingContext& ctx)
{
for (auto& handle : postDanglingHandles) {
LOGP(debug, "Invoking postDanglingCallback for service {}", handle.spec.name);
handle.callback(danglingContext, handle.service);
}
invokeAll(postDanglingHandles, "postDanglingCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed before every EOS user callback invokation
void DataProcessorContext::preEOSCallbacks(EndOfStreamContext& eosContext)
void DataProcessorContext::preEOSCallbacks(EndOfStreamContext& ctx)
{
for (auto& handle : preEOSHandles) {
LOGP(detail, "Invoking preEosCallback for service {}", handle.spec.name);
handle.callback(eosContext, handle.service);
}
invokeAll(preEOSHandles, "preEOSCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed after every EOS user callback invokation
void DataProcessorContext::postEOSCallbacks(EndOfStreamContext& eosContext)
void DataProcessorContext::postEOSCallbacks(EndOfStreamContext& ctx)
{
for (auto& handle : postEOSHandles) {
LOGP(detail, "Invoking postEoSCallback for service {}", handle.spec.name);
handle.callback(eosContext, handle.service);
}
invokeAll(postEOSHandles, "postEOSCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed after every data Dispatching
void DataProcessorContext::postDispatchingCallbacks(ProcessingContext& processContext)
void DataProcessorContext::postDispatchingCallbacks(ProcessingContext& ctx)
{
for (auto& handle : postDispatchingHandles) {
LOGP(debug, "Invoking postDispatchingCallback for service {}", handle.spec.name);
handle.callback(processContext, handle.service);
}
invokeAll(postDispatchingHandles, "postDispatchingCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed after every data Dispatching
void DataProcessorContext::postForwardingCallbacks(ProcessingContext& processContext)
void DataProcessorContext::postForwardingCallbacks(ProcessingContext& ctx)
{
for (auto& handle : postForwardingHandles) {
LOGP(debug, "Invoking postForwardingCallback for service {}", handle.spec.name);
handle.callback(processContext, handle.service);
}
invokeAll(postForwardingHandles, "postForwardingCallbacks", spec, ctx);
}

/// Callbacks to be called in fair::mq::Device::PreRun()
void DataProcessorContext::preStartCallbacks(ServiceRegistryRef ref)
{
for (auto& handle : preStartHandles) {
LOGP(detail, "Invoking preStartCallback for service {}", handle.spec.name);
handle.callback(ref, handle.service);
}
invokeAll(preStartHandles, "preStartCallbacks", spec, ref);
}

void DataProcessorContext::postStopCallbacks(ServiceRegistryRef ref)
{
// FIXME: we need to call the callback only once for the global services
/// I guess...
for (auto& handle : postStopHandles) {
LOGP(detail, "Invoking postStopCallback for service {}", handle.spec.name);
handle.callback(ref, handle.service);
}
invokeAll(postStopHandles, "postStopCallbacks", spec, ref);
}

/// Invoke callback to be executed on exit, in reverse order.
void DataProcessorContext::preExitCallbacks(std::vector<ServiceExitHandle> handles, ServiceRegistryRef ref)
{
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, &ref);
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext preExitCallbacks");
// FIXME: we need to call the callback only once for the global services
/// I guess...
for (auto handle = handles.rbegin(); handle != handles.rend(); ++handle) {
LOGP(detail, "Invoking preExitCallback for service {}", handle->spec.name);
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle->service);
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::preExitCallbacks for service %{public}s", handle->spec.name.c_str());
handle->callback(ref, handle->service);
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::preExitCallbacks for service %{public}s", handle->spec.name.c_str());
}
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext preExitCallbacks");
}

/// Invoke callback to be executed on exit, in reverse order.
void DataProcessorContext::preLoopCallbacks(ServiceRegistryRef ref)
{
// FIXME: we need to call the callback only once for the global services
/// I guess...
LOGP(debug, "Invoking preLoopCallbacks");
for (auto& handle : preLoopHandles) {
LOGP(debug, "Invoking preLoopCallback for service {}", handle.spec.name);
handle.callback(ref, handle.service);
}
invokeAll(preLoopHandles, "preLoopCallbacks", spec, ref);
}

void DataProcessorContext::domainInfoUpdatedCallback(ServiceRegistryRef ref, size_t oldestPossibleTimeslice, ChannelIndex channelIndex)
{
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, this);
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext domainInfoUpdatedCallback");
for (auto& handle : domainInfoHandles) {
LOGP(debug, "Invoking domainInfoHandles for service {}", handle.spec.name);
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::domainInfoUpdatedCallback for service %{public}s", handle.spec.name.c_str());
handle.callback(ref, oldestPossibleTimeslice, channelIndex);
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::domainInfoUpdatedCallback for service %{public}s", handle.spec.name.c_str());
}
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext domainInfoUpdatedCallback");
}

void DataProcessorContext::preSendingMessagesCallbacks(ServiceRegistryRef ref, fair::mq::Parts& parts, ChannelIndex channelIndex)
{
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, this);
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext preSendingMessagesCallbacks");
for (auto& handle : preSendingMessagesHandles) {
LOGP(debug, "Invoking preSending for service {}", handle.spec.name);
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::preSendingMessagesCallbacks for service %{public}s", handle.spec.name.c_str());
handle.callback(ref, parts, channelIndex);
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::preSendingMessagesCallbacks for service %{public}s", handle.spec.name.c_str());
}
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext preSendingMessagesCallbacks");
}

} // namespace o2::framework
11 changes: 10 additions & 1 deletion Framework/Core/src/StreamContext.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

#include "Framework/StreamContext.h"

#include "Framework/Signpost.h"

O2_DECLARE_DYNAMIC_LOG(stream_context);

namespace o2::framework
{

Expand Down Expand Up @@ -49,12 +53,17 @@ void StreamContext::finaliseOutputsCallbacks(ProcessingContext& pcx)
/// Invoke callbacks to be executed after every process method invokation
void StreamContext::postProcessingCallbacks(ProcessingContext& pcx)
{
O2_SIGNPOST_ID_FROM_POINTER(dpid, stream_context, &pcx);
O2_SIGNPOST_START(stream_context, dpid, "callbacks", "Starting StreamContext postProcessingCallbacks");
for (auto& handle : postProcessingHandles) {
LOG(debug) << "Invoking postProcessingCallbacks for " << handle.service;
O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, handle.service);
O2_SIGNPOST_START(stream_context, cid, "callbacks", "Starting StreamContext::postProcessingCallbacks for service %{public}s", handle.spec.name.c_str());
assert(handle.service);
assert(handle.callback);
handle.callback(pcx, handle.service);
O2_SIGNPOST_END(stream_context, cid, "callbacks", "Ending StreamContext::postProcessingCallbacks for service %{public}s", handle.spec.name.c_str());
}
O2_SIGNPOST_END(stream_context, dpid, "callbacks", "Ending StreamContext postProcessingCallbacks");
}

/// Invoke callbacks to be executed before every EOS user callback invokation
Expand Down
13 changes: 12 additions & 1 deletion Framework/Core/src/WSDriverClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
O2_DECLARE_DYNAMIC_LOG(device);
O2_DECLARE_DYNAMIC_LOG(completion);
O2_DECLARE_DYNAMIC_LOG(monitoring_service);
O2_DECLARE_DYNAMIC_LOG(data_processor_context);
O2_DECLARE_DYNAMIC_LOG(stream_context);

namespace o2::framework
{
Expand Down Expand Up @@ -184,12 +186,21 @@ void on_connect(uv_connect_t* connection, int status)
} else {
O2_LOG_DISABLE(completion);
}

if ((state.logStreams & DeviceState::LogStreams::MONITORING_SERVICE_LOG) != 0) {
O2_LOG_ENABLE(monitoring_service);
} else {
O2_LOG_DISABLE(monitoring_service);
}
if ((state.logStreams & DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG) != 0) {
O2_LOG_ENABLE(data_processor_context);
} else {
O2_LOG_DISABLE(data_processor_context);
}
if ((state.logStreams & DeviceState::LogStreams::STREAM_CONTEXT_LOG) != 0) {
O2_LOG_ENABLE(stream_context);
} else {
O2_LOG_DISABLE(stream_context);
}
});

// Client will be filled in the line after. I can probably have a single
Expand Down
2 changes: 2 additions & 0 deletions Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ void displayDeviceInspector(DeviceSpec const& spec,
logsChanged = ImGui::CheckboxFlags("Device", &control.logStreams, DeviceState::LogStreams::DEVICE_LOG);
logsChanged = ImGui::CheckboxFlags("Completion", &control.logStreams, DeviceState::LogStreams::COMPLETION_LOG);
logsChanged = ImGui::CheckboxFlags("Monitoring", &control.logStreams, DeviceState::LogStreams::MONITORING_SERVICE_LOG);
logsChanged = ImGui::CheckboxFlags("DataProcessorContext", &control.logStreams, DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG);
logsChanged = ImGui::CheckboxFlags("StreamContext", &control.logStreams, DeviceState::LogStreams::STREAM_CONTEXT_LOG);
if (logsChanged && control.controller) {
std::string cmd = fmt::format("/log-streams {}", control.logStreams);
control.controller->write(cmd.c_str(), cmd.size());
Expand Down

0 comments on commit 77dac53

Please sign in to comment.