Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait_for functionality for serial and parallel execution #97

Merged
merged 6 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
*.iml
.vscode/
dist
arcaflow
config.yaml
workflow.yaml

28 changes: 14 additions & 14 deletions cmd/arcaflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ Options:
}
flag.Parse()

if printVersion {
fmt.Printf(
"Arcaflow Engine\n"+
"===============\n"+
"Version: %s\n"+
"Commit: %s\n"+
"Date: %s\n"+
"Apache 2.0 license\n"+
"Copyright (c) Arcalot Contributors",
version, commit, date,
)
return
}

var err error
var configData any = map[any]any{}
if configFile != "" {
Expand Down Expand Up @@ -135,20 +149,6 @@ Options:
os.Exit(ExitCodeInvalidData)
}

if printVersion {
fmt.Printf(
"Arcaflow Engine\n"+
"===============\n"+
"Version: %s\n"+
"Commit: %s\n"+
"Date: %s\n"+
"Apache 2.0 license\n"+
"Copyright (c) Arcalot Contributors",
version, commit, date,
)
return
}

var inputData []byte
if input != "" {
inputData, err = os.ReadFile(input)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
go.flow.arcalot.io/kubernetesdeployer v0.5.1
go.flow.arcalot.io/pluginsdk v0.3.0-beta.1
go.flow.arcalot.io/podmandeployer v0.3.1
go.flow.arcalot.io/pythondeployer v0.1.1
go.flow.arcalot.io/pythondeployer v0.1.2
go.flow.arcalot.io/testdeployer v0.2.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ go.flow.arcalot.io/pluginsdk v0.3.0-beta.1 h1:RrC5SKDkhwG/enE/FajAxRF1izET61/LO4
go.flow.arcalot.io/pluginsdk v0.3.0-beta.1/go.mod h1:7cEk8LSxpZakyfrmKTPbiMhlrZvWtCPYcaI7qfSu8MM=
go.flow.arcalot.io/podmandeployer v0.3.1 h1:AbRmTTtuK50PLkZyu194oSra0zUCeM3lDCWTc7oo4Ys=
go.flow.arcalot.io/podmandeployer v0.3.1/go.mod h1:SmROc9nHG+KfKasyTeKtGmI9EBlXCupXjBIgX5glGn8=
go.flow.arcalot.io/pythondeployer v0.1.1 h1:3K43XoCwM9zmH6wRX41lB996B1g6nEgD8/kmKKImuWA=
go.flow.arcalot.io/pythondeployer v0.1.1/go.mod h1:vCkwB72TinFVb367/o0djptTvR+V004i1I5OQUeCcPU=
go.flow.arcalot.io/pythondeployer v0.1.2 h1:IUIoYTuw+2N1zrpSv8yaO3wsS6aWAFwbLlYNhCEM+zI=
go.flow.arcalot.io/pythondeployer v0.1.2/go.mod h1:vCkwB72TinFVb367/o0djptTvR+V004i1I5OQUeCcPU=
go.flow.arcalot.io/testdeployer v0.2.0 h1:4/cLr58/e6o5ouVRuJ5hM28nhciwJrL9AOE5Sdb7rN0=
go.flow.arcalot.io/testdeployer v0.2.0/go.mod h1:vy3Iu+9SHmugvOJRtMWAj8R+SE9BYi7k9Xi7DM5n7eQ=
go.flow.arcalot.io/testplugin v0.1.0 h1:I2BT978XISjaSnQbpaJfmjo2cTmTeBV7q+1IwTGbrig=
Expand Down
22 changes: 16 additions & 6 deletions internal/step/plugin/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,8 @@ var startingLifecycleStage = step.LifecycleStage{
RunningName: "starting",
FinishedName: "started",
InputFields: map[string]struct{}{
//nolint:godox
// TODO: Add wait_for here. Empty struct.
"input": {},
"input": {},
"wait_for": {},
},
NextStages: []string{
string(StageIDRunning), string(StageIDCrashed),
Expand Down Expand Up @@ -317,9 +316,6 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st
{
LifecycleStage: startingLifecycleStage,
InputSchema: map[string]*schema.PropertySchema{
//nolint:godox
// TODO: Add wait_for right here. Should be an any type.
// Also add to section above.
"input": schema.NewPropertySchema(
stepSchema.Input(),
stepSchema.Display(),
Expand All @@ -330,6 +326,20 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st
nil,
nil,
),
"wait_for": schema.NewPropertySchema(
schema.NewAnySchema(),
schema.NewDisplayValue(
schema.PointerTo("Wait for condition"),
schema.PointerTo("Used to wait for a previous step stage to complete before running the step which is waiting."),
nil,
),
false,
nil,
nil,
nil,
nil,
nil,
),
},
},
{
Expand Down
162 changes: 161 additions & 1 deletion workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package workflow_test
import (
"context"
"errors"
"testing"
"time"

"go.flow.arcalot.io/deployer"
"go.flow.arcalot.io/engine/internal/step"
"go.flow.arcalot.io/engine/internal/step/plugin"
testimpl "go.flow.arcalot.io/testdeployer"
"testing"

"go.arcalot.io/assert"
"go.arcalot.io/lang"
Expand Down Expand Up @@ -144,3 +146,161 @@ func TestStepCancellation(t *testing.T) {
stepResultCancelledEarly := stepResult.(map[string]interface{})["cancelled_early"]
assert.NotNil(t, stepResultCancelledEarly)
}

var waitForSerialWorkflowDefinition = `
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
first_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 10000
second_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 10000
wait_for: !expr $.steps.first_wait.outputs.success
Harshith-umesh marked this conversation as resolved.
Show resolved Hide resolved
outputs:
success:
first_step_output: !expr $.steps.first_wait.outputs
second_step_output: !expr $.steps.second_wait.outputs
`

func TestWaitForSerial(t *testing.T) {
// For this test, a workflow runs two steps, where each step runs a wait step for 10s
// The second wait step waits for the first to succeed after which it runs
// Due to the wait for condition, the steps will execute serially
// The total execution time for this test function should be greater than 20seconds
// as each step runs for 10s and are run serially
// The test double deployer will be used for this test, as we
// need a deployer to test the plugin step provider.
startTime := time.Now()
logConfig := log.Config{
Level: log.LevelInfo,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
stepRegistry := NewTestImplStepRegistry(logger, t)

executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForSerialWorkflowDefinition)))
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{}))
outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
assert.NoError(t, err)
assert.Equals(t, outputID, "success")
stepResult := outputData.(map[interface{}]interface{})["first_step_output"]
assert.NotNil(t, stepResult)
stepResultWaitFor := stepResult.(map[string]interface{})["success"]
assert.NotNil(t, stepResultWaitFor)
stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"]
assert.NotNil(t, stepResult2)
stepResultWaitFor2 := stepResult.(map[string]interface{})["success"]
assert.NotNil(t, stepResultWaitFor2)

duration := time.Since(startTime)
t.Logf("Test execution time: %s", duration)
var wait_success bool
if duration >= 20*time.Second {
wait_success = true
t.Logf("Test execution time is greater than 20 seconds, steps are running serially due to the wait_for condition.")
} else {
wait_success = false
t.Logf("Test execution time is lesser than 20 seconds, steps are not running serially.")
}
assert.Equals(t, wait_success, true)
}

var waitForParallelWorkflowDefinition = `
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
first_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 10000
second_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 10000
wait_for: !expr $.steps.first_wait.outputs.success
third_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 10000
wait_for: !expr $.steps.first_wait.outputs.success
outputs:
success:
second_step_output: !expr $.steps.second_wait.outputs.success
third_step_output: !expr $.steps.third_wait.outputs.success
`

func TestWaitForParallel(t *testing.T) {
// For this test, a workflow runs three steps, where each step runs a wait step for 10s
// The second and third wait steps wait for the first to succeed after which they both run in parallel
// The total execution time for this test function should be greater than 15s but lesser than 25s
// as the first step runs for 10s and other two steps run in parallel after the first succeeds
// The test double deployer will be used for this test, as we
// need a deployer to test the plugin step provider.
startTime := time.Now()
logConfig := log.Config{
Level: log.LevelInfo,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
stepRegistry := NewTestImplStepRegistry(logger, t)

executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForParallelWorkflowDefinition)))
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{}))
outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
assert.NoError(t, err)
assert.Equals(t, outputID, "success")
stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"]
assert.NotNil(t, stepResult2)
stepResult3 := outputData.(map[interface{}]interface{})["third_step_output"]
assert.NotNil(t, stepResult3)
t.Log(stepResult3)

duration := time.Since(startTime)
t.Logf("Test execution time: %s", duration)
var wait_success bool
if duration > 15*time.Second && duration < 25*time.Second {
wait_success = true
t.Logf("Steps second_wait and third_wait are running in parallel after waiting for the first_wait step.")
} else {
wait_success = false
t.Logf("Steps second_wait and third_wait are not running in parallel.")
}
assert.Equals(t, wait_success, true)
}