Skip to content
Bojan Malinić edited this page Sep 27, 2023 · 58 revisions

Workflows and Tasks in conductor are JSON objects with properties specifying the name, input parameters, etc. These properties are referenced across different workflows and it can become hard to maintain the correctness of these definitions when updating Workflow properties. Any issues with Workflow definitions will be reported at Workflow registration time, but not while building the definition itself.

This library attempts to solve the problem by representing Workflows and Tasks as C# classes and using builders to bind their inputs and outputs.

Building Workflows

Workflows are built by inheriting the Workflow abstract generic class and overriding the BuildDefinition method. This method should make use of the WorkflowDefinitionBuilder to programatically build up the Workflow definition. The class is parameterized with the Workflow input, Workflow output and the concrete Workflow class (see example below).

In some cases (when dealing with legacy workflows, or in cases where the builder does not yet support a specific feature of conductor) the definition can be read from a file, or by overriding the GetDefinition method. This approach should be used as a last resort since it does not provide any compile time error checks.

Workflows are made up of Tasks, these can be SubWorkflows (other Workflows defined on Conductor), simple worker Tasks, and system Tasks. Adding tasks to Workflows is done by adding properties to the concrete Workflow class and using those as inputs for the AddTask method of the WorkflowDefinitionBuilder. These properties must be of type supported by one of the Builders defined. Builders are added by creating extensions for the WorkflowDefinitionBuilder class, the library provides the most commonly used ones, but new ones can be added.

public class SendCustomerNotificationInput : WorkflowInput<SendCustomerNotificationOutput>
{
    public int CustomerId { get; set; }
}

public class SendCustomerNotificationOutput : WorkflowOutput
{
    public dynamic EmailBody { get; set; }
}

[Version(3)]
[OriginalName("NOTIFICATION_send_to_customer")]
public class SendCustomerNotification : Workflow<SendCustomerNotification, SendCustomerNotificationInput, SendCustomerNotificationOutput>
{
    public SendCustomerNotification(
        WorkflowDefinitionBuilder<SendCustomerNotification, SendCustomerNotificationInput, SendCustomerNotificationOutput> builder
    ) : base(builder) { }

    public GetCustomerHandler GetCustomer { get; set; }
    public EmailPrepareV1 PrepareEmail { get; set; }

    public override void BuildDefinition()
    {
        _builder.AddTask(a => a.GetCustomer, b => new() { CustomerId = b.WorkflowInput.CustomerId });

        _builder.AddTask(a => a.PrepareEmail, b => new() { Address = b.GetCustomer.Output.Address, Name = b.GetCustomer.Output.Name });
    }
}

Without builder:

{
  "createTime": 0,
  "updateTime": 0,
  "name": "NOTIFICATION_send_to_customer",
  "description": "{\"description\":null,\"labels\":null}",
  "version": 3,
  "tasks": [
    {
      "name": "CUSTOMER_get",
      "taskReferenceName": "get_customer",
      "description": "{\"description\":null}",
      "inputParameters": {
        "customer_id": "${workflow.input.customer_id}"
      },
      "type": "SIMPLE",
      "startDelay": 0
    },
    {
      "name": "EMAIL_prepare",
      "taskReferenceName": "prepare_email",
      "description": "{\"description\":null}",
      "inputParameters": {
        "address": "${get_customer.output.address}",
        "name": "${get_customer.output.name}"
      },
      "type": "SIMPLE",
      "startDelay": 0
    }
  ],
  "inputParameters": [
    "{\"customer_id\":{\"value\":\"\",\"description\":\" (optional)\"}}"
  ],
  "schemaVersion": 2,
  "restartable": true,
  "workflowStatusListenerEnabled": true,
  "timeoutSeconds": 0
}

Adding task to workflow

As previously mentioned to add a task you must define property of certain type and then use AddTask extension method. For each task type there is a corresponding AddTask extension method.

Following table lists supported task types

Task model Conductor task type Comment
DecisionTaskModel DECISION
SimpleTaskModel<TInput, TOutput> SIMPLE TInput is task input type, TOutput is task output type
SubWorkflowTaskModel<TInput, TOutput> SUB_WORKFLOW TInput is task input type, TOutput is task output type
TerminateTaskModel TERMINATE
SwitchTaskModel SWITCH
DynamicForkJoinTaskModel DYNAMIC_FORK_JOIN,JOIN Generates DYNAMIC_FORK_JOIN followed by JOIN
LambdaTaskModel<TInput, TOutput> LAMBDA TInput is task input type, TOutput is task output type

| TaskRequestHandler<TInput, TOutput> | SIMPLE | Using handlers directly is supported as well |

Defining task type for simple tasks, subworkflows and lambda tasks

Usually SimpleTaskModel and SubWorkflowTaskModel are inherited since OriginalName attribute can be applied on a concrete class. This original name is then used when generating corresponding SIMPLE and SUB_WORKFLOW tasks. The same does not need to be done for lambda tasks since it is a system task.

TInput and TOutput contain properties that are used during generation of task input specification in workflow defintion(i.e. conductor expressions input_prop = ${task_reference_name.output.output_prop}. By default, PascalCase properties are renamed to snake_case. This can be overridden by specifying Netwonsoft [JsonProperty] on input/output property of corresponding TInput/TOutput class

Example SimpleTaskModel definition:

public partial class EmailPrepareV1Input : IRequest<EmailPrepareV1Output>
{
    public object Address { get; set; }
    public object Name { get; set; }
}

public partial class EmailPrepareV1Output
{
    [JsonProperty("example_body")
    public object EmailBody { get; set; }
}

[OriginalName("EMAIL_prepare")]
public partial class EmailPrepareV1 : SimpleTaskModel<EmailPrepareV1Input, EmailPrepareV1Output> { }

Example SubWorkflowTaskModel definition

public class VersionSubworkflowInput : IRequest<VersionSubworkflowOutput> { }

public class VersionSubworkflowOutput { }

[OriginalName("TEST_subworkflow")]
[Version(3)]
public class VersionSubworkflow : SubWorkflowTaskModel<VersionSubworkflowInput, VersionSubworkflowOutput> { }

Note the usage of Version attribute. Workflows in conductor are identified by name,version pair. If not specified then version 1 will be used when generating subworkflow task definition.

Example usage of LambdaTaskModel in workflow

   public class StringAdditionInput : WorkflowInput<StringAdditionOutput>
   {
       public string Input { get; set; }
   }

   public class StringAdditionOutput : WorkflowOutput { }

   [OriginalName("string_addition")]
   public class StringAddition : Workflow<StringAddition, StringAdditionInput, StringAdditionOutput>
   {
       public StringAddition(WorkflowDefinitionBuilder<StringAddition, StringAdditionInput, StringAdditionOutput> builder) : base(builder) { }

       public class StringTaskOutput { public string Output { get; set; } }

       public class StringTaskInput : IRequest<StringTaskOutput>
       {
           public string Input { get; set; }
       }

       public LambdaTaskModel<StringTaskInput, StringTaskOutput> StringTask { get; set; }

       public override void BuildDefinition()
       {
           _builder.AddTask(
               wf => wf.StringTask,
               wf =>
                   new()
                   {
                       Input = wf.WorkflowInput.Input
                   },
               "return {output: $.input + '_example_string'}"
           );
       }
   }

Although LAMBDA tasks can return any javascript type, in ConductorSharp LambdaTaskModel expects lambda tasks to return object with properties contained in TOutput class which is used with LambdaTaskModel. Make sure property names match i.e. javascript lambda must return snake_case properties(unless output property is renamed). Same holds for input properties.

Clone this wiki locally