diff --git a/.gitignore b/.gitignore index bcb5de5f..536fbbe2 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,7 @@ *.iml .vscode/ dist +arcaflow +config.yaml +workflow.yaml + diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go index 3aa340ab..83cea9af 100644 --- a/cmd/arcaflow/main.go +++ b/cmd/arcaflow/main.go @@ -101,6 +101,20 @@ Options: } flag.Parse() + if printVersion { + fmt.Printf( + "Arcaflow Engine\n"+ + "===============\n"+ + "Version: %s\n"+ + "Commit: %s\n"+ + "Date: %s\n"+ + "Apache 2.0 license\n"+ + "Copyright (c) Arcalot Contributors", + version, commit, date, + ) + return + } + var err error var configData any = map[any]any{} if configFile != "" { @@ -135,20 +149,6 @@ Options: os.Exit(ExitCodeInvalidData) } - if printVersion { - fmt.Printf( - "Arcaflow Engine\n"+ - "===============\n"+ - "Version: %s\n"+ - "Commit: %s\n"+ - "Date: %s\n"+ - "Apache 2.0 license\n"+ - "Copyright (c) Arcalot Contributors", - version, commit, date, - ) - return - } - var inputData []byte if input != "" { inputData, err = os.ReadFile(input) diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go index 855305d7..a1708766 100644 --- a/internal/step/plugin/provider.go +++ b/internal/step/plugin/provider.go @@ -122,9 +122,8 @@ var startingLifecycleStage = step.LifecycleStage{ RunningName: "starting", FinishedName: "started", InputFields: map[string]struct{}{ - //nolint:godox - // TODO: Add wait_for here. Empty struct. - "input": {}, + "input": {}, + "wait_for": {}, }, NextStages: []string{ string(StageIDRunning), string(StageIDCrashed), @@ -317,9 +316,6 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st { LifecycleStage: startingLifecycleStage, InputSchema: map[string]*schema.PropertySchema{ - //nolint:godox - // TODO: Add wait_for right here. Should be an any type. - // Also add to section above. "input": schema.NewPropertySchema( stepSchema.Input(), stepSchema.Display(), @@ -330,6 +326,20 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st nil, nil, ), + "wait_for": schema.NewPropertySchema( + schema.NewAnySchema(), + schema.NewDisplayValue( + schema.PointerTo("Wait for condition"), + schema.PointerTo("Used to wait for a previous step stage to complete before running the step which is waiting."), + nil, + ), + false, + nil, + nil, + nil, + nil, + nil, + ), }, }, { diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 266a5bb9..fb81dc37 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -3,11 +3,13 @@ package workflow_test import ( "context" "errors" + "testing" + "time" + "go.flow.arcalot.io/deployer" "go.flow.arcalot.io/engine/internal/step" "go.flow.arcalot.io/engine/internal/step/plugin" testimpl "go.flow.arcalot.io/testdeployer" - "testing" "go.arcalot.io/assert" "go.arcalot.io/lang" @@ -144,3 +146,163 @@ func TestStepCancellation(t *testing.T) { stepResultCancelledEarly := stepResult.(map[string]interface{})["cancelled_early"] assert.NotNil(t, stepResultCancelledEarly) } + +var waitForSerialWorkflowDefinition = ` +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + first_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 500 + second_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 500 + wait_for: !expr $.steps.first_wait.outputs.success +outputs: + success: + first_step_output: !expr $.steps.first_wait.outputs + second_step_output: !expr $.steps.second_wait.outputs +` + +func TestWaitForSerial(t *testing.T) { + // For this test, a workflow runs two steps, where each step runs a wait step for 5s + // The second wait step waits for the first to succeed after which it runs + // Due to the wait for condition, the steps will execute serially + // The total execution time for this test function should be greater than 10seconds + // as each step runs for 5s and are run serially + // The test double deployer will be used for this test, as we + // need a deployer to test the plugin step provider. + startTime := time.Now() + logConfig := log.Config{ + Level: log.LevelInfo, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + stepRegistry := NewTestImplStepRegistry(logger, t) + + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForSerialWorkflowDefinition))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") + stepResult := outputData.(map[interface{}]interface{})["first_step_output"] + assert.NotNil(t, stepResult) + stepResultWaitFor := stepResult.(map[string]interface{})["success"] + assert.NotNil(t, stepResultWaitFor) + stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"] + assert.NotNil(t, stepResult2) + stepResultWaitFor2 := stepResult.(map[string]interface{})["success"] + assert.NotNil(t, stepResultWaitFor2) + + duration := time.Since(startTime) + t.Logf("Test execution time: %s", duration) + var waitSuccess bool + if duration >= 1*time.Second { + waitSuccess = true + t.Logf("Test execution time is greater than 1 second, steps are running serially due to the wait_for condition.") + } else { + waitSuccess = false + t.Logf("Test execution time is lesser than 1 seconds, steps are not running serially.") + } + assert.Equals(t, waitSuccess, true) +} + +// Running parallel steps which wait on the same previous step sometimes causes a race condition. This needs to be investigated. +// once the race condition if fixed reduce the wait_time to 500ms. +var waitForParallelWorkflowDefinition = ` +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + first_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 5000 + second_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 5000 + wait_for: !expr $.steps.first_wait.outputs.success + third_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 5000 + wait_for: !expr $.steps.first_wait.outputs.success +outputs: + success: + third_step_output: !expr $.steps.third_wait.outputs + second_step_output: !expr $.steps.second_wait.outputs +` + +func TestWaitForParallel(t *testing.T) { + // For this test, a workflow runs three steps, where each step runs a wait step for 5s + // The second and third wait steps wait for the first to succeed after which they both run in parallel + // The total execution time for this test function should be greater than 5s but lesser than 15s + // as the first step runs for 5s and other two steps run in parallel after the first succeeds + // The test double deployer will be used for this test, as we + // need a deployer to test the plugin step provider. + startTime := time.Now() + logConfig := log.Config{ + Level: log.LevelInfo, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + stepRegistry := NewTestImplStepRegistry(logger, t) + + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForParallelWorkflowDefinition))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") + stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"] + assert.NotNil(t, stepResult2) + stepResult3 := outputData.(map[interface{}]interface{})["third_step_output"] + assert.NotNil(t, stepResult3) + t.Log(stepResult3) + + duration := time.Since(startTime) + t.Logf("Test execution time: %s", duration) + var waitSuccess bool + if duration > 10*time.Second && duration < 20*time.Second { + waitSuccess = true + t.Logf("Steps second_wait and third_wait are running in parallel after waiting for the first_wait step.") + } else { + waitSuccess = false + t.Logf("Steps second_wait and third_wait are not running in parallel.") + } + assert.Equals(t, waitSuccess, true) +}