Skip to content

Commit

Permalink
Support N Depth Sub-Workflow References (#146)
Browse files Browse the repository at this point in the history
* refactor and add tests

* add newline
  • Loading branch information
mfleader committed Jan 26, 2024
1 parent 0508072 commit 5dbf988
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 5 deletions.
22 changes: 20 additions & 2 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func (w workflowEngine) Parse(
return nil, err
}

stepWorkflowFileCache, err := SubworkflowCache(wf, files.RootDir())
flowCaches := make([]loadfile.FileCache, 0)
stepWorkflowFileCache, err := SubworkflowCache(wf, files.RootDir(), yamlConverter, flowCaches)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -150,7 +151,7 @@ func StepWorkflowPaths(wf *workflow.Workflow) map[string]string {

// SubworkflowCache creates a file cache of the sub-workflows referenced
// in this workflow using rootDir as a context.
func SubworkflowCache(wf *workflow.Workflow, rootDir string) (loadfile.FileCache, error) {
func SubworkflowCache(wf *workflow.Workflow, rootDir string, converter workflow.YAMLConverter, flowCaches []loadfile.FileCache) (loadfile.FileCache, error) {
stepWorkflowPaths := StepWorkflowPaths(wf)
if len(stepWorkflowPaths) == 0 {
return nil, nil
Expand All @@ -163,6 +164,23 @@ func SubworkflowCache(wf *workflow.Workflow, rootDir string) (loadfile.FileCache
if err != nil {
return nil, err
}
for _, ctxFile := range subworkflowCache.Files() {
subwf, err := converter.FromYAML(ctxFile.Content)
if err != nil {
return nil, err
}
flowCache, err := SubworkflowCache(subwf, rootDir, converter, flowCaches)
if err != nil {
return nil, err
}
flowCaches = append(flowCaches, flowCache)
}

flowCaches = append(flowCaches, subworkflowCache)
subworkflowCache, err = loadfile.MergeFileCaches(flowCaches...)
if err != nil {
return nil, err
}
return subworkflowCache, nil
}

Expand Down
28 changes: 28 additions & 0 deletions fixtures/test-subworkflow/depth-1_workflow-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties:
seconds:
type:
type_id: float
min: 0
steps:
wait_1:
plugin:
src: quay.io/arcalot/arcaflow-plugin-wait:atp-v3_cc3e02b
deployment_type: image
step: wait
input:
seconds: !expr $.input.seconds
wait_loop:
kind: foreach
items:
- seconds: .1
workflow: depth-2_workflow-1.yaml
outputs:
success:
a: !expr $.steps.wait_1.outputs
b: !expr $.steps.wait_loop.outputs
28 changes: 28 additions & 0 deletions fixtures/test-subworkflow/depth-2_workflow-1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties:
seconds:
type:
type_id: float
min: 0
steps:
wait_1:
plugin:
src: quay.io/arcalot/arcaflow-plugin-wait:atp-v3_cc3e02b
deployment_type: image
step: wait
input:
seconds: !expr $.input.seconds
wait_loop:
kind: foreach
items:
- seconds: .1
workflow: depth-3_workflow-1.yaml
outputs:
success:
a: !expr $.steps.wait_1.outputs
b: !expr $.steps.wait_loop.outputs
6 changes: 3 additions & 3 deletions fixtures/test-subworkflow/workflow-happy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ steps:
kind: foreach
items:
- seconds: .1
workflow: subworkflow-1.yaml
workflow: depth-1_workflow-1.yaml
wait_wf_2:
kind: foreach
items:
- seconds: .1
workflow: subworkflow-2.yaml
workflow: depth-1_workflow-2.yaml
wait_wf_3:
kind: foreach
items:
- seconds: .1
workflow: subworkflow-1.yaml
workflow: depth-1_workflow-1.yaml
outputs:
success:
step_1: !expr $.steps.wait_wf_1.outputs
Expand Down
3 changes: 3 additions & 0 deletions loadfile/loadfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func MergeFileCaches(fileCaches ...FileCache) (FileCache, error) {
rootDir := ""

for _, fc := range fileCaches {
if fc == nil {
continue
}
for key, contextFile := range fc.Files() {
cache[key] = contextFile
}
Expand Down
7 changes: 7 additions & 0 deletions loadfile/loadfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ func Test_MergeFileCaches(t *testing.T) {
fcMerged2, err := loadfile.MergeFileCaches(fcMerged, fc3)
assert.Error(t, err)
assert.Nil(t, fcMerged2)

// nil file caches should not be merged
var fcNil loadfile.FileCache
fcMerged3, err := loadfile.MergeFileCaches(fcMerged, fcNil, fcNil)
assert.NoError(t, err)
assert.Equals(t, fcMerged3.RootDir(), expRootDir)
assert.Equals(t, fcMerged3.Files(), expMergedFiles)
}

func FileCacheAbsPaths(fc loadfile.FileCache) map[string]string {
Expand Down

0 comments on commit 5dbf988

Please sign in to comment.