From 27c00b3a2ae658506649bd448fc57fb0a07878e8 Mon Sep 17 00:00:00 2001 From: Harshith-umesh Date: Mon, 10 Jul 2023 19:59:01 +0530 Subject: [PATCH 1/4] Add wait_for functionality for serial and parallel execution --- .gitignore | 4 + cmd/arcaflow/main.go | 28 +++--- go.mod | 2 +- go.sum | 4 +- internal/step/plugin/provider.go | 22 +++-- workflow/workflow_test.go | 162 ++++++++++++++++++++++++++++++- 6 files changed, 198 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index bcb5de5f..536fbbe2 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,7 @@ *.iml .vscode/ dist +arcaflow +config.yaml +workflow.yaml + diff --git a/cmd/arcaflow/main.go b/cmd/arcaflow/main.go index 3aa340ab..83cea9af 100644 --- a/cmd/arcaflow/main.go +++ b/cmd/arcaflow/main.go @@ -101,6 +101,20 @@ Options: } flag.Parse() + if printVersion { + fmt.Printf( + "Arcaflow Engine\n"+ + "===============\n"+ + "Version: %s\n"+ + "Commit: %s\n"+ + "Date: %s\n"+ + "Apache 2.0 license\n"+ + "Copyright (c) Arcalot Contributors", + version, commit, date, + ) + return + } + var err error var configData any = map[any]any{} if configFile != "" { @@ -135,20 +149,6 @@ Options: os.Exit(ExitCodeInvalidData) } - if printVersion { - fmt.Printf( - "Arcaflow Engine\n"+ - "===============\n"+ - "Version: %s\n"+ - "Commit: %s\n"+ - "Date: %s\n"+ - "Apache 2.0 license\n"+ - "Copyright (c) Arcalot Contributors", - version, commit, date, - ) - return - } - var inputData []byte if input != "" { inputData, err = os.ReadFile(input) diff --git a/go.mod b/go.mod index a31bf799..1e58398b 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( go.flow.arcalot.io/kubernetesdeployer v0.5.1 go.flow.arcalot.io/pluginsdk v0.3.0-beta.1 go.flow.arcalot.io/podmandeployer v0.3.1 - go.flow.arcalot.io/pythondeployer v0.1.1 + go.flow.arcalot.io/pythondeployer v0.1.2 go.flow.arcalot.io/testdeployer v0.2.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 89f30580..bb6b1d7a 100644 --- a/go.sum +++ b/go.sum @@ 
-163,8 +163,8 @@ go.flow.arcalot.io/pluginsdk v0.3.0-beta.1 h1:RrC5SKDkhwG/enE/FajAxRF1izET61/LO4 go.flow.arcalot.io/pluginsdk v0.3.0-beta.1/go.mod h1:7cEk8LSxpZakyfrmKTPbiMhlrZvWtCPYcaI7qfSu8MM= go.flow.arcalot.io/podmandeployer v0.3.1 h1:AbRmTTtuK50PLkZyu194oSra0zUCeM3lDCWTc7oo4Ys= go.flow.arcalot.io/podmandeployer v0.3.1/go.mod h1:SmROc9nHG+KfKasyTeKtGmI9EBlXCupXjBIgX5glGn8= -go.flow.arcalot.io/pythondeployer v0.1.1 h1:3K43XoCwM9zmH6wRX41lB996B1g6nEgD8/kmKKImuWA= -go.flow.arcalot.io/pythondeployer v0.1.1/go.mod h1:vCkwB72TinFVb367/o0djptTvR+V004i1I5OQUeCcPU= +go.flow.arcalot.io/pythondeployer v0.1.2 h1:IUIoYTuw+2N1zrpSv8yaO3wsS6aWAFwbLlYNhCEM+zI= +go.flow.arcalot.io/pythondeployer v0.1.2/go.mod h1:vCkwB72TinFVb367/o0djptTvR+V004i1I5OQUeCcPU= go.flow.arcalot.io/testdeployer v0.2.0 h1:4/cLr58/e6o5ouVRuJ5hM28nhciwJrL9AOE5Sdb7rN0= go.flow.arcalot.io/testdeployer v0.2.0/go.mod h1:vy3Iu+9SHmugvOJRtMWAj8R+SE9BYi7k9Xi7DM5n7eQ= go.flow.arcalot.io/testplugin v0.1.0 h1:I2BT978XISjaSnQbpaJfmjo2cTmTeBV7q+1IwTGbrig= diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go index 855305d7..a1708766 100644 --- a/internal/step/plugin/provider.go +++ b/internal/step/plugin/provider.go @@ -122,9 +122,8 @@ var startingLifecycleStage = step.LifecycleStage{ RunningName: "starting", FinishedName: "started", InputFields: map[string]struct{}{ - //nolint:godox - // TODO: Add wait_for here. Empty struct. - "input": {}, + "input": {}, + "wait_for": {}, }, NextStages: []string{ string(StageIDRunning), string(StageIDCrashed), @@ -317,9 +316,6 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st { LifecycleStage: startingLifecycleStage, InputSchema: map[string]*schema.PropertySchema{ - //nolint:godox - // TODO: Add wait_for right here. Should be an any type. - // Also add to section above. 
"input": schema.NewPropertySchema( stepSchema.Input(), stepSchema.Display(), @@ -330,6 +326,20 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st nil, nil, ), + "wait_for": schema.NewPropertySchema( + schema.NewAnySchema(), + schema.NewDisplayValue( + schema.PointerTo("Wait for condition"), + schema.PointerTo("Used to wait for a previous step stage to complete before running the step which is waiting."), + nil, + ), + false, + nil, + nil, + nil, + nil, + nil, + ), }, }, { diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 266a5bb9..d13ddeb3 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -3,11 +3,13 @@ package workflow_test import ( "context" "errors" + "testing" + "time" + "go.flow.arcalot.io/deployer" "go.flow.arcalot.io/engine/internal/step" "go.flow.arcalot.io/engine/internal/step/plugin" testimpl "go.flow.arcalot.io/testdeployer" - "testing" "go.arcalot.io/assert" "go.arcalot.io/lang" @@ -144,3 +146,161 @@ func TestStepCancellation(t *testing.T) { stepResultCancelledEarly := stepResult.(map[string]interface{})["cancelled_early"] assert.NotNil(t, stepResultCancelledEarly) } + +var waitForSerialWorkflowDefinition = ` +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + first_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 10000 + second_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 10000 + wait_for: !expr $.steps.first_wait.outputs.success +outputs: + success: + first_step_output: !expr $.steps.first_wait.outputs + second_step_output: !expr $.steps.second_wait.outputs +` + +func TestWaitForSerial(t *testing.T) { + // For this test, a workflow runs two steps, where each step runs a wait step for 10s + // The second wait step waits for the first to succeed after which it runs + // Due to the wait for condition, the steps will execute serially + // The total execution time for this test function should be greater than 
20seconds + // as each step runs for 10s and are run serially + // The test double deployer will be used for this test, as we + // need a deployer to test the plugin step provider. + startTime := time.Now() + logConfig := log.Config{ + Level: log.LevelInfo, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + stepRegistry := NewTestImplStepRegistry(logger, t) + + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForSerialWorkflowDefinition))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") + stepResult := outputData.(map[interface{}]interface{})["first_step_output"] + assert.NotNil(t, stepResult) + stepResultWaitFor := stepResult.(map[string]interface{})["success"] + assert.NotNil(t, stepResultWaitFor) + stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"] + assert.NotNil(t, stepResult2) + stepResultWaitFor2 := stepResult.(map[string]interface{})["success"] + assert.NotNil(t, stepResultWaitFor2) + + duration := time.Since(startTime) + t.Logf("Test execution time: %s", duration) + var wait_success bool + if duration >= 20*time.Second { + wait_success = true + t.Logf("Test execution time is greater than 20 seconds, steps are running serially due to the wait_for condition.") + } else { + wait_success = false + t.Logf("Test execution time is lesser than 20 seconds, steps are not running serially.") + } + assert.Equals(t, wait_success, true) +} + +var waitForParallelWorkflowDefinition = ` +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + first_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 10000 + second_wait: + 
plugin: "n/a" + step: wait + input: + wait_time_ms: 10000 + wait_for: !expr $.steps.first_wait.outputs.success + third_wait: + plugin: "n/a" + step: wait + input: + wait_time_ms: 10000 + wait_for: !expr $.steps.first_wait.outputs.success +outputs: + success: + second_step_output: !expr $.steps.second_wait.outputs.success + third_step_output: !expr $.steps.third_wait.outputs.success +` + +func TestWaitForParallel(t *testing.T) { + // For this test, a workflow runs three steps, where each step runs a wait step for 10s + // The second and third wait steps wait for the first to succeed after which they both run in parallel + // The total execution time for this test function should be greater than 15s but lesser than 25s + // as the first step runs for 10s and other two steps run in parallel after the first succeeds + // The test double deployer will be used for this test, as we + // need a deployer to test the plugin step provider. + startTime := time.Now() + logConfig := log.Config{ + Level: log.LevelInfo, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + stepRegistry := NewTestImplStepRegistry(logger, t) + + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForParallelWorkflowDefinition))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{})) + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") + stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"] + assert.NotNil(t, stepResult2) + stepResult3 := outputData.(map[interface{}]interface{})["third_step_output"] + assert.NotNil(t, stepResult3) + t.Log(stepResult3) + + duration := time.Since(startTime) + t.Logf("Test execution time: %s", duration) + var wait_success bool + if 
duration > 15*time.Second && duration < 25*time.Second { + wait_success = true + t.Logf("Steps second_wait and third_wait are running in parallel after waiting for the first_wait step.") + } else { + wait_success = false + t.Logf("Steps second_wait and third_wait are not running in parallel.") + } + assert.Equals(t, wait_success, true) +} From c145db6bd4257cc844014eae9c2d9f3b1d470b9c Mon Sep 17 00:00:00 2001 From: Harshith-umesh Date: Tue, 11 Jul 2023 14:20:20 +0530 Subject: [PATCH 2/4] reduce test time and fix ci-lint issue --- workflow/workflow_test.go | 46 +++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index d13ddeb3..fd638d25 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -159,12 +159,12 @@ steps: plugin: "n/a" step: wait input: - wait_time_ms: 10000 + wait_time_ms: 5000 second_wait: plugin: "n/a" step: wait input: - wait_time_ms: 10000 + wait_time_ms: 5000 wait_for: !expr $.steps.first_wait.outputs.success outputs: success: @@ -173,11 +173,11 @@ outputs: ` func TestWaitForSerial(t *testing.T) { - // For this test, a workflow runs two steps, where each step runs a wait step for 10s + // For this test, a workflow runs two steps, where each step runs a wait step for 5s // The second wait step waits for the first to succeed after which it runs // Due to the wait for condition, the steps will execute serially - // The total execution time for this test function should be greater than 20seconds - // as each step runs for 10s and are run serially + // The total execution time for this test function should be greater than 10seconds + // as each step runs for 5s and are run serially // The test double deployer will be used for this test, as we // need a deployer to test the plugin step provider. 
startTime := time.Now() @@ -214,15 +214,15 @@ func TestWaitForSerial(t *testing.T) { duration := time.Since(startTime) t.Logf("Test execution time: %s", duration) - var wait_success bool - if duration >= 20*time.Second { - wait_success = true - t.Logf("Test execution time is greater than 20 seconds, steps are running serially due to the wait_for condition.") + var waitSuccess bool + if duration >= 10*time.Second { + waitSuccess = true + t.Logf("Test execution time is greater than 10 seconds, steps are running serially due to the wait_for condition.") } else { - wait_success = false - t.Logf("Test execution time is lesser than 20 seconds, steps are not running serially.") + waitSuccess = false + t.Logf("Test execution time is lesser than 10 seconds, steps are not running serially.") } - assert.Equals(t, wait_success, true) + assert.Equals(t, waitSuccess, true) } var waitForParallelWorkflowDefinition = ` @@ -237,18 +237,18 @@ steps: plugin: "n/a" step: wait input: - wait_time_ms: 10000 + wait_time_ms: 5000 second_wait: plugin: "n/a" step: wait input: - wait_time_ms: 10000 + wait_time_ms: 5000 wait_for: !expr $.steps.first_wait.outputs.success third_wait: plugin: "n/a" step: wait input: - wait_time_ms: 10000 + wait_time_ms: 5000 wait_for: !expr $.steps.first_wait.outputs.success outputs: success: @@ -257,10 +257,10 @@ outputs: ` func TestWaitForParallel(t *testing.T) { - // For this test, a workflow runs three steps, where each step runs a wait step for 10s + // For this test, a workflow runs three steps, where each step runs a wait step for 5s // The second and third wait steps wait for the first to succeed after which they both run in parallel - // The total execution time for this test function should be greater than 15s but lesser than 25s - // as the first step runs for 10s and other two steps run in parallel after the first succeeds + // The total execution time for this test function should be greater than 5s but lesser than 15s + // as the first step runs for 
5s and other two steps run in parallel after the first succeeds // The test double deployer will be used for this test, as we // need a deployer to test the plugin step provider. startTime := time.Now() @@ -294,13 +294,13 @@ func TestWaitForParallel(t *testing.T) { duration := time.Since(startTime) t.Logf("Test execution time: %s", duration) - var wait_success bool - if duration > 15*time.Second && duration < 25*time.Second { - wait_success = true + var waitSuccess bool + if duration > 5*time.Second && duration < 15*time.Second { + waitSuccess = true t.Logf("Steps second_wait and third_wait are running in parallel after waiting for the first_wait step.") } else { - wait_success = false + waitSuccess = false t.Logf("Steps second_wait and third_wait are not running in parallel.") } - assert.Equals(t, wait_success, true) + assert.Equals(t, waitSuccess, true) } From a7fd7b6f0df0ace0d7452d850475a527bdc84a40 Mon Sep 17 00:00:00 2001 From: Harshith-umesh Date: Fri, 21 Jul 2023 17:39:17 +0530 Subject: [PATCH 3/4] Reduce test time --- workflow/workflow_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index fd638d25..29e30d10 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -159,12 +159,12 @@ steps: plugin: "n/a" step: wait input: - wait_time_ms: 5000 + wait_time_ms: 500 second_wait: plugin: "n/a" step: wait input: - wait_time_ms: 5000 + wait_time_ms: 500 wait_for: !expr $.steps.first_wait.outputs.success outputs: success: @@ -215,12 +215,12 @@ func TestWaitForSerial(t *testing.T) { duration := time.Since(startTime) t.Logf("Test execution time: %s", duration) var waitSuccess bool - if duration >= 10*time.Second { + if duration >= 1*time.Second { waitSuccess = true - t.Logf("Test execution time is greater than 10 seconds, steps are running serially due to the wait_for condition.") + t.Logf("Test execution time is greater than 1 second, steps are 
running serially due to the wait_for condition.") } else { waitSuccess = false - t.Logf("Test execution time is lesser than 10 seconds, steps are not running serially.") + t.Logf("Test execution time is lesser than 1 seconds, steps are not running serially.") } assert.Equals(t, waitSuccess, true) } @@ -237,23 +237,23 @@ steps: plugin: "n/a" step: wait input: - wait_time_ms: 5000 + wait_time_ms: 500 second_wait: plugin: "n/a" step: wait input: - wait_time_ms: 5000 + wait_time_ms: 500 wait_for: !expr $.steps.first_wait.outputs.success third_wait: plugin: "n/a" step: wait input: - wait_time_ms: 5000 + wait_time_ms: 500 wait_for: !expr $.steps.first_wait.outputs.success outputs: success: - second_step_output: !expr $.steps.second_wait.outputs.success - third_step_output: !expr $.steps.third_wait.outputs.success + third_step_output: !expr $.steps.third_wait.outputs + second_step_output: !expr $.steps.second_wait.outputs ` func TestWaitForParallel(t *testing.T) { @@ -295,7 +295,7 @@ func TestWaitForParallel(t *testing.T) { duration := time.Since(startTime) t.Logf("Test execution time: %s", duration) var waitSuccess bool - if duration > 5*time.Second && duration < 15*time.Second { + if duration > 1*time.Second && duration < 2*time.Second { waitSuccess = true t.Logf("Steps second_wait and third_wait are running in parallel after waiting for the first_wait step.") } else { From 28a9ab999b65b42ec29f9bd47698a350ff14aa56 Mon Sep 17 00:00:00 2001 From: Harshith-umesh Date: Tue, 25 Jul 2023 01:19:18 +0530 Subject: [PATCH 4/4] Add race condition comment --- workflow/workflow_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 29e30d10..fb81dc37 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -225,6 +225,8 @@ func TestWaitForSerial(t *testing.T) { assert.Equals(t, waitSuccess, true) } +// Running parallel steps which wait on the same previous step sometimes causes 
a race condition. This needs to be investigated. +// Once the race condition is fixed, reduce the wait_time to 500ms. var waitForParallelWorkflowDefinition = ` input: root: RootObject @@ -237,18 +239,18 @@ steps: plugin: "n/a" step: wait input: - wait_time_ms: 500 + wait_time_ms: 5000 second_wait: plugin: "n/a" step: wait input: - wait_time_ms: 500 + wait_time_ms: 5000 wait_for: !expr $.steps.first_wait.outputs.success third_wait: plugin: "n/a" step: wait input: - wait_time_ms: 500 + wait_time_ms: 5000 wait_for: !expr $.steps.first_wait.outputs.success outputs: success: @@ -295,7 +297,7 @@ func TestWaitForParallel(t *testing.T) { duration := time.Since(startTime) t.Logf("Test execution time: %s", duration) var waitSuccess bool - if duration > 1*time.Second && duration < 2*time.Second { + if duration > 10*time.Second && duration < 20*time.Second { waitSuccess = true t.Logf("Steps second_wait and third_wait are running in parallel after waiting for the first_wait step.") } else {