Skip to content

Commit

Permalink
DPL: calibration workflow example
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Feb 16, 2024
1 parent 5b1bbfd commit 0ed52bc
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Framework/TestWorkflows/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ o2_add_dpl_workflow(diamond-workflow
SOURCES src/o2DiamondWorkflow.cxx
COMPONENT_NAME TestWorkflows)

o2_add_dpl_workflow(dummy-calibration-workflow
SOURCES src/o2DummyCalibrationWorkflow.cxx
COMPONENT_NAME TestWorkflows)

o2_add_dpl_workflow(diamond-workflow-leaky
SOURCES src/o2DiamondWorkflowLeaky.cxx
COMPONENT_NAME TestWorkflows)
Expand Down
73 changes: 73 additions & 0 deletions Framework/TestWorkflows/src/o2DummyCalibrationWorkflow.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "Framework/ConfigParamSpec.h"
#include "Framework/DataTakingContext.h"
#include "Framework/CompletionPolicyHelpers.h"
#include "Framework/DeviceSpec.h"
#include "Framework/ControlService.h"
#include "Framework/Configurable.h"
#include "Framework/RunningWorkflowInfo.h"
#include "Framework/CallbackService.h"
#include "Framework/EndOfStreamContext.h"
#include <fairmq/Device.h>

#include <iostream>
#include <vector>

using namespace o2::framework;

#include "Framework/runDataProcessing.h"

// This is how you can define your processing in a declarative way
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
{
DataProcessorSpec a{
.name = "counter",
.outputs = {OutputSpec{{"counter"}, "TST", "A1"}},
.algorithm = AlgorithmSpec{adaptStateless(
[](DataAllocator& outputs, ProcessingContext& pcx) {
static int counter = 0;
auto& aData = outputs.make<int>(OutputRef{"counter"});
aData = counter++;
if (counter == 10) {
pcx.services().get<ControlService>().endOfStream();
}
})},
};

DataProcessorSpec b{
.name = "aggregator",
.inputs = {InputSpec{"x", "TST", "A1", Lifetime::Timeframe}},
.outputs = {OutputSpec{{"average"}, "TST", "B1", Lifetime::Sporadic}},
.algorithm = adaptStateful([](CallbackService& callbacks) {
static int sum = 0;
auto eosCallback = [](EndOfStreamContext &ctx) {
auto& aData = ctx.outputs().make<int>(OutputRef{"average"});
aData = sum;
ctx.services().get<ControlService>().endOfStream();
};
callbacks.set<CallbackService::Id::EndOfStream>(eosCallback);
return adaptStateless([](Input<"x", int> const& x)
{
sum += x;
std::cout << "Sum: " << sum << std::endl;
}); })};

DataProcessorSpec c{.name = "publisher",
.inputs = {InputSpec{"average", "TST", "B1", Lifetime::Sporadic}},
.algorithm = adaptStateless([](Input<"average", int> const& counter) {
std::cout << "Counter to publish: " << counter << std::endl;
})};

return workflow::concat(WorkflowSpec{a},
WorkflowSpec{b},
WorkflowSpec{c});
}

0 comments on commit 0ed52bc

Please sign in to comment.