Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Please consider the following formatting changes to #12553 #247

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Framework/Core/include/Framework/DeviceState.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ struct DeviceState {

enum LogStreams : int {
NO_LOG = 0,
DEVICE_LOG = 1 << 0, // Log for Data Processing Device activities.
COMPLETION_LOG = 1 << 1, // Log for the completion policy of the device.
MONITORING_SERVICE_LOG = 1 << 2, // Log for the monitoring service flushing.
DEVICE_LOG = 1 << 0, // Log for Data Processing Device activities.
COMPLETION_LOG = 1 << 1, // Log for the completion policy of the device.
MONITORING_SERVICE_LOG = 1 << 2, // Log for the monitoring service flushing.
DATA_PROCESSOR_CONTEXT_LOG = 1 << 3, // Log for the DataProcessorContext callbacks
STREAM_CONTEXT_LOG = 1 << 4, // Log for the StreamContext callbacks
};

std::vector<InputChannelInfo> inputChannelInfos;
Expand Down
6 changes: 6 additions & 0 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include <Configuration/ConfigurationInterface.h>
#include <Configuration/ConfigurationFactory.h>
#include <Monitoring/MonitoringFactory.h>
#include "Framework/Signpost.h"

#include <fairmq/Device.h>
#include <fairmq/shmem/Monitor.h>
Expand All @@ -79,6 +80,9 @@ using Value = o2::monitoring::tags::Value;
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"

O2_DECLARE_DYNAMIC_LOG(data_processor_context);
O2_DECLARE_DYNAMIC_LOG(stream_context);

namespace o2::framework
{

Expand Down Expand Up @@ -188,6 +192,8 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec()
}
}
if (didCreate == false && messageContext.didDispatch() == true) {
O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created out of band");
LOGP(debug, "Data created out of band");
return;
}
Expand Down
123 changes: 61 additions & 62 deletions Framework/Core/src/DataProcessingContext.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,145 +10,144 @@
// or submit itself to any jurisdiction.

#include "Framework/DataProcessingContext.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/Signpost.h"

O2_DECLARE_DYNAMIC_LOG(data_processor_context);
namespace o2::framework
{

namespace
{
template <typename T, typename... ARGS>
void invokeAll(T& handles, char const* callbackName, o2::framework::DataProcessorSpec* spec, ARGS&... args)
{
assert(callbackName);
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, spec);
// FIXME: for now spec is nullptr because we don't have a list of possible DataProcessorSpecs
// per device.
char const* dataProcessorName = spec ? spec->name.c_str() : "DataProcessorContext";
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting %{public}s::%{public}s", dataProcessorName, callbackName);
for (auto& handle : handles) {
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting %{public}s::%{public}s::%{public}s", dataProcessorName, handle.spec.name.c_str(), callbackName);
handle.callback(args..., handle.service);
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending %{public}s::%{public}s::%{public}s", dataProcessorName, handle.spec.name.c_str(), callbackName);
}
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending %{public}s::%{public}s", dataProcessorName, callbackName);
}
} // namespace

/// Invoke callbacks to be executed before every dangling check
void DataProcessorContext::preProcessingCallbacks(ProcessingContext& ctx)
{
for (auto& handle : preProcessingHandlers) {
LOGP(debug, "Invoking preDanglingCallback for service {}", handle.spec.name);
handle.callback(ctx, handle.service);
}
invokeAll(preProcessingHandlers, "preProcessingCallbacks", spec, ctx);
}

void DataProcessorContext::finaliseOutputsCallbacks(ProcessingContext& ctx)
{
for (auto& handle : finaliseOutputsHandles) {
LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name);
handle.callback(ctx, handle.service);
}
invokeAll(finaliseOutputsHandles, "finaliseOutputsCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed before every dangling check
void DataProcessorContext::postProcessingCallbacks(ProcessingContext& ctx)
{
for (auto& handle : postProcessingHandlers) {
LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name);
handle.callback(ctx, handle.service);
}
invokeAll(postProcessingHandlers, "postProcessingCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed before every dangling check
void DataProcessorContext::preDanglingCallbacks(DanglingContext& danglingContext)
void DataProcessorContext::preDanglingCallbacks(DanglingContext& ctx)
{
for (auto& handle : preDanglingHandles) {
LOGP(debug, "Invoking preDanglingCallback for service {}", handle.spec.name);
handle.callback(danglingContext, handle.service);
}
invokeAll(preDanglingHandles, "preDanglingCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed after every dangling check
void DataProcessorContext::postDanglingCallbacks(DanglingContext& danglingContext)
void DataProcessorContext::postDanglingCallbacks(DanglingContext& ctx)
{
for (auto& handle : postDanglingHandles) {
LOGP(debug, "Invoking postDanglingCallback for service {}", handle.spec.name);
handle.callback(danglingContext, handle.service);
}
invokeAll(postDanglingHandles, "postDanglingCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed before every EOS user callback invokation
void DataProcessorContext::preEOSCallbacks(EndOfStreamContext& eosContext)
void DataProcessorContext::preEOSCallbacks(EndOfStreamContext& ctx)
{
for (auto& handle : preEOSHandles) {
LOGP(detail, "Invoking preEosCallback for service {}", handle.spec.name);
handle.callback(eosContext, handle.service);
}
invokeAll(preEOSHandles, "preEOSCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed after every EOS user callback invokation
void DataProcessorContext::postEOSCallbacks(EndOfStreamContext& eosContext)
void DataProcessorContext::postEOSCallbacks(EndOfStreamContext& ctx)
{
for (auto& handle : postEOSHandles) {
LOGP(detail, "Invoking postEoSCallback for service {}", handle.spec.name);
handle.callback(eosContext, handle.service);
}
invokeAll(postEOSHandles, "postEOSCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed after every data Dispatching
void DataProcessorContext::postDispatchingCallbacks(ProcessingContext& processContext)
void DataProcessorContext::postDispatchingCallbacks(ProcessingContext& ctx)
{
for (auto& handle : postDispatchingHandles) {
LOGP(debug, "Invoking postDispatchingCallback for service {}", handle.spec.name);
handle.callback(processContext, handle.service);
}
invokeAll(postDispatchingHandles, "postDispatchingCallbacks", spec, ctx);
}

/// Invoke callbacks to be executed after every data Dispatching
void DataProcessorContext::postForwardingCallbacks(ProcessingContext& processContext)
void DataProcessorContext::postForwardingCallbacks(ProcessingContext& ctx)
{
for (auto& handle : postForwardingHandles) {
LOGP(debug, "Invoking postForwardingCallback for service {}", handle.spec.name);
handle.callback(processContext, handle.service);
}
invokeAll(postForwardingHandles, "postForwardingCallbacks", spec, ctx);
}

/// Callbacks to be called in fair::mq::Device::PreRun()
void DataProcessorContext::preStartCallbacks(ServiceRegistryRef ref)
{
for (auto& handle : preStartHandles) {
LOGP(detail, "Invoking preStartCallback for service {}", handle.spec.name);
handle.callback(ref, handle.service);
}
invokeAll(preStartHandles, "preStartCallbacks", spec, ref);
}

void DataProcessorContext::postStopCallbacks(ServiceRegistryRef ref)
{
// FIXME: we need to call the callback only once for the global services
/// I guess...
for (auto& handle : postStopHandles) {
LOGP(detail, "Invoking postStopCallback for service {}", handle.spec.name);
handle.callback(ref, handle.service);
}
invokeAll(postStopHandles, "postStopCallbacks", spec, ref);
}

/// Invoke callback to be executed on exit, in reverse order.
void DataProcessorContext::preExitCallbacks(std::vector<ServiceExitHandle> handles, ServiceRegistryRef ref)
{
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, &ref);
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext preExitCallbacks");
// FIXME: we need to call the callback only once for the global services
/// I guess...
for (auto handle = handles.rbegin(); handle != handles.rend(); ++handle) {
LOGP(detail, "Invoking preExitCallback for service {}", handle->spec.name);
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle->service);
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::preExitCallbacks for service %{public}s", handle->spec.name.c_str());
handle->callback(ref, handle->service);
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::preExitCallbacks for service %{public}s", handle->spec.name.c_str());
}
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext preExitCallbacks");
}

/// Invoke callback to be executed on exit, in reverse order.
void DataProcessorContext::preLoopCallbacks(ServiceRegistryRef ref)
{
// FIXME: we need to call the callback only once for the global services
/// I guess...
LOGP(debug, "Invoking preLoopCallbacks");
for (auto& handle : preLoopHandles) {
LOGP(debug, "Invoking preLoopCallback for service {}", handle.spec.name);
handle.callback(ref, handle.service);
}
invokeAll(preLoopHandles, "preLoopCallbacks", spec, ref);
}

void DataProcessorContext::domainInfoUpdatedCallback(ServiceRegistryRef ref, size_t oldestPossibleTimeslice, ChannelIndex channelIndex)
{
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, this);
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext domainInfoUpdatedCallback");
for (auto& handle : domainInfoHandles) {
LOGP(debug, "Invoking domainInfoHandles for service {}", handle.spec.name);
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::domainInfoUpdatedCallback for service %{public}s", handle.spec.name.c_str());
handle.callback(ref, oldestPossibleTimeslice, channelIndex);
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::domainInfoUpdatedCallback for service %{public}s", handle.spec.name.c_str());
}
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext domainInfoUpdatedCallback");
}

void DataProcessorContext::preSendingMessagesCallbacks(ServiceRegistryRef ref, fair::mq::Parts& parts, ChannelIndex channelIndex)
{
O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, this);
O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext preSendingMessagesCallbacks");
for (auto& handle : preSendingMessagesHandles) {
LOGP(debug, "Invoking preSending for service {}", handle.spec.name);
O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::preSendingMessagesCallbacks for service %{public}s", handle.spec.name.c_str());
handle.callback(ref, parts, channelIndex);
O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::preSendingMessagesCallbacks for service %{public}s", handle.spec.name.c_str());
}
O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext preSendingMessagesCallbacks");
}

} // namespace o2::framework
11 changes: 10 additions & 1 deletion Framework/Core/src/StreamContext.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

#include "Framework/StreamContext.h"

#include "Framework/Signpost.h"

O2_DECLARE_DYNAMIC_LOG(stream_context);

namespace o2::framework
{

Expand Down Expand Up @@ -49,12 +53,17 @@ void StreamContext::finaliseOutputsCallbacks(ProcessingContext& pcx)
/// Invoke callbacks to be executed after every process method invokation
void StreamContext::postProcessingCallbacks(ProcessingContext& pcx)
{
O2_SIGNPOST_ID_FROM_POINTER(dpid, stream_context, &pcx);
O2_SIGNPOST_START(stream_context, dpid, "callbacks", "Starting StreamContext postProcessingCallbacks");
for (auto& handle : postProcessingHandles) {
LOG(debug) << "Invoking postProcessingCallbacks for " << handle.service;
O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, handle.service);
O2_SIGNPOST_START(stream_context, cid, "callbacks", "Starting StreamContext::postProcessingCallbacks for service %{public}s", handle.spec.name.c_str());
assert(handle.service);
assert(handle.callback);
handle.callback(pcx, handle.service);
O2_SIGNPOST_END(stream_context, cid, "callbacks", "Ending StreamContext::postProcessingCallbacks for service %{public}s", handle.spec.name.c_str());
}
O2_SIGNPOST_END(stream_context, dpid, "callbacks", "Ending StreamContext postProcessingCallbacks");
}

/// Invoke callbacks to be executed before every EOS user callback invokation
Expand Down
13 changes: 12 additions & 1 deletion Framework/Core/src/WSDriverClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
O2_DECLARE_DYNAMIC_LOG(device);
O2_DECLARE_DYNAMIC_LOG(completion);
O2_DECLARE_DYNAMIC_LOG(monitoring_service);
O2_DECLARE_DYNAMIC_LOG(data_processor_context);
O2_DECLARE_DYNAMIC_LOG(stream_context);

namespace o2::framework
{
Expand Down Expand Up @@ -184,12 +186,21 @@ void on_connect(uv_connect_t* connection, int status)
} else {
O2_LOG_DISABLE(completion);
}

if ((state.logStreams & DeviceState::LogStreams::MONITORING_SERVICE_LOG) != 0) {
O2_LOG_ENABLE(monitoring_service);
} else {
O2_LOG_DISABLE(monitoring_service);
}
if ((state.logStreams & DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG) != 0) {
O2_LOG_ENABLE(data_processor_context);
} else {
O2_LOG_DISABLE(data_processor_context);
}
if ((state.logStreams & DeviceState::LogStreams::STREAM_CONTEXT_LOG) != 0) {
O2_LOG_ENABLE(stream_context);
} else {
O2_LOG_DISABLE(stream_context);
}
});

// Client will be filled in the line after. I can probably have a single
Expand Down
2 changes: 2 additions & 0 deletions Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ void displayDeviceInspector(DeviceSpec const& spec,
logsChanged = ImGui::CheckboxFlags("Device", &control.logStreams, DeviceState::LogStreams::DEVICE_LOG);
logsChanged = ImGui::CheckboxFlags("Completion", &control.logStreams, DeviceState::LogStreams::COMPLETION_LOG);
logsChanged = ImGui::CheckboxFlags("Monitoring", &control.logStreams, DeviceState::LogStreams::MONITORING_SERVICE_LOG);
logsChanged = ImGui::CheckboxFlags("DataProcessorContext", &control.logStreams, DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG);
logsChanged = ImGui::CheckboxFlags("StreamContext", &control.logStreams, DeviceState::LogStreams::STREAM_CONTEXT_LOG);
if (logsChanged && control.controller) {
std::string cmd = fmt::format("/log-streams {}", control.logStreams);
control.controller->write(cmd.c_str(), cmd.size());
Expand Down