Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait_for functionality for serial and parallel execution #97

Merged
merged 6 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
*.iml
.vscode/
dist
arcaflow
config.yaml
workflow.yaml

28 changes: 14 additions & 14 deletions cmd/arcaflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ Options:
}
flag.Parse()

if printVersion {
fmt.Printf(
"Arcaflow Engine\n"+
"===============\n"+
"Version: %s\n"+
"Commit: %s\n"+
"Date: %s\n"+
"Apache 2.0 license\n"+
"Copyright (c) Arcalot Contributors",
version, commit, date,
)
return
}

var err error
var configData any = map[any]any{}
if configFile != "" {
Expand Down Expand Up @@ -135,20 +149,6 @@ Options:
os.Exit(ExitCodeInvalidData)
}

if printVersion {
fmt.Printf(
"Arcaflow Engine\n"+
"===============\n"+
"Version: %s\n"+
"Commit: %s\n"+
"Date: %s\n"+
"Apache 2.0 license\n"+
"Copyright (c) Arcalot Contributors",
version, commit, date,
)
return
}

var inputData []byte
if input != "" {
inputData, err = os.ReadFile(input)
Expand Down
22 changes: 16 additions & 6 deletions internal/step/plugin/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,8 @@ var startingLifecycleStage = step.LifecycleStage{
RunningName: "starting",
FinishedName: "started",
InputFields: map[string]struct{}{
//nolint:godox
// TODO: Add wait_for here. Empty struct.
"input": {},
"input": {},
"wait_for": {},
},
NextStages: []string{
string(StageIDRunning), string(StageIDCrashed),
Expand Down Expand Up @@ -317,9 +316,6 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st
{
LifecycleStage: startingLifecycleStage,
InputSchema: map[string]*schema.PropertySchema{
//nolint:godox
// TODO: Add wait_for right here. Should be an any type.
// Also add to section above.
"input": schema.NewPropertySchema(
stepSchema.Input(),
stepSchema.Display(),
Expand All @@ -330,6 +326,20 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st
nil,
nil,
),
"wait_for": schema.NewPropertySchema(
schema.NewAnySchema(),
schema.NewDisplayValue(
schema.PointerTo("Wait for condition"),
schema.PointerTo("Used to wait for a previous step stage to complete before running the step which is waiting."),
nil,
),
false,
nil,
nil,
nil,
nil,
nil,
),
},
},
{
Expand Down
164 changes: 163 additions & 1 deletion workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package workflow_test
import (
"context"
"errors"
"testing"
"time"

"go.flow.arcalot.io/deployer"
"go.flow.arcalot.io/engine/internal/step"
"go.flow.arcalot.io/engine/internal/step/plugin"
testimpl "go.flow.arcalot.io/testdeployer"
"testing"

"go.arcalot.io/assert"
"go.arcalot.io/lang"
Expand Down Expand Up @@ -144,3 +146,163 @@ func TestStepCancellation(t *testing.T) {
stepResultCancelledEarly := stepResult.(map[string]interface{})["cancelled_early"]
assert.NotNil(t, stepResultCancelledEarly)
}

var waitForSerialWorkflowDefinition = `
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
first_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 500
second_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 500
wait_for: !expr $.steps.first_wait.outputs.success
outputs:
success:
first_step_output: !expr $.steps.first_wait.outputs
second_step_output: !expr $.steps.second_wait.outputs
`

func TestWaitForSerial(t *testing.T) {
// For this test, a workflow runs two steps, where each step runs a wait step for 5s
// The second wait step waits for the first to succeed after which it runs
// Due to the wait for condition, the steps will execute serially
// The total execution time for this test function should be greater than 10seconds
// as each step runs for 5s and are run serially
// The test double deployer will be used for this test, as we
// need a deployer to test the plugin step provider.
startTime := time.Now()
logConfig := log.Config{
Level: log.LevelInfo,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
stepRegistry := NewTestImplStepRegistry(logger, t)

executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForSerialWorkflowDefinition)))
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{}))
outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
assert.NoError(t, err)
assert.Equals(t, outputID, "success")
stepResult := outputData.(map[interface{}]interface{})["first_step_output"]
assert.NotNil(t, stepResult)
stepResultWaitFor := stepResult.(map[string]interface{})["success"]
assert.NotNil(t, stepResultWaitFor)
stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"]
assert.NotNil(t, stepResult2)
stepResultWaitFor2 := stepResult.(map[string]interface{})["success"]
assert.NotNil(t, stepResultWaitFor2)

duration := time.Since(startTime)
t.Logf("Test execution time: %s", duration)
var waitSuccess bool
if duration >= 1*time.Second {
waitSuccess = true
t.Logf("Test execution time is greater than 1 second, steps are running serially due to the wait_for condition.")
} else {
waitSuccess = false
t.Logf("Test execution time is lesser than 1 seconds, steps are not running serially.")
}
assert.Equals(t, waitSuccess, true)
}

// Running parallel steps which wait on the same previous step sometimes causes a race condition. This needs to be investigated.
// once the race condition if fixed reduce the wait_time to 500ms.
var waitForParallelWorkflowDefinition = `
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
first_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 5000
second_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 5000
wait_for: !expr $.steps.first_wait.outputs.success
third_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 5000
wait_for: !expr $.steps.first_wait.outputs.success
outputs:
success:
third_step_output: !expr $.steps.third_wait.outputs
second_step_output: !expr $.steps.second_wait.outputs
`

func TestWaitForParallel(t *testing.T) {
// For this test, a workflow runs three steps, where each step runs a wait step for 5s
// The second and third wait steps wait for the first to succeed after which they both run in parallel
// The total execution time for this test function should be greater than 5s but lesser than 15s
// as the first step runs for 5s and other two steps run in parallel after the first succeeds
// The test double deployer will be used for this test, as we
// need a deployer to test the plugin step provider.
startTime := time.Now()
logConfig := log.Config{
Level: log.LevelInfo,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
stepRegistry := NewTestImplStepRegistry(logger, t)

executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForParallelWorkflowDefinition)))
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{}))
outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
assert.NoError(t, err)
assert.Equals(t, outputID, "success")
stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"]
assert.NotNil(t, stepResult2)
stepResult3 := outputData.(map[interface{}]interface{})["third_step_output"]
assert.NotNil(t, stepResult3)
t.Log(stepResult3)

duration := time.Since(startTime)
t.Logf("Test execution time: %s", duration)
var waitSuccess bool
if duration > 10*time.Second && duration < 20*time.Second {
waitSuccess = true
t.Logf("Steps second_wait and third_wait are running in parallel after waiting for the first_wait step.")
} else {
waitSuccess = false
t.Logf("Steps second_wait and third_wait are not running in parallel.")
}
assert.Equals(t, waitSuccess, true)
}