diff --git a/Framework/Core/include/Framework/DeviceState.h b/Framework/Core/include/Framework/DeviceState.h index 4fa72a84cad71..89961b3e92dc7 100644 --- a/Framework/Core/include/Framework/DeviceState.h +++ b/Framework/Core/include/Framework/DeviceState.h @@ -59,9 +59,11 @@ struct DeviceState { enum LogStreams : int { NO_LOG = 0, - DEVICE_LOG = 1 << 0, // Log for Data Processing Device activities. - COMPLETION_LOG = 1 << 1, // Log for the completion policy of the device. - MONITORING_SERVICE_LOG = 1 << 2, // Log for the monitoring service flushing. + DEVICE_LOG = 1 << 0, // Log for Data Processing Device activities. + COMPLETION_LOG = 1 << 1, // Log for the completion policy of the device. + MONITORING_SERVICE_LOG = 1 << 2, // Log for the monitoring service flushing. + DATA_PROCESSOR_CONTEXT_LOG = 1 << 3, // Log for the DataProcessorContext callbacks + STREAM_CONTEXT_LOG = 1 << 4, // Log for the StreamContext callbacks }; std::vector inputChannelInfos; diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index fd02a4c9ccb88..d264124fe25af 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -57,6 +57,7 @@ #include #include #include +#include "Framework/Signpost.h" #include #include @@ -79,6 +80,9 @@ using Value = o2::monitoring::tags::Value; #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpedantic" +O2_DECLARE_DYNAMIC_LOG(data_processor_context); +O2_DECLARE_DYNAMIC_LOG(stream_context); + namespace o2::framework { @@ -188,6 +192,8 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec() } } if (didCreate == false && messageContext.didDispatch() == true) { + O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service); + O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created out of band"); LOGP(debug, "Data created out of band"); return; } diff --git a/Framework/Core/src/DataProcessingContext.cxx b/Framework/Core/src/DataProcessingContext.cxx index 9c023e5021a9b..73223b3d6ff89 100644 --- a/Framework/Core/src/DataProcessingContext.cxx +++ b/Framework/Core/src/DataProcessingContext.cxx @@ -10,145 +10,143 @@ // or submit itself to any jurisdiction. #include "Framework/DataProcessingContext.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/Signpost.h" +O2_DECLARE_DYNAMIC_LOG(data_processor_context); namespace o2::framework { + +namespace { +template +void invokeAll(T& handles, char const* callbackName, o2::framework::DataProcessorSpec *spec, ARGS&... args) +{ + assert(callbackName); + O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, spec); + // FIXME: for now spec is nullptr because we don't have a list of possible DataProcessorSpecs + // per device. + char const* dataProcessorName = spec ? spec->name.c_str() : "DataProcessorContext"; + O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting %{public}s::%{public}s", dataProcessorName, callbackName); + for (auto& handle : handles) { + O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service); + O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting %{public}s::%{public}s::%{public}s", dataProcessorName, handle.spec.name.c_str(), callbackName); + handle.callback(args..., handle.service); + O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending %{public}s::%{public}s::%{public}s", dataProcessorName, handle.spec.name.c_str(), callbackName); + } + O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending %{public}s::%{public}s", dataProcessorName, callbackName); +} +} + /// Invoke callbacks to be executed before every dangling check void DataProcessorContext::preProcessingCallbacks(ProcessingContext& ctx) { - for (auto& handle : preProcessingHandlers) { - LOGP(debug, "Invoking preDanglingCallback for service {}", handle.spec.name); - handle.callback(ctx, handle.service); - } + invokeAll(preProcessingHandlers, "preProcessingCallbacks", spec, ctx); } void DataProcessorContext::finaliseOutputsCallbacks(ProcessingContext& ctx) { - for (auto& handle : finaliseOutputsHandles) { - LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name); - handle.callback(ctx, handle.service); - } + invokeAll(finaliseOutputsHandles, "finaliseOutputsCallbacks", spec, ctx); } /// Invoke callbacks to be executed before every dangling check void DataProcessorContext::postProcessingCallbacks(ProcessingContext& ctx) { - for (auto& handle : postProcessingHandlers) { - LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name); - handle.callback(ctx, handle.service); - } + invokeAll(postProcessingHandlers, "postProcessingCallbacks", spec, ctx); } /// Invoke callbacks to be executed before every dangling check -void DataProcessorContext::preDanglingCallbacks(DanglingContext& danglingContext) +void DataProcessorContext::preDanglingCallbacks(DanglingContext& ctx) { - for (auto& handle : preDanglingHandles) { - LOGP(debug, "Invoking preDanglingCallback for service {}", handle.spec.name); - handle.callback(danglingContext, handle.service); - } + invokeAll(preDanglingHandles, "preDanglingCallbacks", spec, ctx); } /// Invoke callbacks to be executed after every dangling check -void DataProcessorContext::postDanglingCallbacks(DanglingContext& danglingContext) +void DataProcessorContext::postDanglingCallbacks(DanglingContext& ctx) { - for (auto& handle : postDanglingHandles) { - LOGP(debug, "Invoking postDanglingCallback for service {}", handle.spec.name); - handle.callback(danglingContext, handle.service); - } + invokeAll(postDanglingHandles, "postDanglingCallbacks", spec, ctx); } /// Invoke callbacks to be executed before every EOS user callback invokation -void DataProcessorContext::preEOSCallbacks(EndOfStreamContext& eosContext) +void DataProcessorContext::preEOSCallbacks(EndOfStreamContext& ctx) { - for (auto& handle : preEOSHandles) { - LOGP(detail, "Invoking preEosCallback for service {}", handle.spec.name); - handle.callback(eosContext, handle.service); - } + invokeAll(preEOSHandles, "preEOSCallbacks", spec, ctx); } /// Invoke callbacks to be executed after every EOS user callback invokation -void DataProcessorContext::postEOSCallbacks(EndOfStreamContext& eosContext) +void DataProcessorContext::postEOSCallbacks(EndOfStreamContext& ctx) { - for (auto& handle : postEOSHandles) { - LOGP(detail, "Invoking postEoSCallback for service {}", handle.spec.name); - handle.callback(eosContext, handle.service); - } + invokeAll(postEOSHandles, "postEOSCallbacks", spec, ctx); } /// Invoke callbacks to be executed after every data Dispatching -void DataProcessorContext::postDispatchingCallbacks(ProcessingContext& processContext) +void DataProcessorContext::postDispatchingCallbacks(ProcessingContext& ctx) { - for (auto& handle : postDispatchingHandles) { - LOGP(debug, "Invoking postDispatchingCallback for service {}", handle.spec.name); - handle.callback(processContext, handle.service); - } + invokeAll(postDispatchingHandles, "postDispatchingCallbacks", spec, ctx); } /// Invoke callbacks to be executed after every data Dispatching -void DataProcessorContext::postForwardingCallbacks(ProcessingContext& processContext) +void DataProcessorContext::postForwardingCallbacks(ProcessingContext& ctx) { - for (auto& handle : postForwardingHandles) { - LOGP(debug, "Invoking postForwardingCallback for service {}", handle.spec.name); - handle.callback(processContext, handle.service); - } + invokeAll(postForwardingHandles, "postForwardingCallbacks", spec, ctx); } /// Callbacks to be called in fair::mq::Device::PreRun() void DataProcessorContext::preStartCallbacks(ServiceRegistryRef ref) { - for (auto& handle : preStartHandles) { - LOGP(detail, "Invoking preStartCallback for service {}", handle.spec.name); - handle.callback(ref, handle.service); - } + invokeAll(preStartHandles, "preStartCallbacks", spec, ref); } void DataProcessorContext::postStopCallbacks(ServiceRegistryRef ref) { - // FIXME: we need to call the callback only once for the global services - /// I guess... - for (auto& handle : postStopHandles) { - LOGP(detail, "Invoking postStopCallback for service {}", handle.spec.name); - handle.callback(ref, handle.service); - } + invokeAll(postStopHandles, "postStopCallbacks", spec, ref); } /// Invoke callback to be executed on exit, in reverse order. void DataProcessorContext::preExitCallbacks(std::vector handles, ServiceRegistryRef ref) { + O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, &ref); + O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext preExitCallbacks"); // FIXME: we need to call the callback only once for the global services /// I guess... for (auto handle = handles.rbegin(); handle != handles.rend(); ++handle) { - LOGP(detail, "Invoking preExitCallback for service {}", handle->spec.name); + O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle->service); + O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::preExitCallbacks for service %{public}s", handle->spec.name.c_str()); handle->callback(ref, handle->service); + O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::preExitCallbacks for service %{public}s", handle->spec.name.c_str()); } + O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext preExitCallbacks"); } /// Invoke callback to be executed on exit, in reverse order. void DataProcessorContext::preLoopCallbacks(ServiceRegistryRef ref) { - // FIXME: we need to call the callback only once for the global services - /// I guess... - LOGP(debug, "Invoking preLoopCallbacks"); - for (auto& handle : preLoopHandles) { - LOGP(debug, "Invoking preLoopCallback for service {}", handle.spec.name); - handle.callback(ref, handle.service); - } + invokeAll(preLoopHandles, "preLoopCallbacks", spec, ref); } void DataProcessorContext::domainInfoUpdatedCallback(ServiceRegistryRef ref, size_t oldestPossibleTimeslice, ChannelIndex channelIndex) { + O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, this); + O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext domainInfoUpdatedCallback"); for (auto& handle : domainInfoHandles) { - LOGP(debug, "Invoking domainInfoHandles for service {}", handle.spec.name); + O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service); + O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::domainInfoUpdatedCallback for service %{public}s", handle.spec.name.c_str()); handle.callback(ref, oldestPossibleTimeslice, channelIndex); + O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::domainInfoUpdatedCallback for service %{public}s", handle.spec.name.c_str()); } + O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext domainInfoUpdatedCallback"); } void DataProcessorContext::preSendingMessagesCallbacks(ServiceRegistryRef ref, fair::mq::Parts& parts, ChannelIndex channelIndex) { + O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, this); + O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext preSendingMessagesCallbacks"); for (auto& handle : preSendingMessagesHandles) { - LOGP(debug, "Invoking preSending for service {}", handle.spec.name); + O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service); + O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::preSendingMessagesCallbacks for service %{public}s", handle.spec.name.c_str()); handle.callback(ref, parts, channelIndex); + O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::preSendingMessagesCallbacks for service %{public}s", handle.spec.name.c_str()); } + O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext preSendingMessagesCallbacks"); } } // namespace o2::framework diff --git a/Framework/Core/src/StreamContext.cxx b/Framework/Core/src/StreamContext.cxx index c7f28a3dbde1a..2ce0c1c427cbd 100644 --- a/Framework/Core/src/StreamContext.cxx +++ b/Framework/Core/src/StreamContext.cxx @@ -11,6 +11,10 @@ #include "Framework/StreamContext.h" +#include "Framework/Signpost.h" + +O2_DECLARE_DYNAMIC_LOG(stream_context); + namespace o2::framework { @@ -49,12 +53,17 @@ void StreamContext::finaliseOutputsCallbacks(ProcessingContext& pcx) /// Invoke callbacks to be executed after every process method invokation void StreamContext::postProcessingCallbacks(ProcessingContext& pcx) { + O2_SIGNPOST_ID_FROM_POINTER(dpid, stream_context, &pcx); + O2_SIGNPOST_START(stream_context, dpid, "callbacks", "Starting StreamContext postProcessingCallbacks"); for (auto& handle : postProcessingHandles) { - LOG(debug) << "Invoking postProcessingCallbacks for " << handle.service; + O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, handle.service); + O2_SIGNPOST_START(stream_context, cid, "callbacks", "Starting StreamContext::postProcessingCallbacks for service %{public}s", handle.spec.name.c_str()); assert(handle.service); assert(handle.callback); handle.callback(pcx, handle.service); + O2_SIGNPOST_END(stream_context, cid, "callbacks", "Ending StreamContext::postProcessingCallbacks for service %{public}s", handle.spec.name.c_str()); } + O2_SIGNPOST_END(stream_context, dpid, "callbacks", "Ending StreamContext postProcessingCallbacks"); } /// Invoke callbacks to be executed before every EOS user callback invokation diff --git a/Framework/Core/src/WSDriverClient.cxx b/Framework/Core/src/WSDriverClient.cxx index ac2e3db41fcef..179b13bf91d76 100644 --- a/Framework/Core/src/WSDriverClient.cxx +++ b/Framework/Core/src/WSDriverClient.cxx @@ -24,6 +24,8 @@ O2_DECLARE_DYNAMIC_LOG(device); O2_DECLARE_DYNAMIC_LOG(completion); O2_DECLARE_DYNAMIC_LOG(monitoring_service); +O2_DECLARE_DYNAMIC_LOG(data_processor_context); +O2_DECLARE_DYNAMIC_LOG(stream_context); namespace o2::framework { @@ -184,12 +186,21 @@ void on_connect(uv_connect_t* connection, int status) } else { O2_LOG_DISABLE(completion); } - if ((state.logStreams & DeviceState::LogStreams::MONITORING_SERVICE_LOG) != 0) { O2_LOG_ENABLE(monitoring_service); } else { O2_LOG_DISABLE(monitoring_service); } + if ((state.logStreams & DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG) != 0) { + O2_LOG_ENABLE(data_processor_context); + } else { + O2_LOG_DISABLE(data_processor_context); + } + if ((state.logStreams & DeviceState::LogStreams::STREAM_CONTEXT_LOG) != 0) { + O2_LOG_ENABLE(stream_context); + } else { + O2_LOG_DISABLE(stream_context); + } }); // Client will be filled in the line after. I can probably have a single diff --git a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx index fe93ca6d0f07f..f54e74d6ac019 100644 --- a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx +++ b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx @@ -405,6 +405,8 @@ void displayDeviceInspector(DeviceSpec const& spec, logsChanged = ImGui::CheckboxFlags("Device", &control.logStreams, DeviceState::LogStreams::DEVICE_LOG); logsChanged = ImGui::CheckboxFlags("Completion", &control.logStreams, DeviceState::LogStreams::COMPLETION_LOG); logsChanged = ImGui::CheckboxFlags("Monitoring", &control.logStreams, DeviceState::LogStreams::MONITORING_SERVICE_LOG); + logsChanged = ImGui::CheckboxFlags("DataProcessorContext", &control.logStreams, DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG); + logsChanged = ImGui::CheckboxFlags("StreamContext", &control.logStreams, DeviceState::LogStreams::STREAM_CONTEXT_LOG); if (logsChanged && control.controller) { std::string cmd = fmt::format("/log-streams {}", control.logStreams); control.controller->write(cmd.c_str(), cmd.size());