diff --git a/sdk/model/extended_workflow_def.go b/sdk/model/extended_workflow_def.go index 1f329361..e3a5cfe9 100644 --- a/sdk/model/extended_workflow_def.go +++ b/sdk/model/extended_workflow_def.go @@ -38,26 +38,27 @@ type ExtendedWorkflowDef struct { func NewExtendedWorkflowDef(workflowDef WorkflowDef, tags []TagObject, overwriteTags bool) ExtendedWorkflowDef { return ExtendedWorkflowDef{ - Name: workflowDef.Name, - Description: workflowDef.Description, - Tasks: workflowDef.Tasks, - InputParameters: workflowDef.InputParameters, - InputTemplate: workflowDef.InputTemplate, - OutputParameters: workflowDef.OutputParameters, - FailureWorkflow: workflowDef.FailureWorkflow, - Variables: workflowDef.Variables, - OwnerApp: workflowDef.OwnerApp, - OwnerEmail: workflowDef.OwnerEmail, - CreateTime: workflowDef.CreateTime, - CreatedBy: workflowDef.CreatedBy, - UpdateTime: workflowDef.UpdateTime, - UpdatedBy: workflowDef.UpdatedBy, - Restartable: workflowDef.Restartable, - SchemaVersion: workflowDef.SchemaVersion, - Version: workflowDef.Version, - TimeoutPolicy: workflowDef.TimeoutPolicy, - TimeoutSeconds: workflowDef.TimeoutSeconds, - Tags: tags, - OverwriteTags: overwriteTags, + Name: workflowDef.Name, + Description: workflowDef.Description, + Tasks: workflowDef.Tasks, + InputParameters: workflowDef.InputParameters, + InputTemplate: workflowDef.InputTemplate, + OutputParameters: workflowDef.OutputParameters, + FailureWorkflow: workflowDef.FailureWorkflow, + Variables: workflowDef.Variables, + OwnerApp: workflowDef.OwnerApp, + OwnerEmail: workflowDef.OwnerEmail, + CreateTime: workflowDef.CreateTime, + CreatedBy: workflowDef.CreatedBy, + UpdateTime: workflowDef.UpdateTime, + UpdatedBy: workflowDef.UpdatedBy, + Restartable: workflowDef.Restartable, + SchemaVersion: workflowDef.SchemaVersion, + Version: workflowDef.Version, + TimeoutPolicy: workflowDef.TimeoutPolicy, + TimeoutSeconds: workflowDef.TimeoutSeconds, + Tags: tags, + OverwriteTags: overwriteTags, + WorkflowStatusListenerEnabled: workflowDef.WorkflowStatusListenerEnabled, } } diff --git a/sdk/workflow/workflow.go b/sdk/workflow/workflow.go index 8502efbb..282495ae 100644 --- a/sdk/workflow/workflow.go +++ b/sdk/workflow/workflow.go @@ -25,20 +25,21 @@ const ( ) type ConductorWorkflow struct { - executor *executor.WorkflowExecutor - name string - version int32 - description string - ownerEmail string - tasks []TaskInterface - timeoutPolicy TimeoutPolicy - timeoutSeconds int64 - failureWorkflow string - inputParameters []string - outputParameters map[string]interface{} - inputTemplate map[string]interface{} - variables map[string]interface{} - restartable bool + executor *executor.WorkflowExecutor + name string + version int32 + description string + ownerEmail string + tasks []TaskInterface + timeoutPolicy TimeoutPolicy + timeoutSeconds int64 + failureWorkflow string + inputParameters []string + outputParameters map[string]interface{} + inputTemplate map[string]interface{} + variables map[string]interface{} + restartable bool + workflowStatusListenerEnabled bool } func NewConductorWorkflow(executor *executor.WorkflowExecutor) *ConductorWorkflow { @@ -89,6 +90,12 @@ func (workflow *ConductorWorkflow) Restartable(restartable bool) *ConductorWorkf return workflow } +// WorkflowStatusListenerEnabled if the workflow status listener need to be enabled. +func (workflow *ConductorWorkflow) WorkflowStatusListenerEnabled(workflowStatusListenerEnabled bool) *ConductorWorkflow { + workflow.workflowStatusListenerEnabled = workflowStatusListenerEnabled + return workflow +} + // OutputParameters Workflow outputs. Workflow output follows similar structure as task inputs // See https://conductor.netflix.com/how-tos/Tasks/task-inputs.html for more details func (workflow *ConductorWorkflow) OutputParameters(outputParameters interface{}) *ConductorWorkflow { @@ -223,19 +230,21 @@ func getInputAsMap(input interface{}) map[string]interface{} { // ToWorkflowDef converts the workflow to the JSON serializable format func (workflow *ConductorWorkflow) ToWorkflowDef() *model.WorkflowDef { return &model.WorkflowDef{ - Name: workflow.name, - Description: workflow.description, - Version: workflow.version, - Tasks: getWorkflowTasksFromConductorWorkflow(workflow), - InputParameters: workflow.inputParameters, - OutputParameters: workflow.outputParameters, - FailureWorkflow: workflow.failureWorkflow, - SchemaVersion: 2, - OwnerEmail: workflow.ownerEmail, - TimeoutPolicy: string(workflow.timeoutPolicy), - TimeoutSeconds: workflow.timeoutSeconds, - Variables: workflow.variables, - InputTemplate: workflow.inputTemplate, + Name: workflow.name, + Description: workflow.description, + Version: workflow.version, + Tasks: getWorkflowTasksFromConductorWorkflow(workflow), + InputParameters: workflow.inputParameters, + OutputParameters: workflow.outputParameters, + FailureWorkflow: workflow.failureWorkflow, + SchemaVersion: 2, + OwnerEmail: workflow.ownerEmail, + TimeoutPolicy: string(workflow.timeoutPolicy), + TimeoutSeconds: workflow.timeoutSeconds, + Variables: workflow.variables, + InputTemplate: workflow.inputTemplate, + Restartable: workflow.restartable, + WorkflowStatusListenerEnabled: workflow.workflowStatusListenerEnabled, } } diff --git a/test/integration_tests/workflow_def_test.go b/test/integration_tests/workflow_def_test.go index 99fcd03d..51628473 100644 --- a/test/integration_tests/workflow_def_test.go +++ b/test/integration_tests/workflow_def_test.go @@ -34,6 +34,7 @@ func TestHttpTask(t *testing.T) { Name("TEST_GO_WORKFLOW_HTTP"). OwnerEmail("test@orkes.io"). Version(1). + WorkflowStatusListenerEnabled(true). Add(common.TestHttpTask) err := testdata.ValidateWorkflow(httpTaskWorkflow, common.WorkflowValidationTimeout, model.CompletedWorkflow) if err != nil {