Skip to content
Bojan Malinić edited this page Sep 28, 2023 · 58 revisions

Workflows and Tasks in conductor are JSON objects with properties specifying the name, input parameters, etc. These properties are referenced across different workflows and it can become hard to maintain the correctness of these definitions when updating Workflow properties. Any issues with Workflow definitions will be reported at Workflow registration time, but not while building the definition itself.

This library attempts to solve the problem by representing Workflows and Tasks as C# classes and using builders to bind their inputs and outputs.

Building Workflows

Workflows are built by inheriting the Workflow abstract generic class and overriding the BuildDefinition method. This method should make use of the WorkflowDefinitionBuilder to programmatically build up the Workflow definition. The class is parameterized with the Workflow input, Workflow output and the concrete Workflow class (see example below).

In some cases (when dealing with legacy workflows, or in cases where the builder does not yet support a specific feature of conductor) the definition can be read from a file, or by overriding the GetDefinition method. This approach should be used as a last resort since it does not provide any compile time error checks.

Workflows are made up of Tasks, these can be SubWorkflows (other Workflows defined on Conductor), simple worker Tasks, and system Tasks. Adding tasks to Workflows is done by adding properties to the concrete Workflow class and using those as inputs for the AddTask method of the WorkflowDefinitionBuilder. These properties must be of type supported by one of the Builders defined. Builders are added by creating extensions for the WorkflowDefinitionBuilder class, the library provides the most commonly used ones, but new ones can be added.

public class SendCustomerNotificationInput : WorkflowInput<SendCustomerNotificationOutput>
{
    public int CustomerId { get; set; }
}

public class SendCustomerNotificationOutput : WorkflowOutput
{
    public dynamic EmailBody { get; set; }
}

[Version(3)]
[OriginalName("NOTIFICATION_send_to_customer")]
public class SendCustomerNotification : Workflow<SendCustomerNotification, SendCustomerNotificationInput, SendCustomerNotificationOutput>
{
    public SendCustomerNotification(
        WorkflowDefinitionBuilder<SendCustomerNotification, SendCustomerNotificationInput, SendCustomerNotificationOutput> builder
    ) : base(builder) { }

    public GetCustomerHandler GetCustomer { get; set; }
    public EmailPrepareV1 PrepareEmail { get; set; }

    public override void BuildDefinition()
    {
        _builder.AddTask(a => a.GetCustomer, b => new() { CustomerId = b.WorkflowInput.CustomerId });

        _builder.AddTask(a => a.PrepareEmail, b => new() { Address = b.GetCustomer.Output.Address, Name = b.GetCustomer.Output.Name });
    }
}

Without builder:

{
  "createTime": 0,
  "updateTime": 0,
  "name": "NOTIFICATION_send_to_customer",
  "description": "{\"description\":null,\"labels\":null}",
  "version": 3,
  "tasks": [
    {
      "name": "CUSTOMER_get",
      "taskReferenceName": "get_customer",
      "description": "{\"description\":null}",
      "inputParameters": {
        "customer_id": "${workflow.input.customer_id}"
      },
      "type": "SIMPLE",
      "startDelay": 0
    },
    {
      "name": "EMAIL_prepare",
      "taskReferenceName": "prepare_email",
      "description": "{\"description\":null}",
      "inputParameters": {
        "address": "${get_customer.output.address}",
        "name": "${get_customer.output.name}"
      },
      "type": "SIMPLE",
      "startDelay": 0
    }
  ],
  "inputParameters": [
    "{\"customer_id\":{\"value\":\"\",\"description\":\" (optional)\"}}"
  ],
  "schemaVersion": 2,
  "restartable": true,
  "workflowStatusListenerEnabled": true,
  "timeoutSeconds": 0
}

Adding task to workflow

As previously mentioned to add a task you must define property of certain type and then use AddTask extension method. For each task type there is a corresponding AddTask extension method.

Following table lists supported task types

Task model Conductor task type Comment
DecisionTaskModel DECISION
SimpleTaskModel<TInput, TOutput> SIMPLE TInput is task input type, TOutput is task output type
SubWorkflowTaskModel<TInput, TOutput> SUB_WORKFLOW TInput is task input type, TOutput is task output type
TerminateTaskModel TERMINATE
SwitchTaskModel SWITCH
DynamicForkJoinTaskModel DYNAMIC_FORK_JOIN,JOIN Generates DYNAMIC_FORK_JOIN followed by JOIN
LambdaTaskModel<TInput, TOutput> LAMBDA TInput is task input type, TOutput is task output type
TaskRequestHandler<TInput, TOutput> SIMPLE Using handlers directly is supported as well

For each task type there is corresponding AddTask method on WorkflowDefinitionBuilder which is used to specify task in workflow. Here is an example how to add task and specify its input properties

_builder.AddTask(
        wf => wf.EmailPrepare,
        wf =>
            new()
            {
                Address = $"{wf.WorkflowInput.FirstInput},{wf.WorkflowInput.SecondInput}",
            }
    );

Defining task type for simple tasks, subworkflows and lambda tasks

Usually SimpleTaskModel and SubWorkflowTaskModel are inherited since OriginalName attribute can be applied on a concrete class. This original name is then used when generating corresponding SIMPLE and SUB_WORKFLOW tasks. The same does not need to be done for lambda tasks since it is a system task.

TInput and TOutput contain properties that are used during generation of task input specification in workflow definition (i.e. conductor expressions input_prop = ${task_reference_name.output.output_prop}). By default, PascalCase properties are renamed to snake_case. This can be overridden by specifying Netwonsoft [JsonProperty] on input/output property of corresponding TInput/TOutput class

Example SimpleTaskModel definition:

public partial class EmailPrepareV1Input : IRequest<EmailPrepareV1Output>
{
    public object Address { get; set; }
    public object Name { get; set; }
}

public partial class EmailPrepareV1Output
{
    [JsonProperty("example_body")
    public object EmailBody { get; set; }
}

[OriginalName("EMAIL_prepare")]
public partial class EmailPrepareV1 : SimpleTaskModel<EmailPrepareV1Input, EmailPrepareV1Output> { }

Example SubWorkflowTaskModel definition

public class VersionSubworkflowInput : IRequest<VersionSubworkflowOutput> { }

public class VersionSubworkflowOutput { }

[OriginalName("TEST_subworkflow")]
[Version(3)]
public class VersionSubworkflow : SubWorkflowTaskModel<VersionSubworkflowInput, VersionSubworkflowOutput> { }

Note the usage of Version attribute. Workflows in conductor are identified by name,version pair. If not specified then version 1 will be used when generating subworkflow task definition.

Example usage of LambdaTaskModel in workflow

   public class StringAdditionInput : WorkflowInput<StringAdditionOutput>
   {
       public string Input { get; set; }
   }

   public class StringAdditionOutput : WorkflowOutput { }

   [OriginalName("string_addition")]
   public class StringAddition : Workflow<StringAddition, StringAdditionInput, StringAdditionOutput>
   {
       public StringAddition(WorkflowDefinitionBuilder<StringAddition, StringAdditionInput, StringAdditionOutput> builder) : base(builder) { }

       public class StringTaskOutput { public string Output { get; set; } }

       public class StringTaskInput : IRequest<StringTaskOutput>
       {
           public string Input { get; set; }
       }

       public LambdaTaskModel<StringTaskInput, StringTaskOutput> StringTask { get; set; }

       public override void BuildDefinition()
       {
           _builder.AddTask(
               wf => wf.StringTask,
               wf =>
                   new()
                   {
                       Input = wf.WorkflowInput.Input
                   },
               "return {output: $.input + '_example_string'}"
           );
       }
   }

Although LAMBDA tasks can return any javascript type, in ConductorSharp LambdaTaskModel expects lambda tasks to return object with properties contained in TOutput class which is used with LambdaTaskModel. Make sure property names match i.e. javascript lambda must return snake_case properties(unless output property is renamed). The same holds for input properties.

Task input specification

In conductor task inputs in workflow are specified using conductor expressions which have the following format: ${SOURCE.input/output.JSONPath}. SOURCE can be 'workflow' or task reference name in workflow definition, input/output can refer to input of the workflow or output of the task. JSONPath is used to traverse the input/output object.

ConductorSharp generates these expressions when writing workflows. Here is an example of input specification

wf =>
    new PrepareEmailRequest
    {
        CustomerName = $"{wf.GetCustomer.Output.FirstName} {wf.GetCustomer.Output.LastName}",
        Address = wf.WorkflowInput.Address
    }

This is converted to the following conductor input parameters specification

"inputParameters": {
        "customer_name": "${get_customer.output.first_name} ${get_customer.output.last_name}",
        "address": "${workflow.input.address}",
      }

Casting

In some scenarios input/output parameters can be of the different type. In that case casting can be used.

Following input specification:

wf =>
    new PrepareEmailRequest
    {
        CustomerName = ((FullName)wf.GetCustomer.Output.Name).FirstName,
        Address = (string)wf.GetCustomer.Output.Address
    }

is translated to:

"inputParameters": {
  "customer_name": "${get_customer.output.name.first_name}",
  "address": "${get_customer.output.address}"
}

Array initalization

Array initialization is supported. Array can be typed or dynamic.

Following input specification:

wf =>
    new()
    {
        Integers = new[] { 1, 2, 3 },
        TestModelList = new List<ArrayTaskInput.TestModel>
        {
            new ArrayTaskInput.TestModel { String = wf.Input.TestValue },
            new ArrayTaskInput.TestModel { String = "List2" }
        },
        Models = new[]
        {
            new ArrayTaskInput.TestModel { String = "Test1" },
            new ArrayTaskInput.TestModel { String = "Test2" }
        },
        Objects = new dynamic[] { new { AnonymousObjProp = "Prop" }, new { Test = "Prop" } }
    }

is translated to:

"inputParameters": {
  "integers": [
    1,
    2,
    3
  ],
  "test_model_list": [
    {
      "string": "${workflow.input.test_value}"
    },
    {
      "string": "List2"
    }
  ],
  "models": [
    {
      "string": "Test1"
    },
    {
      "string": "Test2"
    }
  ],
  "objects": [
    {
      "anonymous_obj_prop": "Prop"
    },
    {
      "test": "Prop"
    }
  ]
}

Object initialization

Object initialization is supported.
Anonymous objects are supported when initializing sub-properties.

Following input specification:

wf =>
    new()
    {
        NestedObjects = new TestModel
        {
            Integer = 1,
            String = "test",
            Object = new TestModel
            {
                Integer = 1,
                String = "string",
                Object = new { NestedInput = "1" }
            }
        },
    }

is translated to:

"inputParameters": {
  "nested_objects": {
    "integer": 1,
    "string": "test",
    "object": {
      "integer": 1,
      "string": "string",
      "object": {
        "nested_input": "1"
      }
    }
  }
}

Indexing

Dictionary indexing is supported
Indexing using an indexer is currently not supported.

wf =>
    new()
    {
        CustomerName = wf.WorkflowInput.Dictionary["test"].CustomerName,
        Address = wf.WorkflowInput.DoubleDictionary["test"]["address"]
    }

is translated to:

"inputParameters": {
  "customer_name": "${workflow.input.dictionary['test'].customer_name}",
  "address": "${workflow.input.double_dictionary['test']['address']}"
}

Workflow name

It is possible to embed the name of any workflow in task input specification.

Following input specification:

wf =>
    new()
    {
        Name = $"Workflow name: {NamingUtil.NameOf<StringInterpolation>()}"
        WfName = NamingUtil.Nameof<StringInterpolation>()
    }

is translated to:

"inputParameters": {,
  "name": "Workflow name: TEST_StringInterpolation"
  "wf_name": "Test_StringInterpolation"
}

StringInterpolation has an attribute [OriginalName("TEST_StringInterpolation")] applied.

String concatenation

String concatenation is supported. Also, it is possible to concatenate strings with numbers, input/output parameters, and interpolation strings

Following input specification:

wf =>
    new()
    {
        Input =
            1
            + "Str_"
            + "2Str_"
            + wf.WorkflowInput.Input
            + $"My input: {wf.WorkflowInput.Input}"
            + NamingUtil.NameOf<StringAddition>()
            + 1
    }

is translated to:

"inputParameters": {
  "input": "1Str_2Str_${workflow.input.input}My input: ${workflow.input.input}string_addition1",
}

StringAddition has an attribute [OriginalName("string_addition")] applied.

Clone this wiki locally