diff --git a/internal/step/foreach/provider.go b/internal/step/foreach/provider.go index 55d3b58b..8871ceef 100644 --- a/internal/step/foreach/provider.go +++ b/internal/step/foreach/provider.go @@ -43,7 +43,8 @@ var executeLifecycleStage = step.LifecycleStage{ RunningName: "executing", FinishedName: "finished", InputFields: map[string]struct{}{ - "items": {}, + "items": {}, + "wait_for": {}, }, NextStages: []string{ string(StageIDOutputs), @@ -213,6 +214,20 @@ func (r *runnableStep) Lifecycle(_ map[string]any) (step.Lifecycle[step.Lifecycl nil, nil, ), + "wait_for": schema.NewPropertySchema( + schema.NewAnySchema(), + schema.NewDisplayValue( + schema.PointerTo("Wait for condition"), + schema.PointerTo("Used to wait for a previous step stage to complete before running the step which is waiting."), + nil, + ), + false, + nil, + nil, + nil, + nil, + nil, + ), }, }, { diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 350cca7c..18487773 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -3,10 +3,18 @@ package workflow_test import ( "context" "errors" + "fmt" "go.arcalot.io/assert" "go.arcalot.io/lang" "go.arcalot.io/log/v2" + "go.flow.arcalot.io/deployer" + deployerregistry "go.flow.arcalot.io/deployer/registry" "go.flow.arcalot.io/engine/config" + "go.flow.arcalot.io/engine/internal/step" + "go.flow.arcalot.io/engine/internal/step/foreach" + "go.flow.arcalot.io/engine/internal/step/plugin" + stepregistry "go.flow.arcalot.io/engine/internal/step/registry" + testimpl "go.flow.arcalot.io/testdeployer" "testing" "time" @@ -410,6 +418,241 @@ func TestWaitForStarted(t *testing.T) { assert.Equals(t, outputID, "success") } +var waitForSerialForeachWf = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + second_wait: + wait_for: !expr $.steps.first_wait.outputs.success + kind: foreach + items: + - wait_time_ms: 10 + workflow: subworkflow.yaml + first_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 10 +outputs: + success: + first_step_output: !expr $.steps.first_wait.outputs + second_step_output: !expr $.steps.second_wait.outputs +` + +var waitForSerialForeachSubwf = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: + wait_time_ms: + type: + type_id: integer +steps: + wait_1: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: !expr $.input.wait_time_ms +outputs: + success: + b: !expr $.steps.wait_1.outputs +` + +func TestWaitForSerial_Foreach(t *testing.T) { + // This test highlights a lack of observability in this part of + // Arcaflow's engine. + // For this test, a workflow runs two steps, where each step runs a wait + // step for 10 ms. The second wait step waits for the first to succeed + // after which it runs. Due to the wait for condition, the steps will + // execute serially. The total execution time for this test function + // should be greater than 10 ms as the first step and the foreach steps + // run serially. + + logConfig := log.Config{ + Level: log.LevelInfo, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + factories := workflowFactory{ + config: cfg, + } + deployerRegistry := deployerregistry.New( + deployer.Any(testimpl.NewFactory()), + ) + + pluginProvider := assert.NoErrorR[step.Provider](t)( + plugin.New(logger, deployerRegistry, map[string]interface{}{ + "builtin": map[string]any{ + "deployer_name": "test-impl", + "deploy_time": "0", + }, + }), + ) + stepRegistry, err := stepregistry.New( + pluginProvider, + lang.Must2(foreach.New(logger, factories.createYAMLParser, factories.createWorkflow)), + ) + assert.NoError(t, err) + + factories.stepRegistry = stepRegistry + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForSerialForeachWf))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{ + "subworkflow.yaml": []byte(waitForSerialForeachSubwf), + })) + startTime := time.Now() // Right before execute to not include pre-processing time. + outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") + stepResult := outputData.(map[interface{}]interface{})["first_step_output"] + assert.NotNil(t, stepResult) + stepResultWaitFor := stepResult.(map[string]interface{})["success"] + assert.NotNil(t, stepResultWaitFor) + stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"] + assert.NotNil(t, stepResult2) + stepResultWaitFor2 := stepResult.(map[string]interface{})["success"] + assert.NotNil(t, stepResultWaitFor2) + + duration := time.Since(startTime) + t.Logf("Test execution time: %s", duration) + if duration >= 20*time.Millisecond { + t.Logf("Test execution time is greater than 20 milliseconds; steps are correctly running serially due to the wait_for condition.") + } else { + t.Fatalf("Test execution time is less than 20 milliseconds; steps are not running serially.") + } +} + +var waitForStartedForeachWf = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + pre_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 2 + second_wait: + wait_for: !expr $.steps.first_wait.starting.started + kind: foreach + items: + - wait_time_ms: 2 + workflow: subworkflow.yaml + first_wait: + wait_for: !expr $.steps.pre_wait.outputs + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 2 +outputs: + success: + first_step_output: !expr $.steps.first_wait.outputs + second_step_output: !expr $.steps.second_wait.outputs +` + +func TestWaitForStarted_Foreach(t *testing.T) { + // This test highlights a lack of observability in this part of + // Arcaflow's engine. + // For this test, the second wait step depends on the first wait + // step's running state being started. + + logConfig := log.Config{ + Level: log.LevelInfo, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + factories := workflowFactory{ + config: cfg, + } + deployerRegistry := deployerregistry.New( + deployer.Any(testimpl.NewFactory()), + ) + + pluginProvider := assert.NoErrorR[step.Provider](t)( + plugin.New(logger, deployerRegistry, map[string]interface{}{ + "builtin": map[string]any{ + "deployer_name": "test-impl", + "deploy_time": "0", + }, + }), + ) + stepRegistry, err := stepregistry.New( + pluginProvider, + lang.Must2(foreach.New(logger, factories.createYAMLParser, factories.createWorkflow)), + ) + assert.NoError(t, err) + + factories.stepRegistry = stepRegistry + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForStartedForeachWf))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{ + "subworkflow.yaml": []byte(waitForSerialForeachSubwf), + })) + + outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + assert.NoError(t, err) + assert.Equals(t, outputID, "success") +} + +type workflowFactory struct { + stepRegistry step.Registry + config *config.Config +} + +func (f *workflowFactory) createYAMLParser() (workflow.YAMLConverter, error) { + stepR := f.stepRegistry + if stepR == nil { + return nil, fmt.Errorf("YAML converter not available yet, please call the factory function after the engine has initialized") + } + return workflow.NewYAMLConverter(stepR), nil +} + +func (f *workflowFactory) createWorkflow(logger log.Logger) (workflow.Executor, error) { + stepR := f.stepRegistry + if stepR == nil { + return nil, fmt.Errorf("YAML converter not available yet, please call the factory function after the engine has initialized") + } + return workflow.NewExecutor(logger, f.config, stepR) +} + var missingInputsFailedDeploymentWorkflowDefinition = ` version: v0.2.0 input: