diff --git a/Framework/Core/include/Framework/DataProcessingStats.h b/Framework/Core/include/Framework/DataProcessingStats.h index 286df61ba1464..ec96bf8e9973c 100644 --- a/Framework/Core/include/Framework/DataProcessingStats.h +++ b/Framework/Core/include/Framework/DataProcessingStats.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -69,8 +70,16 @@ enum struct ProcessingStatsId : short { /// Helper struct to hold statistics about the data processing happening. struct DataProcessingStats { + // Parameters for the default behaviour + struct DefaultConfig { + int64_t minOnlinePublishInterval = 0; + }; + + DefaultConfig config = {}; + DataProcessingStats(std::function getRealtimeBase, - std::function getTimestamp); + std::function getTimestamp, + DefaultConfig config); constexpr static ServiceKind service_kind = ServiceKind::Global; constexpr static unsigned short MAX_METRICS = 1 << 15; diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index 9192cbd0c965d..bc750181d54e2 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -715,14 +715,14 @@ o2::framework::ServiceSpec O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Queueing oldest possible timeslice %" PRIu64 " propagation for execution.", (uint64_t)oldestPossibleOutput.timeslice.value); AsyncQueueHelpers::post( - queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice}, - .id = decongestion.oldestPossibleTimesliceTask, + queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice}, + .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallback} .user(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput})); if (decongestion.orderedCompletionPolicyActive) { AsyncQueueHelpers::post( - queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, + queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackOrdered} .user({.ref = services, .oldestPossibleOutput = oldestPossibleOutput})); } }, @@ -867,8 +867,11 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats() clock_gettime(CLOCK_REALTIME, &now); uv_update_time(state.loop); uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop); + DataProcessingStats::DefaultConfig config = { + .minOnlinePublishInterval = std::stoi(options.GetProperty("dpl-stats-min-online-publishing-interval").c_str()) * 1000}; auto* stats = new DataProcessingStats(TimingHelpers::defaultRealtimeBaseConfigurator(offset, state.loop), - TimingHelpers::defaultCPUTimeConfigurator(state.loop)); + TimingHelpers::defaultCPUTimeConfigurator(state.loop), + config); auto& runningWorkflow = services.get(); // It makes no sense to update the stats more often than every 5s diff --git a/Framework/Core/src/DataProcessingStats.cxx b/Framework/Core/src/DataProcessingStats.cxx index 920f4d7c7addb..3b02a0aacdd70 100644 --- a/Framework/Core/src/DataProcessingStats.cxx +++ b/Framework/Core/src/DataProcessingStats.cxx @@ -11,21 +11,20 @@ #include "Framework/DataProcessingStats.h" #include "Framework/RuntimeError.h" -#include "Framework/ServiceRegistryRef.h" -#include "Framework/DeviceState.h" #include "Framework/Logger.h" #include -#include #include -#include +#include namespace o2::framework { DataProcessingStats::DataProcessingStats(std::function getRealtimeBase_, - std::function getTimestamp_) + std::function getTimestamp_, + DefaultConfig config_) : getTimestamp(getTimestamp_), - getRealtimeBase(getRealtimeBase_) + getRealtimeBase(getRealtimeBase_), + config(config_) { getRealtimeBase(realTimeBase, initialTimeOffset); } @@ -269,6 +268,9 @@ void DataProcessingStats::registerMetric(MetricSpec const& spec) metricSpecs[spec.metricId] = spec; metricsNames[spec.metricId] = spec.name; metrics[spec.metricId] = spec.defaultValue; + if (metricSpecs[spec.metricId].scope == Scope::Online) { + metricSpecs[spec.metricId].minPublishInterval = std::max(metricSpecs[spec.metricId].minPublishInterval, config.minOnlinePublishInterval); + } int64_t currentTime = getTimestamp(realTimeBase, initialTimeOffset); updateInfos[spec.metricId] = UpdateInfo{currentTime, currentTime}; updated[spec.metricId] = spec.sendInitialValue; diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index d1fad2bb66f8b..f2644ed66ba08 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1753,6 +1753,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("configuration,cfg", bpo::value(), "configuration connection string") // ("driver-client-backend", bpo::value(), "driver connection string") // ("monitoring-backend", bpo::value(), "monitoring connection string") // + ("dpl-stats-min-online-publishing-interval", bpo::value(), "minimum flushing interval for online metrics (in s)") // ("infologger-mode", bpo::value(), "O2_INFOLOGGER_MODE override") // ("infologger-severity", bpo::value(), "minimun FairLogger severity which goes to info logger") // ("dpl-tracing-flags", bpo::value(), "pipe separated list of events to trace") // diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index d4d0c836aa925..58e669e127f03 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1049,6 +1049,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, ConfigParamsHelper::populateBoostProgramOptions(optsDesc, spec.options, gHiddenDeviceOptions); char const* defaultSignposts = getenv("DPL_SIGNPOSTS"); optsDesc.add_options()("monitoring-backend", bpo::value()->default_value("default"), "monitoring backend info") // + ("dpl-stats-min-online-publishing-interval", bpo::value()->default_value("0"), "minimum flushing interval for online metrics (in s)") // ("driver-client-backend", bpo::value()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") // ("infologger-severity", bpo::value()->default_value(""), "minimum FairLogger severity to send to InfoLogger") // ("dpl-tracing-flags", bpo::value()->default_value(""), "pipe `|` separate list of events to be traced") // diff --git a/Framework/Core/test/test_ComputingQuotaEvaluator.cxx b/Framework/Core/test/test_ComputingQuotaEvaluator.cxx index 26b41a3791485..92fedcfe78614 100644 --- a/Framework/Core/test/test_ComputingQuotaEvaluator.cxx +++ b/Framework/Core/test/test_ComputingQuotaEvaluator.cxx @@ -75,7 +75,7 @@ TEST_CASE("TestComputingQuotaEvaluator") }; DataProcessingStats stats(TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()), - TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop())); + TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {}); ServiceRegistry registry; ServiceRegistryRef ref(registry); diff --git a/Framework/Core/test/test_DataProcessingStats.cxx b/Framework/Core/test/test_DataProcessingStats.cxx index c23b7fe0436d5..5f14ead966dc0 100644 --- a/Framework/Core/test/test_DataProcessingStats.cxx +++ b/Framework/Core/test/test_DataProcessingStats.cxx @@ -30,7 +30,7 @@ using namespace o2::framework; TEST_CASE("DataProcessingStats") { DataProcessingStats stats(TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()), - TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop())); + TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {}); o2::framework::clean_all_runtime_errors(); stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric}); @@ -190,7 +190,7 @@ TEST_CASE("DataProcessingStatsOutOfOrder") int64_t value[] = {0, 1000, 999, 998}; return base + value[count++] - offset; }; - DataProcessingStats stats(realtimeTime, cpuTime); + DataProcessingStats stats(realtimeTime, cpuTime, {}); // Notice this will consume one value in the cpuTime. stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric}); stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 2}); @@ -222,7 +222,7 @@ TEST_CASE("DataProcessingStatsInstantaneousRate") // I want to push deltas since the last update and have the immediate time // averaged being stored. - DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator); + DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator, {}); stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .kind = DataProcessingStats::Kind::Rate}); REQUIRE(stats.updateInfos[DummyMetric].timestamp == 0); REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 0); @@ -265,7 +265,7 @@ TEST_CASE("DataProcessingStatsCumulativeRate") // I want to push deltas since the last update and have the immediate time // averaged being stored. - DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator); + DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator, {}); stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .kind = DataProcessingStats::Kind::Rate}); REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000); REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000); @@ -310,7 +310,7 @@ TEST_CASE("DataProcessingStatsPublishing") // I want to push deltas since the last update and have the immediate time // averaged being stored. - DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp); + DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp, {}); stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .minPublishInterval = 5000}); stats.registerMetric({.name = "dummy_metric2", .metricId = DummyMetric2, .minPublishInterval = 2000}); REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000); @@ -355,7 +355,7 @@ TEST_CASE("DataProcessingStatsPublishingRepeated") // I want to push deltas since the last update and have the immediate time // averaged being stored. - DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp); + DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp, {}); stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .minPublishInterval = 3000, .maxRefreshLatency = 9000}); REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000); REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000); diff --git a/Framework/Core/test/test_DataRelayer.cxx b/Framework/Core/test/test_DataRelayer.cxx index ec5d48fbd78da..64a1827820638 100644 --- a/Framework/Core/test/test_DataRelayer.cxx +++ b/Framework/Core/test/test_DataRelayer.cxx @@ -48,7 +48,7 @@ TEST_CASE("DataRelayer") TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop())); DataProcessingStats stats( TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()), - TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop())); + TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {}); int quickUpdateInterval = 1; using MetricSpec = DataProcessingStats::MetricSpec; std::vector specs{