Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPL: allow customising DataProcessingStats intervals #13583

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Framework/Core/include/Framework/DataProcessingStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <atomic>
#include <cstdint>
#include <array>
#include <memory>
#include <numeric>
#include <mutex>
#include <utility>
Expand Down Expand Up @@ -69,8 +70,16 @@ enum struct ProcessingStatsId : short {

/// Helper struct to hold statistics about the data processing happening.
struct DataProcessingStats {
// Parameters for the default behaviour
struct DefaultConfig {
int64_t minOnlinePublishInterval = 0;
};

DefaultConfig config = {};

DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase,
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp);
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp,
DefaultConfig config);

constexpr static ServiceKind service_kind = ServiceKind::Global;
constexpr static unsigned short MAX_METRICS = 1 << 15;
Expand Down
11 changes: 7 additions & 4 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -715,14 +715,14 @@ o2::framework::ServiceSpec
O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Queueing oldest possible timeslice %" PRIu64 " propagation for execution.",
(uint64_t)oldestPossibleOutput.timeslice.value);
AsyncQueueHelpers::post(
queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
.id = decongestion.oldestPossibleTimesliceTask,
queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
.id = decongestion.oldestPossibleTimesliceTask,
.debounce = -1, .callback = decongestionCallback}
.user<DecongestionContext>(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));

if (decongestion.orderedCompletionPolicyActive) {
AsyncQueueHelpers::post(
queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
.callback = decongestionCallbackOrdered}
.user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
} },
Expand Down Expand Up @@ -867,8 +867,11 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats()
clock_gettime(CLOCK_REALTIME, &now);
uv_update_time(state.loop);
uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
DataProcessingStats::DefaultConfig config = {
.minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>("dpl-stats-min-online-publishing-interval").c_str()) * 1000};
auto* stats = new DataProcessingStats(TimingHelpers::defaultRealtimeBaseConfigurator(offset, state.loop),
TimingHelpers::defaultCPUTimeConfigurator(state.loop));
TimingHelpers::defaultCPUTimeConfigurator(state.loop),
config);
auto& runningWorkflow = services.get<RunningWorkflowInfo const>();

// It makes no sense to update the stats more often than every 5s
Expand Down
14 changes: 8 additions & 6 deletions Framework/Core/src/DataProcessingStats.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,20 @@

#include "Framework/DataProcessingStats.h"
#include "Framework/RuntimeError.h"
#include "Framework/ServiceRegistryRef.h"
#include "Framework/DeviceState.h"
#include "Framework/Logger.h"
#include <uv.h>
#include <iostream>
#include <atomic>
#include <utility>
#include <thread>

namespace o2::framework
{

DataProcessingStats::DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase_,
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp_)
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp_,
DefaultConfig config_)
: getTimestamp(getTimestamp_),
getRealtimeBase(getRealtimeBase_)
getRealtimeBase(getRealtimeBase_),
config(config_)
{
getRealtimeBase(realTimeBase, initialTimeOffset);
}
Expand Down Expand Up @@ -269,6 +268,9 @@ void DataProcessingStats::registerMetric(MetricSpec const& spec)
metricSpecs[spec.metricId] = spec;
metricsNames[spec.metricId] = spec.name;
metrics[spec.metricId] = spec.defaultValue;
if (metricSpecs[spec.metricId].scope == Scope::Online) {
metricSpecs[spec.metricId].minPublishInterval = std::max(metricSpecs[spec.metricId].minPublishInterval, config.minOnlinePublishInterval);
}
int64_t currentTime = getTimestamp(realTimeBase, initialTimeOffset);
updateInfos[spec.metricId] = UpdateInfo{currentTime, currentTime};
updated[spec.metricId] = spec.sendInitialValue;
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
("configuration,cfg", bpo::value<std::string>(), "configuration connection string") //
("driver-client-backend", bpo::value<std::string>(), "driver connection string") //
("monitoring-backend", bpo::value<std::string>(), "monitoring connection string") //
("dpl-stats-min-online-publishing-interval", bpo::value<std::string>(), "minimum flushing interval for online metrics (in s)") //
("infologger-mode", bpo::value<std::string>(), "O2_INFOLOGGER_MODE override") //
("infologger-severity", bpo::value<std::string>(), "minimun FairLogger severity which goes to info logger") //
("dpl-tracing-flags", bpo::value<std::string>(), "pipe separated list of events to trace") //
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
ConfigParamsHelper::populateBoostProgramOptions(optsDesc, spec.options, gHiddenDeviceOptions);
char const* defaultSignposts = getenv("DPL_SIGNPOSTS");
optsDesc.add_options()("monitoring-backend", bpo::value<std::string>()->default_value("default"), "monitoring backend info") //
("dpl-stats-min-online-publishing-interval", bpo::value<std::string>()->default_value("0"), "minimum flushing interval for online metrics (in s)") //
("driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") //
("infologger-severity", bpo::value<std::string>()->default_value(""), "minimum FairLogger severity to send to InfoLogger") //
("dpl-tracing-flags", bpo::value<std::string>()->default_value(""), "pipe `|` separate list of events to be traced") //
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/test/test_ComputingQuotaEvaluator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ TEST_CASE("TestComputingQuotaEvaluator")
};

DataProcessingStats stats(TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});

ServiceRegistry registry;
ServiceRegistryRef ref(registry);
Expand Down
12 changes: 6 additions & 6 deletions Framework/Core/test/test_DataProcessingStats.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using namespace o2::framework;
TEST_CASE("DataProcessingStats")
{
DataProcessingStats stats(TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});

o2::framework::clean_all_runtime_errors();
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric});
Expand Down Expand Up @@ -190,7 +190,7 @@ TEST_CASE("DataProcessingStatsOutOfOrder")
int64_t value[] = {0, 1000, 999, 998};
return base + value[count++] - offset;
};
DataProcessingStats stats(realtimeTime, cpuTime);
DataProcessingStats stats(realtimeTime, cpuTime, {});
// Notice this will consume one value in the cpuTime.
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric});
stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 2});
Expand Down Expand Up @@ -222,7 +222,7 @@ TEST_CASE("DataProcessingStatsInstantaneousRate")

// I want to push deltas since the last update and have the immediate time
// averaged being stored.
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator);
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator, {});
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .kind = DataProcessingStats::Kind::Rate});
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 0);
REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 0);
Expand Down Expand Up @@ -265,7 +265,7 @@ TEST_CASE("DataProcessingStatsCumulativeRate")

// I want to push deltas since the last update and have the immediate time
// averaged being stored.
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator);
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator, {});
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .kind = DataProcessingStats::Kind::Rate});
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
Expand Down Expand Up @@ -310,7 +310,7 @@ TEST_CASE("DataProcessingStatsPublishing")

// I want to push deltas since the last update and have the immediate time
// averaged being stored.
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp);
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp, {});
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .minPublishInterval = 5000});
stats.registerMetric({.name = "dummy_metric2", .metricId = DummyMetric2, .minPublishInterval = 2000});
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
Expand Down Expand Up @@ -355,7 +355,7 @@ TEST_CASE("DataProcessingStatsPublishingRepeated")

// I want to push deltas since the last update and have the immediate time
// averaged being stored.
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp);
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp, {});
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .minPublishInterval = 3000, .maxRefreshLatency = 9000});
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/test/test_DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TEST_CASE("DataRelayer")
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
DataProcessingStats stats(
TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});
int quickUpdateInterval = 1;
using MetricSpec = DataProcessingStats::MetricSpec;
std::vector<MetricSpec> specs{
Expand Down
Loading