Skip to content

Commit f7c0616

Browse files
authored
fix(backend): parallelFor resolve upstream inputs. Fixes #11520 (#11627)
Signed-off-by: zazulam <[email protected]>
1 parent d2c0376 commit f7c0616

File tree

6 files changed

+74
-49
lines changed

6 files changed

+74
-49
lines changed

backend/src/v2/compiler/argocompiler/dag.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ func (c *workflowCompiler) iteratorTask(name string, task *pipelinespec.Pipeline
369369

370370
tasks = []wfapi.DAGTask{
371371
{
372-
Name: name + "-loop",
372+
Name: name,
373373
Template: loopTmplName,
374374
Depends: depends(task.GetDependentTasks()),
375375
Arguments: wfapi.Arguments{

backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml

+8-47
Original file line numberDiff line numberDiff line change
@@ -9,32 +9,13 @@ spec:
99
- name: components-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d
1010
value: '{"executorLabel":"exec-print-op","inputDefinitions":{"parameters":{"s":{"parameterType":"STRING"}}}}'
1111
- name: implementations-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d
12-
value: '{"args":["--executor_input","{{$}}","--function_to_execute","print_op"],"command":["sh","-c","\nif
13-
! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m
14-
ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1
15-
python3 -m pip install --quiet --no-warn-script-location ''kfp==2.7.0'' ''--no-deps''
16-
''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026
17-
\"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\"
18-
\u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m
19-
kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport
20-
kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef
21-
print_op(s: str):\n print(s)\n\n"],"image":"python:3.7"}'
12+
value: '{"args":["--executor_input","{{$}}","--function_to_execute","print_op"],"command":["sh","-c","\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location ''kfp==2.7.0'' ''--no-deps'' ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef print_op(s: str):\n print(s)\n\n"],"image":"python:3.7"}'
2213
- name: components-comp-for-loop-2
2314
value: '{"dag":{"tasks":{"print-op":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-1","parameterExpressionSelector":"parseJson(string_value)[\"A_a\"]"}}},"taskInfo":{"name":"print-op"}},"print-op-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-2"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-1","parameterExpressionSelector":"parseJson(string_value)[\"B_b\"]"}}},"taskInfo":{"name":"print-op-2"}}}},"inputDefinitions":{"parameters":{"pipelinechannel--loop-item-param-1":{"parameterType":"STRUCT"}}}}'
2415
- name: components-comp-for-loop-4
2516
value: '{"dag":{"tasks":{"print-op-3":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-3"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-3","parameterExpressionSelector":"parseJson(string_value)[\"A_a\"]"}}},"taskInfo":{"name":"print-op-3"}},"print-op-4":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-4"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-3","parameterExpressionSelector":"parseJson(string_value)[\"B_b\"]"}}},"taskInfo":{"name":"print-op-4"}}}},"inputDefinitions":{"parameters":{"pipelinechannel--loop-item-param-3":{"parameterType":"STRUCT"}}}}'
2617
- name: components-root
27-
value: '{"dag":{"tasks":{"for-loop-2":{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\":
28-
\"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\",
29-
\"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\":
30-
\"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"},
31-
{\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\":
32-
\"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"foo"}},"for-loop-4":{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\":
33-
\"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\",
34-
\"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\":
35-
\"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"},
36-
{\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\":
37-
\"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"bar"}}}}}'
18+
value: '{"dag":{"tasks":{"for-loop-2":{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\": \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"foo"}},"for-loop-4":{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\": \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"bar"}}}}}'
3819
entrypoint: entrypoint
3920
podMetadata:
4021
annotations:
@@ -383,12 +364,7 @@ spec:
383364
- name: parent-dag-id
384365
value: '{{inputs.parameters.parent-dag-id}}'
385366
- name: task
386-
value: '{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\":
387-
\"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\":
388-
\"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\":
389-
\"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\":
390-
\"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\":
391-
\"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"foo"}}'
367+
value: '{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\": \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"foo"}}'
392368
name: iteration-item-driver
393369
template: system-dag-driver
394370
- arguments:
@@ -416,12 +392,7 @@ spec:
416392
- name: parent-dag-id
417393
value: '{{inputs.parameters.parent-dag-id}}'
418394
- name: task
419-
value: '{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\":
420-
\"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\":
421-
\"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\":
422-
\"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\":
423-
\"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\":
424-
\"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"foo"}}'
395+
value: '{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\": \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"foo"}}'
425396
name: iteration-driver
426397
template: system-dag-driver
427398
- arguments:
@@ -453,12 +424,7 @@ spec:
453424
- name: parent-dag-id
454425
value: '{{inputs.parameters.parent-dag-id}}'
455426
- name: task
456-
value: '{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\":
457-
\"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\":
458-
\"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\":
459-
\"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\":
460-
\"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\":
461-
\"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"bar"}}'
427+
value: '{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\": \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"bar"}}'
462428
name: iteration-item-driver
463429
template: system-dag-driver
464430
- arguments:
@@ -486,12 +452,7 @@ spec:
486452
- name: parent-dag-id
487453
value: '{{inputs.parameters.parent-dag-id}}'
488454
- name: task
489-
value: '{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\":
490-
\"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\":
491-
\"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\":
492-
\"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\":
493-
\"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\":
494-
\"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"bar"}}'
455+
value: '{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\": \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"bar"}}'
495456
name: iteration-driver
496457
template: system-dag-driver
497458
- arguments:
@@ -519,14 +480,14 @@ spec:
519480
- name: parent-dag-id
520481
value: '{{inputs.parameters.parent-dag-id}}'
521482
depends: ""
522-
name: for-loop-2-loop
483+
name: for-loop-2
523484
template: comp-for-loop-2-for-loop-2-iterator
524485
- arguments:
525486
parameters:
526487
- name: parent-dag-id
527488
value: '{{inputs.parameters.parent-dag-id}}'
528489
depends: ""
529-
name: for-loop-4-loop
490+
name: for-loop-4
530491
template: comp-for-loop-4-for-loop-4-iterator
531492
inputs:
532493
parameters:

backend/src/v2/driver/driver.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1662,7 +1662,7 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamOutputsConfig) error {
16621662
if taskOutput.GetOutputArtifactKey() == "" {
16631663
cfg.err(fmt.Errorf("output artifact key is empty"))
16641664
}
1665-
tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false)
1665+
tasks, err := getDAGTasks(cfg.ctx, cfg.dag, cfg.pipeline, cfg.mlmd, nil)
16661666
if err != nil {
16671667
cfg.err(err)
16681668
}
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from kfp import Client, dsl
2+
3+
4+
@dsl.component
5+
def print_op(message: str) -> str:
6+
print(message)
7+
return message
8+
9+
10+
@dsl.pipeline()
11+
def loop_with_after_dependency_set():
12+
with dsl.ParallelFor([1, 2, 3]):
13+
one = print_op(message='foo')
14+
# Ensure that the dependecy is set downstream for all loop iterations
15+
two = print_op(message='bar').after(one)
16+
three = print_op(message='baz').after(one)
17+
18+
19+
if __name__ == '__main__':
20+
client = Client()
21+
run = client.create_run_from_pipeline_func(loop_with_after_dependency_set)
+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from kfp import Client
2+
from kfp import dsl
3+
from kfp.dsl import Artifact, Input, Output
4+
5+
6+
@dsl.component
7+
def split_input(input: str) -> list:
8+
return input.split(',')
9+
10+
11+
@dsl.component
12+
def create_file(file: Output[Artifact], content: str):
13+
with open(file.path, 'w') as f:
14+
f.write(content)
15+
16+
17+
@dsl.component
18+
def read_file(file: Input[Artifact]) -> str:
19+
with open(file.path, 'r') as f:
20+
print(f.read())
21+
return file.path
22+
23+
24+
@dsl.pipeline()
25+
def loop_consume_upstream():
26+
model_ids_split_op = split_input(input='component1,component2,component3')
27+
model_ids_split_op.set_caching_options(False)
28+
29+
with dsl.ParallelFor(model_ids_split_op.output) as model_id:
30+
create_file_op = create_file(content=model_id)
31+
create_file_op.set_caching_options(False)
32+
# Consume the output from a op in the loop iteration DAG context
33+
read_file_op = read_file(file=create_file_op.outputs['file'])
34+
read_file_op.set_caching_options(False)
35+
36+
37+
if __name__ == '__main__':
38+
client = Client()
39+
run = client.create_run_from_pipeline_func(loop_consume_upstream)

samples/v2/sample_test.py

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import pipeline_with_env
2929
import producer_consumer_param
3030
import subdagio
31+
import parallel_consume_upstream
32+
import parallel_after_dependency
3133
import two_step_pipeline_containerized
3234
import pipeline_with_placeholders
3335
from modelcar_import import modelcar_import
@@ -78,6 +80,8 @@ def test(self):
7880
pipeline_func=subdagio.multiple_artifacts_namedtuple.crust),
7981
TestCase(pipeline_func=pipeline_with_placeholders.pipeline_with_placeholders),
8082
TestCase(pipeline_func=modelcar_import.pipeline_modelcar_import),
83+
TestCase(pipeline_func=parallel_consume_upstream.loop_consume_upstream),
84+
TestCase(pipeline_func=parallel_after_dependency.loop_with_after_dependency_set),
8185
]
8286

8387
with ThreadPoolExecutor() as executor:

0 commit comments

Comments
 (0)