Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add wait for to foreach schema #128

Merged
merged 4 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion internal/step/foreach/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ var executeLifecycleStage = step.LifecycleStage{
RunningName: "executing",
FinishedName: "finished",
InputFields: map[string]struct{}{
"items": {},
"items": {},
"wait_for": {},
},
NextStages: []string{
string(StageIDOutputs),
Expand Down Expand Up @@ -213,6 +214,20 @@ func (r *runnableStep) Lifecycle(_ map[string]any) (step.Lifecycle[step.Lifecycl
nil,
nil,
),
"wait_for": schema.NewPropertySchema(
schema.NewAnySchema(),
schema.NewDisplayValue(
schema.PointerTo("Wait for condition"),
schema.PointerTo("Used to wait for a previous step stage to complete before running the step which is waiting."),
nil,
),
false,
nil,
nil,
nil,
nil,
nil,
),
},
},
{
Expand Down
243 changes: 243 additions & 0 deletions workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,18 @@ package workflow_test
import (
"context"
"errors"
"fmt"
"go.arcalot.io/assert"
"go.arcalot.io/lang"
"go.arcalot.io/log/v2"
"go.flow.arcalot.io/deployer"
deployerregistry "go.flow.arcalot.io/deployer/registry"
"go.flow.arcalot.io/engine/config"
"go.flow.arcalot.io/engine/internal/step"
"go.flow.arcalot.io/engine/internal/step/foreach"
"go.flow.arcalot.io/engine/internal/step/plugin"
stepregistry "go.flow.arcalot.io/engine/internal/step/registry"
testimpl "go.flow.arcalot.io/testdeployer"
"testing"
"time"

Expand Down Expand Up @@ -410,6 +418,241 @@ func TestWaitForStarted(t *testing.T) {
assert.Equals(t, outputID, "success")
}

var waitForSerialForeachWf = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
second_wait:
wait_for: !expr $.steps.first_wait.outputs.success
kind: foreach
items:
- wait_time_ms: 10
workflow: subworkflow.yaml
first_wait:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 10
outputs:
success:
first_step_output: !expr $.steps.first_wait.outputs
second_step_output: !expr $.steps.second_wait.outputs
`

var waitForSerialForeachSubwf = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties:
wait_time_ms:
type:
type_id: integer
steps:
wait_1:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: !expr $.input.wait_time_ms
outputs:
success:
b: !expr $.steps.wait_1.outputs
`

func TestWaitForSerial_Foreach(t *testing.T) {
// This test highlights a lack of observability in this part of
// Arcaflow's engine.
// For this test, a workflow runs two steps, where each step runs a wait
// step for 10 ms. The second wait step waits for the first to succeed
// after which it runs. Due to the wait for condition, the steps will
// execute serially. The total execution time for this test function
// should be greater than 10 ms as the first step and the foreach steps
// run serially.

logConfig := log.Config{
Level: log.LevelInfo,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
factories := workflowFactory{
config: cfg,
}
deployerRegistry := deployerregistry.New(
deployer.Any(testimpl.NewFactory()),
)

pluginProvider := assert.NoErrorR[step.Provider](t)(
plugin.New(logger, deployerRegistry, map[string]interface{}{
"builtin": map[string]any{
"deployer_name": "test-impl",
"deploy_time": "0",
},
}),
)
stepRegistry, err := stepregistry.New(
pluginProvider,
lang.Must2(foreach.New(logger, factories.createYAMLParser, factories.createWorkflow)),
)
assert.NoError(t, err)

factories.stepRegistry = stepRegistry
executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForSerialForeachWf)))
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{
"subworkflow.yaml": []byte(waitForSerialForeachSubwf),
}))
startTime := time.Now() // Right before execute to not include pre-processing time.
outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
assert.NoError(t, err)
assert.Equals(t, outputID, "success")
stepResult := outputData.(map[interface{}]interface{})["first_step_output"]
assert.NotNil(t, stepResult)
stepResultWaitFor := stepResult.(map[string]interface{})["success"]
assert.NotNil(t, stepResultWaitFor)
stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"]
assert.NotNil(t, stepResult2)
stepResultWaitFor2 := stepResult.(map[string]interface{})["success"]
assert.NotNil(t, stepResultWaitFor2)

duration := time.Since(startTime)
t.Logf("Test execution time: %s", duration)
if duration >= 20*time.Millisecond {
t.Logf("Test execution time is greater than 20 milliseconds; steps are correctly running serially due to the wait_for condition.")
} else {
t.Fatalf("Test execution time is less than 20 milliseconds; steps are not running serially.")
}
}

var waitForStartedForeachWf = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
pre_wait:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 2
second_wait:
wait_for: !expr $.steps.first_wait.starting.started
kind: foreach
items:
- wait_time_ms: 2
workflow: subworkflow.yaml
first_wait:
wait_for: !expr $.steps.pre_wait.outputs
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 2
outputs:
success:
first_step_output: !expr $.steps.first_wait.outputs
second_step_output: !expr $.steps.second_wait.outputs
`

func TestWaitForStarted_Foreach(t *testing.T) {
// This test highlights a lack of observability in this part of
// Arcaflow's engine.
// For this test, the second wait step depends on the first wait
// step's running state being started.

logConfig := log.Config{
Level: log.LevelInfo,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
factories := workflowFactory{
config: cfg,
}
deployerRegistry := deployerregistry.New(
deployer.Any(testimpl.NewFactory()),
)

pluginProvider := assert.NoErrorR[step.Provider](t)(
plugin.New(logger, deployerRegistry, map[string]interface{}{
"builtin": map[string]any{
"deployer_name": "test-impl",
"deploy_time": "0",
},
}),
)
stepRegistry, err := stepregistry.New(
pluginProvider,
lang.Must2(foreach.New(logger, factories.createYAMLParser, factories.createWorkflow)),
)
assert.NoError(t, err)

factories.stepRegistry = stepRegistry
executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForStartedForeachWf)))
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{
"subworkflow.yaml": []byte(waitForSerialForeachSubwf),
}))

outputID, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
assert.NoError(t, err)
assert.Equals(t, outputID, "success")
}

type workflowFactory struct {
stepRegistry step.Registry
config *config.Config
}

func (f *workflowFactory) createYAMLParser() (workflow.YAMLConverter, error) {
stepR := f.stepRegistry
if stepR == nil {
return nil, fmt.Errorf("YAML converter not available yet, please call the factory function after the engine has initialized")
}
return workflow.NewYAMLConverter(stepR), nil
}

func (f *workflowFactory) createWorkflow(logger log.Logger) (workflow.Executor, error) {
stepR := f.stepRegistry
if stepR == nil {
return nil, fmt.Errorf("YAML converter not available yet, please call the factory function after the engine has initialized")
}
return workflow.NewExecutor(logger, f.config, stepR)
}

var missingInputsFailedDeploymentWorkflowDefinition = `
version: v0.2.0
input:
Expand Down