Skip to content

Commit

Permalink
DPL: first step to make workflow definition plugins
Browse files Browse the repository at this point in the history
This is the first step towards having the workflow definition inside plugins
rather than in executables. This will allow accumulating plugins which are needed
to instantiate a topology and do the option parsing / topology building only once,
simplifying the current case.

The end goal is to allow the driver to preload certain common services (e.g. ROOT)
and share it among the different tasks (which at the moment it's not allowed because different
tasks are in different executables). Moreover this will allow us to coalesce strictly coupled
dataprocessors and reduce the number of running processes.

For now the plugins are embedded in the executables and behave exactly like before.
  • Loading branch information
ktf committed Feb 18, 2025
1 parent 02090ec commit 9f7bb5c
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 85 deletions.
3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/Plugins.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ enum struct DplPluginKind : int {
// using the arrow dataset API
RootObjectReadingImplementation,

// A plugin which defines a whole workflow. This will be used to separate
// workflows in shared libraries and run them via a separate loader.
Workflow,
// A plugin which was not initialised properly.
Unknown
};
Expand Down
56 changes: 56 additions & 0 deletions Framework/Core/include/Framework/WorkflowDefinitionContext.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_
#define O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_

#include "Framework/ConfigParamSpec.h"
#include "Framework/CompletionPolicy.h"
#include "Framework/DispatchPolicy.h"
#include "Framework/ResourcePolicy.h"
#include "Framework/CallbacksPolicy.h"
#include "Framework/SendingPolicy.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/ChannelConfigurationPolicy.h"
#include <vector>

namespace o2::framework
{

struct WorkflowDefinitionContext {
std::vector<ConfigParamSpec> workflowOptions;
std::vector<CompletionPolicy> completionPolicies;
std::vector<DispatchPolicy> dispatchPolicies;
std::vector<ResourcePolicy> resourcePolicies;
std::vector<CallbacksPolicy> callbacksPolicies;
std::vector<SendingPolicy> sendingPolicies;
std::vector<ConfigParamSpec> extraOptions;
std::vector<ChannelConfigurationPolicy> channelPolicies;
std::unique_ptr<ConfigContext> configContext;

// For the moment, let's put them here. We should
// probably move them to a different place, since these are not really part
// of the workflow definition but will be there also at runtine.
std::unique_ptr<ServiceRegistry> configRegistry{nullptr};
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};

o2::framework::WorkflowSpec specs;
};

struct WorkflowDefinition {
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
};

struct WorkflowPlugin {
virtual o2::framework::WorkflowDefinition* create() = 0;
};

} // namespace o2::framework
#endif // O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_
126 changes: 78 additions & 48 deletions Framework/Core/include/Framework/runDataProcessing.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
#include "Framework/CustomWorkflowTerminationHook.h"
#include "Framework/CommonServices.h"
#include "Framework/WorkflowCustomizationHelpers.h"
#include "Framework/WorkflowDefinitionContext.h"
#include "Framework/Logger.h"
#include "Framework/Plugins.h"
#include "Framework/CheckTypes.h"
#include "Framework/StructToTuple.h"
#include "ResourcePolicy.h"
Expand Down Expand Up @@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector<o2::framewor
void overrideLabels(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);

// This comes from the framework itself. This way we avoid code duplication.
int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
std::vector<o2::framework::ChannelConfigurationPolicy> const& channelPolicies,
std::vector<o2::framework::CompletionPolicy> const& completionPolicies,
std::vector<o2::framework::DispatchPolicy> const& dispatchPolicies,
std::vector<o2::framework::ResourcePolicy> const& resourcePolicies,
std::vector<o2::framework::CallbacksPolicy> const& callbacksPolicies,
std::vector<o2::framework::SendingPolicy> const& sendingPolicies,
std::vector<o2::framework::ConfigParamSpec> const& workflowOptions,
std::vector<o2::framework::ConfigParamSpec> const& detectedOptions,
o2::framework::ConfigContext& configContext);
int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext);

void doDefaultWorkflowTerminationHook();

Expand Down Expand Up @@ -167,60 +160,97 @@ void callWorkflowTermination(T&, char const* idstring)

void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);

o2::framework::ConfigContext createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
o2::framework::ServiceRegistry& configRegistry,
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
std::unique_ptr<o2::framework::ConfigContext> createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
o2::framework::ServiceRegistry& configRegistry,
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);

std::unique_ptr<o2::framework::ServiceRegistry> createRegistry();

int mainNoCatch(int argc, char** argv)
{
using namespace o2::framework;
char* getIdString(int argc, char** argv);

std::vector<o2::framework::ConfigParamSpec> workflowOptions;
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));

std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();

std::unique_ptr<ServiceRegistry> configRegistry = createRegistry();
std::vector<ConfigParamSpec> extraOptions;
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);

o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
overrideAll(configContext, specs);
for (auto& spec : specs) {
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
}
std::vector<ChannelConfigurationPolicy> channelPolicies;
UserCustomizationsHelper::userDefinedCustomization(channelPolicies);
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
return doMain(argc, argv, specs,
channelPolicies, completionPolicies, dispatchPolicies,
resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext);
#define STRINGIZE_NX(A) #A
#define STRINGIZE(A) STRINGIZE_NX(A)

// This is to allow the old "executable" based behavior
// Each executable will contain a plugin called InternalWorkflow
// In case one wants to use the new DSO based approach, the
// name of the plugin an the library name where it is located
// will have to be specified at build time.
#ifndef DPL_WORKFLOW_PLUGIN_NAME
#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow
#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY
#error Missing DPL_WORKFLOW_PLUGIN_NAME
#endif
#define DPL_WORKFLOW_PLUGIN_LIBRARY ""
#endif

consteval char const* pluginName()
{
return DPL_WORKFLOW_PLUGIN_LIBRARY ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME);
}

int callMain(int argc, char** argv, int (*)(int, char**));
char* getIdString(int argc, char** argv);
// Executables behave this way
int callMain(int argc, char** argv, char const* pluginName);

int main(int argc, char** argv)
{
using namespace o2::framework;

int result = callMain(argc, argv, mainNoCatch);
int result = callMain(argc, argv, pluginName());

char* idstring = getIdString(argc, argv);
o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook;
callWorkflowTermination(onWorkflowTerminationHook, idstring);

return result;
}

struct WorkflowDefinition {
std::function<o2::framework::WorkflowDefinitionContext(int argc, char** argv)> defineWorkflow;
};

struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin {
o2::framework::WorkflowDefinition* create() override
{
return new o2::framework::WorkflowDefinition{
.defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext {
using namespace o2::framework;
WorkflowDefinitionContext workflowContext;

UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions);
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));

workflowContext.completionPolicies = injectCustomizations<CompletionPolicy>();
workflowContext.dispatchPolicies = injectCustomizations<DispatchPolicy>();
workflowContext.resourcePolicies = injectCustomizations<ResourcePolicy>();
workflowContext.callbacksPolicies = injectCustomizations<CallbacksPolicy>();
workflowContext.sendingPolicies = injectCustomizations<SendingPolicy>();

workflowContext.configRegistry = createRegistry();
workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv);

workflowContext.specs = defineDataProcessing(*workflowContext.configContext);
overrideAll(*workflowContext.configContext, workflowContext.specs);
for (auto& spec : workflowContext.specs) {
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
}
UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies);
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext);
workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
return workflowContext;
}};
}
};

// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion
extern "C" {
DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous)
{
previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous};
return previous;
}
}

#endif
Loading

0 comments on commit 9f7bb5c

Please sign in to comment.