Home
Workflows and Tasks in Conductor are JSON objects with properties specifying the name, input parameters, and so on. These properties are referenced across different workflows, so it becomes hard to keep the definitions correct when Workflow properties change. Problems with a Workflow definition are reported only at Workflow registration time, not while the definition is being written.
This library attempts to solve the problem by representing Workflows and Tasks as C# classes and using builders to bind their inputs and outputs.
Workflows are built by inheriting the `Workflow` abstract generic class and overriding the `BuildDefinition` method. This method should use the `WorkflowDefinitionBuilder` to programmatically build up the Workflow definition. The class is parameterized with the concrete Workflow class, the Workflow input and the Workflow output (see example below).
In some cases (when dealing with legacy workflows, or when the builder does not yet support a specific Conductor feature) the definition can be read from a file or provided by overriding the `GetDefinition` method. This approach should be a last resort since it provides no compile-time error checks.
Workflows are made up of Tasks, which can be SubWorkflows (other Workflows defined on Conductor), simple worker Tasks, or system Tasks. Tasks are added to a Workflow by declaring properties on the concrete Workflow class and passing them to the `AddTask` method of the `WorkflowDefinitionBuilder`. Each property must be of a type supported by one of the defined Builders. Builders are added by creating extensions for the `WorkflowDefinitionBuilder` class; the library provides the most commonly used ones, and new ones can be added.
public class SendCustomerNotificationInput : WorkflowInput<SendCustomerNotificationOutput>
{
public int CustomerId { get; set; }
}
public class SendCustomerNotificationOutput : WorkflowOutput
{
public dynamic EmailBody { get; set; }
}
[Version(3)]
[OriginalName("NOTIFICATION_send_to_customer")]
public class SendCustomerNotification : Workflow<SendCustomerNotification, SendCustomerNotificationInput, SendCustomerNotificationOutput>
{
public SendCustomerNotification(
WorkflowDefinitionBuilder<SendCustomerNotification, SendCustomerNotificationInput, SendCustomerNotificationOutput> builder
) : base(builder) { }
public GetCustomerHandler GetCustomer { get; set; }
public EmailPrepareV1 PrepareEmail { get; set; }
public override void BuildDefinition()
{
_builder.AddTask(a => a.GetCustomer, b => new() { CustomerId = b.WorkflowInput.CustomerId });
_builder.AddTask(a => a.PrepareEmail, b => new() { Address = b.GetCustomer.Output.Address, Name = b.GetCustomer.Output.Name });
}
}
The equivalent definition, written without the builder:
{
"createTime": 0,
"updateTime": 0,
"name": "NOTIFICATION_send_to_customer",
"description": "{\"description\":null,\"labels\":null}",
"version": 3,
"tasks": [
{
"name": "CUSTOMER_get",
"taskReferenceName": "get_customer",
"description": "{\"description\":null}",
"inputParameters": {
"customer_id": "${workflow.input.customer_id}"
},
"type": "SIMPLE",
"startDelay": 0
},
{
"name": "EMAIL_prepare",
"taskReferenceName": "prepare_email",
"description": "{\"description\":null}",
"inputParameters": {
"address": "${get_customer.output.address}",
"name": "${get_customer.output.name}"
},
"type": "SIMPLE",
"startDelay": 0
}
],
"inputParameters": [
"{\"customer_id\":{\"value\":\"\",\"description\":\" (optional)\"}}"
],
"schemaVersion": 2,
"restartable": true,
"workflowStatusListenerEnabled": true,
"timeoutSeconds": 0
}
As mentioned above, to add a task you define a property of the appropriate type and then call the matching `AddTask` extension method. Each task type has a corresponding `AddTask` extension method.
The following table lists the supported task types:
| Task model | Conductor task type | Comment |
|---|---|---|
| `DecisionTaskModel` | DECISION | |
| `SimpleTaskModel<TInput, TOutput>` | SIMPLE | `TInput` is the task input type, `TOutput` is the task output type |
| `SubWorkflowTaskModel<TInput, TOutput>` | SUB_WORKFLOW | `TInput` is the task input type, `TOutput` is the task output type |
| `TerminateTaskModel` | TERMINATE | |
| `SwitchTaskModel` | SWITCH | |
| `DynamicForkJoinTaskModel` | FORK_JOIN_DYNAMIC, JOIN | Generates FORK_JOIN_DYNAMIC followed by JOIN |
| `LambdaTaskModel<TInput, TOutput>` | LAMBDA | `TInput` is the task input type, `TOutput` is the task output type |
| `TaskRequestHandler<TInput, TOutput>` | SIMPLE | Using handlers directly is supported as well |
| `WaitTaskModel` | WAIT | |
| `HumanTaskModel` | HUMAN | |
| `JsonJqTransformTaskModel` | JSON_JQ_TRANSFORM | |
| `DynamicTaskModel` | DYNAMIC | |
The following example adds a task with `AddTask` and specifies its input properties:
_builder.AddTask(
wf => wf.EmailPrepare,
wf =>
new()
{
Address = $"{wf.WorkflowInput.FirstInput},{wf.WorkflowInput.SecondInput}",
}
);
`SimpleTaskModel` and `SubWorkflowTaskModel` are usually inherited so that the `OriginalName` attribute can be applied to the concrete class. This original name is then used when generating the corresponding SIMPLE and SUB_WORKFLOW tasks. This is not needed for lambda tasks, since LAMBDA is a system task.
`TInput` and `TOutput` contain the properties used to generate the task input specification in the workflow definition (i.e. Conductor expressions such as `input_prop = ${task_reference_name.output.output_prop}`). By default, PascalCase properties are renamed to snake_case. This can be overridden by applying the Newtonsoft `[JsonProperty]` attribute to the input/output property of the corresponding `TInput`/`TOutput` class:
public partial class EmailPrepareV1Input : IRequest<EmailPrepareV1Output>
{
public object Address { get; set; }
public object Name { get; set; }
}
public partial class EmailPrepareV1Output
{
[JsonProperty("example_body")]
public object EmailBody { get; set; }
}
[OriginalName("EMAIL_prepare")]
public partial class EmailPrepareV1 : SimpleTaskModel<EmailPrepareV1Input, EmailPrepareV1Output> { }
public class VersionSubworkflowInput : IRequest<VersionSubworkflowOutput> { }
public class VersionSubworkflowOutput { }
[OriginalName("TEST_subworkflow")]
[Version(3)]
public class VersionSubworkflow : SubWorkflowTaskModel<VersionSubworkflowInput, VersionSubworkflowOutput> { }
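The default PascalCase to snake_case renaming mentioned above can be illustrated with a small standalone helper. This is only a sketch: `SnakeCaseSketch` and `ToSnakeCase` are hypothetical names, and the code is not ConductorSharp's actual naming logic. It inserts an underscore before every uppercase letter, so acronyms such as `URL` would come out as `u_r_l`, which may differ from what the library does.

```csharp
using System.Text;

// Hypothetical helper for illustration only; not part of ConductorSharp.
public static class SnakeCaseSketch
{
    // Converts a PascalCase property name to snake_case,
    // e.g. "CustomerId" becomes "customer_id".
    public static string ToSnakeCase(string name)
    {
        var sb = new StringBuilder();
        for (int i = 0; i < name.Length; i++)
        {
            char c = name[i];
            if (char.IsUpper(c))
            {
                // Separate words with an underscore, except at the start of the name.
                if (i > 0)
                {
                    sb.Append('_');
                }
                sb.Append(char.ToLowerInvariant(c));
            }
            else
            {
                sb.Append(c);
            }
        }
        return sb.ToString();
    }
}
```

With this rule, `EmailBody` maps to `email_body`, which is the name the generated definition would use unless a `[JsonProperty]` attribute overrides it as in the example above.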
Note the usage of the `Version` attribute. Workflows in Conductor are identified by a (name, version) pair. If the version is not specified, version 1 is used when generating the subworkflow task definition.
public class StringAdditionInput : WorkflowInput<StringAdditionOutput>
{
public string Input { get; set; }
}
public class StringAdditionOutput : WorkflowOutput { }
[OriginalName("string_addition")]
public class StringAddition : Workflow<StringAddition, StringAdditionInput, StringAdditionOutput>
{
public StringAddition(WorkflowDefinitionBuilder<StringAddition, StringAdditionInput, StringAdditionOutput> builder) : base(builder) { }
public class StringTaskOutput { public string Output { get; set; } }
public class StringTaskInput : IRequest<StringTaskOutput>
{
public string Input { get; set; }
}
public LambdaTaskModel<StringTaskInput, StringTaskOutput> StringTask { get; set; }
public override void BuildDefinition()
{
_builder.AddTask(
wf => wf.StringTask,
wf =>
new()
{
Input = wf.WorkflowInput.Input
},
"return {output: $.input + '_example_string'}"
);
}
}
Although LAMBDA tasks can return any JavaScript type, in ConductorSharp `LambdaTaskModel` expects the lambda to return an object whose properties match those of the `TOutput` class it is used with. Make sure the property names match, i.e. the JavaScript lambda must return snake_case properties (unless the output property is renamed). The same holds for input properties.
In Conductor, task inputs in a workflow are specified using Conductor expressions, which have the following format: `${SOURCE.input/output.JSONPath}`.
`SOURCE` can be `workflow` or a task reference name in the workflow definition; `input/output` refers to the input of the workflow or the output of the task.
`JSONPath` is used to traverse the input/output object.
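To make the three parts of the format concrete, here is a small sketch that splits an expression into its source, its `input`/`output` segment and its path. `ConductorExpressionSketch` and `Parse` are hypothetical names introduced for this illustration; neither Conductor nor ConductorSharp is assumed to expose such a helper, and only the single-expression form described above is handled.

```csharp
using System.Text.RegularExpressions;

// Hypothetical helper for illustration only; not part of Conductor or ConductorSharp.
public static class ConductorExpressionSketch
{
    // Matches ${SOURCE.input.PATH} or ${SOURCE.output.PATH} spanning the whole string.
    private static readonly Regex Pattern = new Regex(
        @"^\$\{(?<source>[^.}]+)\.(?<kind>input|output)\.(?<path>[^}]+)\}$");

    // Returns the three parts of the expression, or null when the text
    // does not have the expected shape.
    public static (string Source, string Kind, string Path)? Parse(string expression)
    {
        Match m = Pattern.Match(expression);
        if (!m.Success)
        {
            return null;
        }
        return (m.Groups["source"].Value, m.Groups["kind"].Value, m.Groups["path"].Value);
    }
}
```

For `${get_customer.output.address}` the parts are `get_customer`, `output` and `address`; for `${workflow.input.customer_id}` they are `workflow`, `input` and `customer_id`.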
ConductorSharp generates these expressions when writing workflows. Here is an example of an input specification:
wf =>
new PrepareEmailRequest
{
CustomerName = $"{wf.GetCustomer.Output.FirstName} {wf.GetCustomer.Output.LastName}",
Address = wf.WorkflowInput.Address
}
This is converted to the following Conductor input parameters specification:
"inputParameters": {
"customer_name": "${get_customer.output.first_name} ${get_customer.output.last_name}",
"address": "${workflow.input.address}"
}
In some scenarios the input and output parameters have different types. In that case casting can be used.
Following input specification:
wf =>
new PrepareEmailRequest
{
CustomerName = ((FullName)wf.GetCustomer.Output.Name).FirstName,
Address = (string)wf.GetCustomer.Output.Address
}
is translated to:
"inputParameters": {
"customer_name": "${get_customer.output.name.first_name}",
"address": "${get_customer.output.address}"
}
Array initialization is supported. Arrays can be typed or dynamic.
Following input specification:
wf =>
new()
{
Integers = new[] { 1, 2, 3 },
TestModelList = new List<ArrayTaskInput.TestModel>
{
new ArrayTaskInput.TestModel { String = wf.WorkflowInput.TestValue },
new ArrayTaskInput.TestModel { String = "List2" }
},
Models = new[]
{
new ArrayTaskInput.TestModel { String = "Test1" },
new ArrayTaskInput.TestModel { String = "Test2" }
},
Objects = new dynamic[] { new { AnonymousObjProp = "Prop" }, new { Test = "Prop" } }
}
is translated to:
"inputParameters": {
"integers": [
1,
2,
3
],
"test_model_list": [
{
"string": "${workflow.input.test_value}"
},
{
"string": "List2"
}
],
"models": [
{
"string": "Test1"
},
{
"string": "Test2"
}
],
"objects": [
{
"anonymous_obj_prop": "Prop"
},
{
"test": "Prop"
}
]
}
Object initialization is supported.
Anonymous objects are supported when initializing sub-properties.
Following input specification:
wf =>
new()
{
NestedObjects = new TestModel
{
Integer = 1,
String = "test",
Object = new TestModel
{
Integer = 1,
String = "string",
Object = new { NestedInput = "1" }
}
},
}
is translated to:
"inputParameters": {
"nested_objects": {
"integer": 1,
"string": "test",
"object": {
"integer": 1,
"string": "string",
"object": {
"nested_input": "1"
}
}
}
}
Dictionary indexing is supported.
Indexing using an indexer on an arbitrary type is currently not supported.
Following input specification:
wf =>
new()
{
CustomerName = wf.WorkflowInput.Dictionary["test"].CustomerName,
Address = wf.WorkflowInput.DoubleDictionary["test"]["address"]
}
is translated to:
"inputParameters": {
"customer_name": "${workflow.input.dictionary['test'].customer_name}",
"address": "${workflow.input.double_dictionary['test']['address']}"
}
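One way to picture how a lambda with dictionary indexing could become the expression above is to walk its C# expression tree. The following is a sketch under stated assumptions, not ConductorSharp's implementation: `PathSketch`, `ToExpression`, `Walk`, `Input` and `Customer` are hypothetical names, the root is hard-coded to `workflow.input`, and only property access and dictionary indexers with constant keys are handled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;

// Hypothetical sketch for illustration only; not part of ConductorSharp.
public static class PathSketch
{
    // Hypothetical model types standing in for a workflow input.
    public class Customer { public string CustomerName { get; set; } }
    public class Input
    {
        public Dictionary<string, Customer> Dictionary { get; set; }
        public Dictionary<string, Dictionary<string, string>> DoubleDictionary { get; set; }
    }

    // Builds "${workflow.input...}" from a lambda over the hypothetical Input type.
    public static string ToExpression<T>(Expression<Func<Input, T>> selector)
        => "${workflow.input" + Walk(selector.Body) + "}";

    private static string Walk(Expression node)
    {
        switch (node)
        {
            case ParameterExpression _:
                // The lambda parameter is the root and contributes nothing to the path.
                return "";
            case MemberExpression member:
                // Property access: append ".property_name" in snake_case.
                return Walk(member.Expression) + "." + ToSnakeCase(member.Member.Name);
            case MethodCallExpression call
                when call.Method.Name == "get_Item" && call.Arguments[0] is ConstantExpression key:
                // The compiler emits a call to get_Item for a dictionary indexer.
                return Walk(call.Object) + "['" + key.Value + "']";
            default:
                throw new NotSupportedException(node.NodeType.ToString());
        }
    }

    // Inserts an underscore before each uppercase letter except the first.
    private static string ToSnakeCase(string name)
    {
        var sb = new StringBuilder();
        for (int i = 0; i < name.Length; i++)
        {
            if (char.IsUpper(name[i]) && i > 0) sb.Append('_');
            sb.Append(char.ToLowerInvariant(name[i]));
        }
        return sb.ToString();
    }
}
```

Calling `PathSketch.ToExpression(i => i.Dictionary["test"].CustomerName)` walks the property access, the indexer call and the final property in turn; any other node type, such as an indexer on an arbitrary type with a non-constant argument, ends in `NotSupportedException` in this sketch.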
It is possible to embed the name of any workflow in a task input specification.
Following input specification:
wf =>
new()
{
Name = $"Workflow name: {NamingUtil.NameOf<StringInterpolation>()}",
WfName = NamingUtil.NameOf<StringInterpolation>()
}
is translated to:
"inputParameters": {
"name": "Workflow name: TEST_StringInterpolation",
"wf_name": "TEST_StringInterpolation"
}
`StringInterpolation` has the attribute `[OriginalName("TEST_StringInterpolation")]` applied.
String concatenation is supported. Strings can also be concatenated with numbers, input/output parameters and interpolated strings.
Following input specification:
wf =>
new()
{
Input =
1
+ "Str_"
+ "2Str_"
+ wf.WorkflowInput.Input
+ $"My input: {wf.WorkflowInput.Input}"
+ NamingUtil.NameOf<StringAddition>()
+ 1
}
is translated to:
"inputParameters": {
"input": "1Str_2Str_${workflow.input.input}My input: ${workflow.input.input}string_addition1"
}
`StringAddition` has the attribute `[OriginalName("string_addition")]` applied.