diff --git a/Framework/TestWorkflows/CMakeLists.txt b/Framework/TestWorkflows/CMakeLists.txt index c2c89e19b894e..848337200a4d2 100644 --- a/Framework/TestWorkflows/CMakeLists.txt +++ b/Framework/TestWorkflows/CMakeLists.txt @@ -33,6 +33,10 @@ o2_add_dpl_workflow(diamond-workflow SOURCES src/o2DiamondWorkflow.cxx COMPONENT_NAME TestWorkflows) +o2_add_dpl_workflow(dummy-calibration-workflow + SOURCES src/o2DummyCalibrationWorkflow.cxx + COMPONENT_NAME TestWorkflows) + o2_add_dpl_workflow(diamond-workflow-leaky SOURCES src/o2DiamondWorkflowLeaky.cxx COMPONENT_NAME TestWorkflows) diff --git a/Framework/TestWorkflows/src/o2DummyCalibrationWorkflow.cxx b/Framework/TestWorkflows/src/o2DummyCalibrationWorkflow.cxx new file mode 100644 index 0000000000000..94916cfa36984 --- /dev/null +++ b/Framework/TestWorkflows/src/o2DummyCalibrationWorkflow.cxx @@ -0,0 +1,73 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "Framework/ConfigParamSpec.h" +#include "Framework/DataTakingContext.h" +#include "Framework/CompletionPolicyHelpers.h" +#include "Framework/DeviceSpec.h" +#include "Framework/ControlService.h" +#include "Framework/Configurable.h" +#include "Framework/RunningWorkflowInfo.h" +#include "Framework/CallbackService.h" +#include "Framework/EndOfStreamContext.h" +#include + +#include +#include + +using namespace o2::framework; + +#include "Framework/runDataProcessing.h" + +// This is how you can define your processing in a declarative way +WorkflowSpec defineDataProcessing(ConfigContext const& specs) +{ + DataProcessorSpec a{ + .name = "counter", + .outputs = {OutputSpec{{"counter"}, "TST", "A1"}}, + .algorithm = AlgorithmSpec{adaptStateless( + [](DataAllocator& outputs, ProcessingContext& pcx) { + static int counter = 0; + auto& aData = outputs.make(OutputRef{"counter"}); + aData = counter++; + if (counter == 10) { + pcx.services().get().endOfStream(); + } + })}, + }; + + DataProcessorSpec b{ + .name = "aggregator", + .inputs = {InputSpec{"x", "TST", "A1", Lifetime::Timeframe}}, + .outputs = {OutputSpec{{"average"}, "TST", "B1", Lifetime::Sporadic}}, + .algorithm = adaptStateful([](CallbackService& callbacks) { + static int sum = 0; + auto eosCallback = [](EndOfStreamContext &ctx) { + auto& aData = ctx.outputs().make(OutputRef{"average"}); + aData = sum; + ctx.services().get().endOfStream(); + }; + callbacks.set(eosCallback); + return adaptStateless([](Input<"x", int> const& x) + { + sum += x; + std::cout << "Sum: " << sum << std::endl; + }); })}; + + DataProcessorSpec c{.name = "publisher", + .inputs = {InputSpec{"average", "TST", "B1", Lifetime::Sporadic}}, + .algorithm = adaptStateless([](Input<"average", int> const& counter) { + std::cout << "Counter to publish: " << counter << std::endl; + })}; + + return workflow::concat(WorkflowSpec{a}, + WorkflowSpec{b}, + WorkflowSpec{c}); +}