Skip to content

Commit

Permalink
fix merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
mfleader committed Oct 27, 2023
2 parents 5d9d4f6 + b292124 commit 0ed414e
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 83 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ jobs:
env:
VERSION: ${{ github.ref_name }}
run: |
[[ ${VERSION//[[:blank:]]/} =~ ^v[[:digit:]]+\.[[:digit:]]\.[[:digit:]]$ ]] && export OK="[INF] version format accepted"
[[ ${VERSION//[[:blank:]]/} =~ ^v[[:digit:]]+\.[[:digit:]]\.[[:digit:]](-[[:alnum:]]+)?(\+[[:alnum:]]+)?$ ]] && export OK="[INF] version format accepted"
[[ -z $OK ]] && echo "[ERR] wrong version format: $VERSION" && exit 1
echo $OK
- name: Check out code
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
go.flow.arcalot.io/kubernetesdeployer v0.7.0
go.flow.arcalot.io/pluginsdk v0.5.0
go.flow.arcalot.io/podmandeployer v0.5.0
go.flow.arcalot.io/pythondeployer v0.2.0
go.flow.arcalot.io/pythondeployer v0.3.0
go.flow.arcalot.io/testdeployer v0.3.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ go.flow.arcalot.io/pluginsdk v0.5.0 h1:TRS/waCTcdoMZ9neDAcfy3zpzyDnPHRbhV+Y1kpcw
go.flow.arcalot.io/pluginsdk v0.5.0/go.mod h1:2s2f//7uOkBjr1QaiWJD/bqDIeLlINJtD1BhiY4aGPM=
go.flow.arcalot.io/podmandeployer v0.5.0 h1:h7hEhWUgxJzNKlEohZ+meKhl3FWjaXQahQ8vN3YVRNs=
go.flow.arcalot.io/podmandeployer v0.5.0/go.mod h1:36JCcTB6nauahcXUPfIpdEw7Zfp0ufM07o3VNTvrCc0=
go.flow.arcalot.io/pythondeployer v0.2.0 h1:iSVxQzRGrEr0/bJfsRNQ7Q3ItA3sr0+7jDg1SL15llo=
go.flow.arcalot.io/pythondeployer v0.2.0/go.mod h1:zegeDjxiddprrPmO6243AslQX7BxCmV46xXSUG5TZGc=
go.flow.arcalot.io/pythondeployer v0.3.0 h1:ercLuDwFoDSL0f6YvZEqFW0/nO7Yv7DkbROl3rKxYDk=
go.flow.arcalot.io/pythondeployer v0.3.0/go.mod h1:ND1x/Vhu/6q50zQeisCcD6oQ6lKVJFflOrfDccnIjSY=
go.flow.arcalot.io/testdeployer v0.3.0 h1:Soyz+rDa3Y3VjWBGuL3zNlX3LM4uKp9Ex7///fCgrZA=
go.flow.arcalot.io/testdeployer v0.3.0/go.mod h1:Eel0ORhtKdYYDsd+e+btBBygIn+9Sz/b+JFDwH39VWI=
go.flow.arcalot.io/testplugin v0.2.1 h1:9kQ2MKvcXtEcwk5c4qSWN+FovpER2C9vn730laAm9iE=
Expand Down
57 changes: 57 additions & 0 deletions internal/step/plugin/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,63 @@ func TestProvider_HappyError(t *testing.T) {
}

func TestProvider_VerifyCancelSignal(t *testing.T) {
logger := log.New(
log.Config{
Level: log.LevelError,
Destination: log.DestinationStdout,
},
)
workflowDeployerCfg := map[string]any{
"type": "test-impl",
}

deployerRegistry := deployer_registry.New(
deployer.Any(testdeployer.NewFactory()))

plp, err := plugin.New(
logger,
deployerRegistry,
workflowDeployerCfg,
)
assert.NoError(t, err)

runnable, err := plp.LoadSchema(
map[string]any{"plugin": "simulation"}, map[string][]byte{})
assert.NoError(t, err)
assert.NotNil(t, runnable)

waitLifecycle, err := runnable.Lifecycle(map[string]any{"step": "wait"})
assert.NoError(t, err)
// Verify that the expected lifecycle stage is there, then verify that cancel is disabled.
waitCancelledStageIDIndex := assert.SliceContainsExtractor(t,
func(schema step.LifecycleStageWithSchema) string {
return schema.ID
}, string(plugin.StageIDCancelled), waitLifecycle.Stages)
waitStageIDCancelled := waitLifecycle.Stages[waitCancelledStageIDIndex]
waitStopIfSchema := assert.MapContainsKey(t, "stop_if", waitStageIDCancelled.InputSchema)
if waitStopIfSchema.Disabled {
t.Fatalf("step wait's wait_for schema is disabled when the cancel signal is present.")
}

helloLifecycle, err := runnable.Lifecycle(map[string]any{"step": "hello"})
assert.NoError(t, err)
// Verify that the expected lifecycle stage is there, then verify that cancel is disabled.
helloCancelledStageIDIndex := assert.SliceContainsExtractor(t,
func(schema step.LifecycleStageWithSchema) string {
return schema.ID
}, string(plugin.StageIDCancelled), helloLifecycle.Stages)
helloStageIDCancelled := helloLifecycle.Stages[helloCancelledStageIDIndex]
helloStopIfSchema := assert.MapContainsKey(t, "stop_if", helloStageIDCancelled.InputSchema)
if !helloStopIfSchema.Disabled {
t.Fatalf("step hello's stop_if schema is not disabled when the cancel signal is not present.")
}
}

func TestProvider_DeployFail(t *testing.T) {

Check failure on line 328 in internal/step/plugin/provider_test.go

View workflow job for this annotation

GitHub Actions / go test

other declaration of TestProvider_DeployFail
logConfig := log.Config{

Check failure on line 329 in internal/step/plugin/provider_test.go

View workflow job for this annotation

GitHub Actions / go test

logConfig declared but not used
Level: log.LevelError,
Destination: log.DestinationStdout,
}
logger := log.New(
log.Config{
Level: log.LevelError,
Expand Down
79 changes: 0 additions & 79 deletions workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,85 +322,6 @@ func TestWaitForSerial(t *testing.T) {
}
}

// Running parallel steps which wait on the same previous step sometimes causes a race condition. This needs to be investigated.
// once the race condition if fixed reduce the wait_time to 500ms.
var waitForParallelWorkflowDefinition = `
version: v0.1.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
first_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 10
second_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 10
wait_for: !expr $.steps.first_wait.outputs.success
third_wait:
plugin: "n/a"
step: wait
input:
wait_time_ms: 10
wait_for: !expr $.steps.first_wait.outputs.success
outputs:
success:
third_step_output: !expr $.steps.third_wait.outputs
second_step_output: !expr $.steps.second_wait.outputs
`

func TestWaitForParallel(t *testing.T) {
// For this test, a workflow runs three steps, where each step runs a wait step for 5s
// The second and third wait steps wait for the first to succeed after which they both run in parallel
// The total execution time for this test function should be greater than 5s but lesser than 15s
// as the first step runs for 5s and other two steps run in parallel after the first succeeds
// The test double deployer will be used for this test, as we
// need a deployer to test the plugin step provider.
logConfig := log.Config{
Level: log.LevelInfo,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
stepRegistry := NewTestImplStepRegistry(logger, t)

executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(waitForParallelWorkflowDefinition)))
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{}))
startTime := time.Now() // Right before execute to not include pre-processing time.
outputID, outputData, err := preparedWorkflow.Execute(context.Background(), map[string]any{})
assert.NoError(t, err)
assert.Equals(t, outputID, "success")
stepResult2 := outputData.(map[interface{}]interface{})["second_step_output"]
assert.NotNil(t, stepResult2)
stepResult3 := outputData.(map[interface{}]interface{})["third_step_output"]
assert.NotNil(t, stepResult3)
t.Log(stepResult3)

duration := time.Since(startTime)
t.Logf("Test execution time: %s", duration)
if duration > 20*time.Millisecond && duration < 40*time.Millisecond {
t.Logf("Steps second_wait and third_wait are correctly running in parallel after waiting for the first_wait step.")
} else {
t.Fatalf("Steps second_wait and third_wait are not running in parallel.")
}
}

var missingInputsFailedDeploymentWorkflowDefinition = `
version: v0.1.0
input:
Expand Down

0 comments on commit 0ed414e

Please sign in to comment.