Skip to content

Commit

Permalink
Added remaining fields in workflow definition. (#122)
Browse files Browse the repository at this point in the history
* Added remaining fields in workflow definition.

* update comment.

---------

Co-authored-by: manan164 <[email protected]>
  • Loading branch information
manan164 and manan164 authored Nov 3, 2023
1 parent cf4b3cf commit 921daee
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 48 deletions.
43 changes: 22 additions & 21 deletions sdk/model/extended_workflow_def.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,27 @@ type ExtendedWorkflowDef struct {

func NewExtendedWorkflowDef(workflowDef WorkflowDef, tags []TagObject, overwriteTags bool) ExtendedWorkflowDef {
return ExtendedWorkflowDef{
Name: workflowDef.Name,
Description: workflowDef.Description,
Tasks: workflowDef.Tasks,
InputParameters: workflowDef.InputParameters,
InputTemplate: workflowDef.InputTemplate,
OutputParameters: workflowDef.OutputParameters,
FailureWorkflow: workflowDef.FailureWorkflow,
Variables: workflowDef.Variables,
OwnerApp: workflowDef.OwnerApp,
OwnerEmail: workflowDef.OwnerEmail,
CreateTime: workflowDef.CreateTime,
CreatedBy: workflowDef.CreatedBy,
UpdateTime: workflowDef.UpdateTime,
UpdatedBy: workflowDef.UpdatedBy,
Restartable: workflowDef.Restartable,
SchemaVersion: workflowDef.SchemaVersion,
Version: workflowDef.Version,
TimeoutPolicy: workflowDef.TimeoutPolicy,
TimeoutSeconds: workflowDef.TimeoutSeconds,
Tags: tags,
OverwriteTags: overwriteTags,
Name: workflowDef.Name,
Description: workflowDef.Description,
Tasks: workflowDef.Tasks,
InputParameters: workflowDef.InputParameters,
InputTemplate: workflowDef.InputTemplate,
OutputParameters: workflowDef.OutputParameters,
FailureWorkflow: workflowDef.FailureWorkflow,
Variables: workflowDef.Variables,
OwnerApp: workflowDef.OwnerApp,
OwnerEmail: workflowDef.OwnerEmail,
CreateTime: workflowDef.CreateTime,
CreatedBy: workflowDef.CreatedBy,
UpdateTime: workflowDef.UpdateTime,
UpdatedBy: workflowDef.UpdatedBy,
Restartable: workflowDef.Restartable,
SchemaVersion: workflowDef.SchemaVersion,
Version: workflowDef.Version,
TimeoutPolicy: workflowDef.TimeoutPolicy,
TimeoutSeconds: workflowDef.TimeoutSeconds,
Tags: tags,
OverwriteTags: overwriteTags,
WorkflowStatusListenerEnabled: workflowDef.WorkflowStatusListenerEnabled,
}
}
63 changes: 36 additions & 27 deletions sdk/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@ const (
)

type ConductorWorkflow struct {
executor *executor.WorkflowExecutor
name string
version int32
description string
ownerEmail string
tasks []TaskInterface
timeoutPolicy TimeoutPolicy
timeoutSeconds int64
failureWorkflow string
inputParameters []string
outputParameters map[string]interface{}
inputTemplate map[string]interface{}
variables map[string]interface{}
restartable bool
executor *executor.WorkflowExecutor
name string
version int32
description string
ownerEmail string
tasks []TaskInterface
timeoutPolicy TimeoutPolicy
timeoutSeconds int64
failureWorkflow string
inputParameters []string
outputParameters map[string]interface{}
inputTemplate map[string]interface{}
variables map[string]interface{}
restartable bool
workflowStatusListenerEnabled bool
}

func NewConductorWorkflow(executor *executor.WorkflowExecutor) *ConductorWorkflow {
Expand Down Expand Up @@ -89,6 +90,12 @@ func (workflow *ConductorWorkflow) Restartable(restartable bool) *ConductorWorkf
return workflow
}

// WorkflowStatusListenerEnabled if the workflow status listener need to be enabled.
func (workflow *ConductorWorkflow) WorkflowStatusListenerEnabled(workflowStatusListenerEnabled bool) *ConductorWorkflow {
workflow.workflowStatusListenerEnabled = workflowStatusListenerEnabled
return workflow
}

// OutputParameters Workflow outputs. Workflow output follows similar structure as task inputs
// See https://conductor.netflix.com/how-tos/Tasks/task-inputs.html for more details
func (workflow *ConductorWorkflow) OutputParameters(outputParameters interface{}) *ConductorWorkflow {
Expand Down Expand Up @@ -223,19 +230,21 @@ func getInputAsMap(input interface{}) map[string]interface{} {
// ToWorkflowDef converts the workflow to the JSON serializable format
func (workflow *ConductorWorkflow) ToWorkflowDef() *model.WorkflowDef {
return &model.WorkflowDef{
Name: workflow.name,
Description: workflow.description,
Version: workflow.version,
Tasks: getWorkflowTasksFromConductorWorkflow(workflow),
InputParameters: workflow.inputParameters,
OutputParameters: workflow.outputParameters,
FailureWorkflow: workflow.failureWorkflow,
SchemaVersion: 2,
OwnerEmail: workflow.ownerEmail,
TimeoutPolicy: string(workflow.timeoutPolicy),
TimeoutSeconds: workflow.timeoutSeconds,
Variables: workflow.variables,
InputTemplate: workflow.inputTemplate,
Name: workflow.name,
Description: workflow.description,
Version: workflow.version,
Tasks: getWorkflowTasksFromConductorWorkflow(workflow),
InputParameters: workflow.inputParameters,
OutputParameters: workflow.outputParameters,
FailureWorkflow: workflow.failureWorkflow,
SchemaVersion: 2,
OwnerEmail: workflow.ownerEmail,
TimeoutPolicy: string(workflow.timeoutPolicy),
TimeoutSeconds: workflow.timeoutSeconds,
Variables: workflow.variables,
InputTemplate: workflow.inputTemplate,
Restartable: workflow.restartable,
WorkflowStatusListenerEnabled: workflow.workflowStatusListenerEnabled,
}
}

Expand Down
1 change: 1 addition & 0 deletions test/integration_tests/workflow_def_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestHttpTask(t *testing.T) {
Name("TEST_GO_WORKFLOW_HTTP").
OwnerEmail("[email protected]").
Version(1).
WorkflowStatusListenerEnabled(true).
Add(common.TestHttpTask)
err := testdata.ValidateWorkflow(httpTaskWorkflow, common.WorkflowValidationTimeout, model.CompletedWorkflow)
if err != nil {
Expand Down

0 comments on commit 921daee

Please sign in to comment.