Skip to content

Commit

Permalink
add wait for to foreach schema (#128)
Browse files Browse the repository at this point in the history
* add wait for to foreach schema

* add first wait for foreach test

* add wait for foreach started test

* remove foreach parallelism
  • Loading branch information
mfleader authored Nov 20, 2023
1 parent 447c9c9 commit 563059b
Show file tree
Hide file tree
Showing 2 changed files with 259 additions and 1 deletion.
17 changes: 16 additions & 1 deletion internal/step/foreach/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ var executeLifecycleStage = step.LifecycleStage{
RunningName: "executing",
FinishedName: "finished",
InputFields: map[string]struct{}{
"items": {},
"items": {},
"wait_for": {},
},
NextStages: []string{
string(StageIDOutputs),
Expand Down Expand Up @@ -213,6 +214,20 @@ func (r *runnableStep) Lifecycle(_ map[string]any) (step.Lifecycle[step.Lifecycl
nil,
nil,
),
"wait_for": schema.NewPropertySchema(
schema.NewAnySchema(),
schema.NewDisplayValue(
schema.PointerTo("Wait for condition"),
schema.PointerTo("Used to wait for a previous step stage to complete before running the step which is waiting."),
nil,
),
false,
nil,
nil,
nil,
nil,
nil,
),
},
},
{
Expand Down
243 changes: 243 additions & 0 deletions workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,18 @@ package workflow_test
import (
"context"
"errors"
"fmt"
"go.arcalot.io/assert"
"go.arcalot.io/lang"
"go.arcalot.io/log/v2"
"go.flow.arcalot.io/deployer"
deployerregistry "go.flow.arcalot.io/deployer/registry"
"go.flow.arcalot.io/engine/config"
"go.flow.arcalot.io/engine/internal/step"
"go.flow.arcalot.io/engine/internal/step/foreach"
"go.flow.arcalot.io/engine/internal/step/plugin"
stepregistry "go.flow.arcalot.io/engine/internal/step/registry"
testimpl "go.flow.arcalot.io/testdeployer"
"testing"
"time"

Expand Down Expand Up @@ -410,6 +418,241 @@ func TestWaitForStarted(t *testing.T) {
assert.Equals(t, outputID, "success")
}

var waitForSerialForeachWf = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
second_wait:
wait_for: !expr $.steps.first_wait.outputs.success
kind: foreach
items:
- wait_time_ms: 10
workflow: subworkflow.yaml
first_wait:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 10
outputs:
success:
first_step_output: !expr $.steps.first_wait.outputs
second_step_output: !expr $.steps.second_wait.outputs
`

var waitForSerialForeachSubwf = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties:
wait_time_ms:
type:
type_id: integer
steps:
wait_1:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: !expr $.input.wait_time_ms
outputs:
success:
b: !expr $.steps.wait_1.outputs
`

func TestWaitForSerial_Foreach(t *testing.T) {
// This test highlights a lack of observability in this part of
// Arcaflow's engine.
// For this test, a workflow runs two steps, where each step runs a wait
// step for 10 ms. The second wait step waits for the first to succeed
// after which it runs. Due to the wait for condition, the steps will
// execute serially. The total execution time for this test function
// should be greater than 10 ms as the first step and the foreach steps
// run serially.

logConfig := log.Config{
Level: log.LevelInfo,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
factories := workflowFactory{
config: cfg,
}
deployerRegistry := deployerregistry.New(
deployer.Any(testimpl.NewFactory()),
)

pluginProvider := assert.NoErrorR[step.Provider](t)(
plugin.New(logger, deployerRegistry, map[string]interface{}{
"builtin": map[string]any{
"deployer_name": "test-impl",
"deploy_time": "0",
},
}),
)
stepRegistry, err := stepregistry.New(
pluginProvider,
lang.Must2(foreach.New(logger, factories.createYAMLParser, factories.createWorkflow)),
)
assert.NoError(t, err)

factories.stepRegistry = stepRegistry
executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForSerialForeachWf)))
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{
"subworkflow.yaml": []byte(waitForSerialForeachSubwf),
}))
startTime := time.Now() // Right before execute to not include pre-processing time.
outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
assert.NoError(t, err)
assert.Equals(t, outputID, "success")
stepResult := outputData.(map[interface{}]interface{})["first_step_output"]
assert.NotNil(t, stepResult)
stepResultWaitFor := stepResult.(map[string]interface{})["success"]
assert.NotNil(t, stepResultWaitFor)
stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"]
assert.NotNil(t, stepResult2)
stepResultWaitFor2 := stepResult.(map[string]interface{})["success"]
assert.NotNil(t, stepResultWaitFor2)

duration := time.Since(startTime)
t.Logf("Test execution time: %s", duration)
if duration >= 20*time.Millisecond {
t.Logf("Test execution time is greater than 20 milliseconds; steps are correctly running serially due to the wait_for condition.")
} else {
t.Fatalf("Test execution time is less than 20 milliseconds; steps are not running serially.")
}
}

var waitForStartedForeachWf = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
pre_wait:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 2
second_wait:
wait_for: !expr $.steps.first_wait.starting.started
kind: foreach
items:
- wait_time_ms: 2
workflow: subworkflow.yaml
first_wait:
wait_for: !expr $.steps.pre_wait.outputs
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 2
outputs:
success:
first_step_output: !expr $.steps.first_wait.outputs
second_step_output: !expr $.steps.second_wait.outputs
`

func TestWaitForStarted_Foreach(t *testing.T) {
// This test highlights a lack of observability in this part of
// Arcaflow's engine.
// For this test, the second wait step depends on the first wait
// step's running state being started.

logConfig := log.Config{
Level: log.LevelInfo,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
factories := workflowFactory{
config: cfg,
}
deployerRegistry := deployerregistry.New(
deployer.Any(testimpl.NewFactory()),
)

pluginProvider := assert.NoErrorR[step.Provider](t)(
plugin.New(logger, deployerRegistry, map[string]interface{}{
"builtin": map[string]any{
"deployer_name": "test-impl",
"deploy_time": "0",
},
}),
)
stepRegistry, err := stepregistry.New(
pluginProvider,
lang.Must2(foreach.New(logger, factories.createYAMLParser, factories.createWorkflow)),
)
assert.NoError(t, err)

factories.stepRegistry = stepRegistry
executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForStartedForeachWf)))
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{
"subworkflow.yaml": []byte(waitForSerialForeachSubwf),
}))

outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
assert.NoError(t, err)
assert.Equals(t, outputID, "success")
}

type workflowFactory struct {
stepRegistry step.Registry
config *config.Config
}

func (f *workflowFactory) createYAMLParser() (workflow.YAMLConverter, error) {
stepR := f.stepRegistry
if stepR == nil {
return nil, fmt.Errorf("YAML converter not available yet, please call the factory function after the engine has initialized")
}
return workflow.NewYAMLConverter(stepR), nil
}

func (f *workflowFactory) createWorkflow(logger log.Logger) (workflow.Executor, error) {
stepR := f.stepRegistry
if stepR == nil {
return nil, fmt.Errorf("YAML converter not available yet, please call the factory function after the engine has initialized")
}
return workflow.NewExecutor(logger, f.config, stepR)
}

var missingInputsFailedDeploymentWorkflowDefinition = `
version: v0.2.0
input:
Expand Down

0 comments on commit 563059b

Please sign in to comment.