Skip to content

Commit

Permalink
DPL: Pass the concept of expendable tasks in DPL to templates for ECS
Browse files Browse the repository at this point in the history
  • Loading branch information
knopers8 authored and ktf committed Feb 14, 2024
1 parent 2e1c905 commit 1704858
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
9 changes: 9 additions & 0 deletions Framework/Core/src/O2ControlHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,14 @@ bool isQcReconfigurable(const DeviceSpec& spec)
return std::find(spec.labels.begin(), spec.labels.end(), ecs::qcReconfigurable) != spec.labels.end();
}

bool isCritical(const DeviceSpec& spec)
{
// DPL's expendable Data Processor corresponds to a non-critical task in ECS
// DPL's resilient Data Processor corresponds to a critical task in ECS
// All tasks are considered critical by default in ECS
return std::find(spec.labels.begin(), spec.labels.end(), DataProcessorLabel{"expendable"}) == spec.labels.end();
}

void dumpProperties(std::ostream& dumpOut, const DeviceExecution& execution, const DeviceSpec& spec, const std::string& indLevel)
{
// get the argument `--config`
Expand Down Expand Up @@ -358,6 +366,7 @@ void dumpRole(std::ostream& dumpOut, const std::string& taskName, const DeviceSp

dumpOut << indLevel << indScheme << "task:\n";
dumpOut << indLevel << indScheme << indScheme << "load: " << taskName << "\n";
dumpOut << indLevel << indScheme << indScheme << "critical: " << (isCritical(spec) ? "true" : "false") << "\n";
}

std::string removeO2ControlArg(std::string_view command)
Expand Down
8 changes: 7 additions & 1 deletion Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ WorkflowSpec defineDataProcessing()
.metadata = {{ecs::cpuKillThreshold, "3.0"}}},
{.name = "C", // first consumer of A1, consumer of B1
.inputs = {InputSpec{"y", "TST", "A1"}, InputSpec{"y", "TST", "B1"}},
.labels = {{"expendable"}},
.metadata = {{ecs::privateMemoryKillThresholdMB, "5000"}}},
{.name = "D", // second consumer of A1
.inputs = Inputs{InputSpec{"x", "TST", "A1"}},
Expand All @@ -50,7 +51,8 @@ WorkflowSpec defineDataProcessing()
ConfigParamSpec{"c-param", VariantType::String, "foo;bar", {"another parameter which will be escaped"}},
ConfigParamSpec{"channel-config", VariantType::String, // raw output channel
"name=outta_dpl,type=push,method=bind,address=ipc:///tmp/pipe-outta-dpl,transport=shmem,rateLogging=10",
{"Out-of-band channel config"}}}}};
{"Out-of-band channel config"}}},
.labels = {{"resilient"}}}};
}

char* strdiffchr(const char* s1, const char* s2)
Expand Down Expand Up @@ -86,10 +88,12 @@ const auto expectedWorkflow = R"EXPECTED(name: testwf
rcvBufSize: 789
task:
load: testwf-A
critical: true
- name: "B"
connect:
task:
load: testwf-B
critical: true
- name: "C"
connect:
- name: from_A_to_C
Expand All @@ -108,6 +112,7 @@ const auto expectedWorkflow = R"EXPECTED(name: testwf
rcvBufSize: 1
task:
load: testwf-C
critical: false
- name: "D"
connect:
- name: from_C_to_D
Expand All @@ -126,6 +131,7 @@ const auto expectedWorkflow = R"EXPECTED(name: testwf
global: "outta_dpl-{{ it }}"
task:
load: testwf-D
critical: true
)EXPECTED";

const std::vector expectedTasks{
Expand Down

0 comments on commit 1704858

Please sign in to comment.