Skip to content

Commit

Permalink
[OCTRL-793] DPL: Propagate max allowed CPU and memory usage threshold…
Browse files Browse the repository at this point in the history
… to AliECS
  • Loading branch information
knopers8 authored and ktf committed Jul 7, 2023
1 parent 37a9f48 commit 9af4c6e
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 3 deletions.
1 change: 1 addition & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ o2_add_library(Framework
src/SimpleOptionsRetriever.cxx
src/O2ControlHelpers.cxx
src/O2ControlLabels.cxx
src/O2ControlParameters.cxx
src/O2DataModelHelpers.cxx
src/OutputSpec.cxx
src/OptionsHelpers.cxx
Expand Down
39 changes: 39 additions & 0 deletions Framework/Core/include/Framework/O2ControlParameters.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#ifndef O2_FRAMEWORK_O2CONTROLPARAMETERS_H
#define O2_FRAMEWORK_O2CONTROLPARAMETERS_H

#include "Framework/DataProcessorSpec.h"
// TODO: merge the header below with this one once downstream includes <Framework/O2ControlParameters.h>
#include "Framework/O2ControlLabels.h"

namespace o2::framework
{

/// DataProcessorMetadata which are recognized by the --o2-control dump tool
/// and influence its output.
namespace ecs
{

// This key will demand AliECS to kill the task if it uses more CPU than the specified number in the value.
// The value should be a string with a floating point number, where "1.0" corresponds to 100% usage of one CPU.
const extern decltype(DataProcessorMetadata::key) cpuKillThreshold;

// This key will demand AliECS to kill the task if it uses more private memory than the specified number in the value.
// The value should be a string with a positive floating-point number or an integer (e.g. "128")
const extern decltype(DataProcessorMetadata::key) privateMemoryKillThresholdMB;

} // namespace ecs

} // namespace o2::framework

#endif // O2_FRAMEWORK_O2CONTROLPARAMETERS_H
24 changes: 23 additions & 1 deletion Framework/Core/src/O2ControlHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "O2ControlHelpers.h"
#include "Framework/O2ControlLabels.h"
#include "Framework/O2ControlParameters.h"
#include "Framework/ChannelSpecHelpers.h"
#include "Framework/Logger.h"
#include "Framework/DataProcessorSpec.h"

#include <iostream>
#include <cstring>
#include <string>
#include <string_view>
#include <filesystem>
#include <optional>
#include <set>

namespace bfs = std::filesystem;
Expand All @@ -34,6 +37,13 @@ std::string taskName(const std::string& workflowName, const std::string& deviceN
return workflowName + "-" + deviceName;
}

std::optional<DataProcessorMetadata> firstMatchingMetadata(DeviceSpec const& spec, std::string_view key)
{
auto sameKey = [otherKey = key](DataProcessorMetadata const& metadata) { return metadata.key == otherKey; };
auto result = std::find_if(spec.metadata.begin(), spec.metadata.end(), sameKey);
return result != spec.metadata.end() ? std::optional{*result} : std::nullopt;
}

template <typename T>
void dumpChannelBind(std::ostream& dumpOut, const T& channel, std::string indLevel)
{
Expand Down Expand Up @@ -402,6 +412,18 @@ void dumpTask(std::ostream& dumpOut, const DeviceSpec& spec, const DeviceExecuti
dumpOut << indLevel << indScheme << "cpu: 0.01\n";
dumpOut << indLevel << indScheme << "memory: 1\n";

auto cpuKillThreshold = implementation::firstMatchingMetadata(spec, ecs::cpuKillThreshold);
auto privateMemoryKillThresholdMB = implementation::firstMatchingMetadata(spec, ecs::privateMemoryKillThresholdMB);
if (cpuKillThreshold.has_value() || privateMemoryKillThresholdMB.has_value()) {
dumpOut << indLevel << "limits:\n";
if (cpuKillThreshold.has_value()) {
dumpOut << indLevel << indScheme << "cpu: " << cpuKillThreshold.value().value << '\n';
}
if (privateMemoryKillThresholdMB.has_value()) {
dumpOut << indLevel << indScheme << "memory: " << privateMemoryKillThresholdMB.value().value << '\n';
}
}

dumpOut << indLevel << "bind:\n";
for (const auto& outputChannel : spec.outputChannels) {
if (outputChannel.method == ChannelMethod::Bind) {
Expand Down
20 changes: 20 additions & 0 deletions Framework/Core/src/O2ControlParameters.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Framework/O2ControlParameters.h"

namespace o2::framework::ecs
{

const decltype(DataProcessorMetadata::key) cpuKillThreshold = "ecs-cpu-kill-threshold";
const decltype(DataProcessorMetadata::key) privateMemoryKillThresholdMB = "ecs-private-memory-kill-threshold-mb";

} // namespace o2::framework::ecs
11 changes: 9 additions & 2 deletions Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "Framework/ProcessingContext.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/DriverConfig.h"
#include "Framework/O2ControlParameters.h"

#include <sstream>

Expand All @@ -39,10 +40,12 @@ WorkflowSpec defineDataProcessing()
{"Out-of-band channel config"}}}},
{"B", // producer, no inputs
Inputs{},
Outputs{OutputSpec{"TST", "B1"}}},
Outputs{OutputSpec{"TST", "B1"}},
.metadata = {{ecs::cpuKillThreshold, "3.0"}}},
{"C", // first consumer of A1, consumer of B1
{InputSpec{"y", "TST", "A1"}, InputSpec{"y", "TST", "B1"}},
Outputs{}},
Outputs{},
.metadata = {{ecs::privateMemoryKillThresholdMB, "5000"}}},
{"D", // second consumer of A1
Inputs{
InputSpec{"x", "TST", "A1"}},
Expand Down Expand Up @@ -238,6 +241,8 @@ const std::vector expectedTasks{
wants:
cpu: 0.01
memory: 1
limits:
cpu: 3.0
bind:
- name: from_B_to_C
type: push
Expand Down Expand Up @@ -329,6 +334,8 @@ const std::vector expectedTasks{
wants:
cpu: 0.01
memory: 1
limits:
memory: 5000
bind:
- name: from_C_to_D
type: push
Expand Down

0 comments on commit 9af4c6e

Please sign in to comment.