-
Notifications
You must be signed in to change notification settings - Fork 2
Workflow task examples
Currently the library produces LAMBDA tasks, newer versions of conductor transformed this to INLINE tasks
LAMBDA Task executes specified Javascript code, which should be the last parameter of the AddTask
method. The result of the Javascript statement will be the output of the Workflow task, as some_task.output.result
. The MacAddress
property of the ValidateMacInput
input class from this example would be accessible in the Javascript expression as $.mac_address
.
public LambdaTaskModel<ValidateMacInput, ValidateMacOutput> MacValidateTask { get; set; }
public override void BuildDefinition()
{
_builder.AddTask(
wf => wf.MacValidateTask ,
wf =>
new()
{
MacAddress = wf.WorkflowInput.MacAddress.
},
"return { transformed_mac : $.mac_address.toLowerCase().replaceAll(':','')}"
);
}
As per the Conductor docs:
A Fork operation in conductor, lets you run a specified list of other tasks or sub workflows in parallel after the fork task. A fork task is followed by a join operation that waits on the forked tasks or sub workflows to finish. The JOIN task also collects outputs from each of the forked tasks or sub workflows.
In a regular fork operation (FORK_JOIN task), the list of tasks or sub workflows that need to be forked and run in parallel are already known at the time of workflow definition creation time. However, there are cases when that list can only be determined at run-time and that is when the dynamic fork operation (FORK_JOIN_DYNAMIC task) is needed.
There are three things that are needed to configure a FORK_JOIN_DYNAMIC task.
- A list of tasks or sub-workflows that needs to be forked and run in parallel.
- A list of inputs to each of these forked tasks or sub-workflows
- A task prior to the FORK_JOIN_DYNAMIC tasks outputs 1 and 2 above that can be wired in as in input to the FORK_JOIN_DYNAMIC tasks
The library simplifies the construction of dynamic fork joins by adding both the FORK_JOIN_DYNAMIC
and the JOIN
tasks using one builder. The DynamicTasks
and DynamicTasksI
parameters are provided by other Tasks or Workflows, and are bound to the DynamicForkJoin
Workflow task.
public DynamicForkJoinTaskModel DynamicForkJoin { get; set; }
_builder.AddTask(
a => a.DynamicForkJoin,
b =>
new()
{
DynamicTasks = b.PrepareDynamicProvisionTasks.Output.DynamicTasks,
DynamicTasksI = b.PrepareDynamicProvisionTasks.Output.DynamicTasksI
}
);
Each decision case is specified by using indexer init syntax in C#. Default case is specified using DefaultCase
property
public DecisionTaskModel ShouldRevertClassUpdateDecision { get;set; }
public TerminateTaskModel TerminateWorkflow { get;set; }
_builder.AddTask(
a => a.ShouldRevertClassUpdateDecision,
b => new() { CaseValueParam = "${original_workflow_tasks.output.tasks.update_class.status}" },
new()
{
["COMPLETED"] = c =>
{
c.AddTask(
d => d.RevertRegistryChange,
e =>
new()
{
Class = "${original_workflow_tasks.output.tasks.get_info.outputData.class}",
ServiceId =
"${original_workflow_tasks.output.tasks.get_info.outputData.id}",
}
);
c.AddTask(d => d.Wait, e => new() { Minutes = 1 });
},
DefaultCase = c =>
{
c.AddTask(
d => d.TerminateWorkflow,
e =>
new()
{
TerminationStatus = TerminationStatus.Completed,
WorkflowOutput = new { Message = "Class update was not reverted" }
}
);
}
}
);
public EmailPrepareV1 EmailPrepare { get; set; }
public override void BuildDefinition()
{
_builder.AddTask(
wf => wf.EmailPrepare,
wf =>
new()
{
Address = $"{wf.WorkflowInput.FirstInput},{wf.WorkflowInput.SecondInput}",
Name = $"Workflow name: {NamingUtil.NameOf<StringInterpolation>()}"
}
);
_builder.SetOutput(a => new() { EmailBody = a.EmailPrepare.Output.EmailBody });
}
public class ProvisionServiceInput : IRequest<ProvisionServiceOutput>
{
public dynamic Context { get;set }
public dynamic ServiceSpecification { get;set }
}
public class ProvisionServiceOutput { }
public class ProvisionService : SubWorkflowTaskModel<ProvisionServiceInput, ProvisionServiceOutput> { }
public ProvisionService ProvisionService { get;set; }
_builder.AddTask(
a => a.ProvisionService,
b => new() { Context = b.WorkflowInput.Context, ServiceSpecification = b.WorkflowInput.Specification }
);
public DecisionTaskModel DecisionTask { get; set; }
public TerminateTaskModel DecisionTerminate { get; set; }
_builder.AddTask(
wf => wf.DecisionTask,
wf => new() { CaseValueParam = "value" },
new()
{
["value"] = builder =>
builder.AddTask(
wf => wf.DecisionTerminate,
wf =>
new()
{
WorkflowOutput = new { Property = "Test" },
TerminationStatus = TerminationStatus.Completed
}
)
}
);
In case support for some task is missing you can add raw tasks to builder using AddTasks
method
_builder.AddTasks(
new WorkflowDefinition.Task
{
Name = "LAMBDA_return_data",
TaskReferenceName = "return_data",
Type = "LAMBDA",
Description = new JObject
{
new JProperty("description", "Lambda task to return data")
}.ToString(Newtonsoft.Json.Formatting.None),
InputParameters = new JObject
{
new JProperty("hostname", "${workflow.input.hostname}"),
new JProperty("additional_template", "${workflow.input.additional_template}"),
new JProperty("base_template", "${workflow.input.base_template}"),
new JProperty("licence", "${workflow.input.licence}"),
new JProperty("oam_domain", "${workflow.input.oam_domain}"),
new JProperty("platform_codename", "${workflow.input.platform_codename}"),
new JProperty("platform_name", "${workflow.input.platform_name}"),
new JProperty("software_version", "${workflow.input.software_version}"),
new JProperty("upstream_switch", "${workflow.input.upstream_switch}"),
new JProperty(
"upstream_switch_interface_name",
"${workflow.input.upstream_switch_interface_name}"
),
new JProperty(
"scriptExpression",
"return { "
+ "hostname : $.hostname,"
+ "additional_template : $.additional_template,"
+ "base_template : $.base_template,"
+ "licence : $.licence,"
+ "oam_domain : $.oam_domain,"
+ "platform_codename : $.platform_codename,"
+ "platform_name : $.platform_name,"
+ "software_version : $.software_version,"
+ "upstream_switch : $.upstream_switch,"
+ "upstream_switch_interface_name : $.upstream_switch_interface_name,"
+ "}"
)
}
}
);