-
Notifications
You must be signed in to change notification settings - Fork 674
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Nested parallelization #5022
Nested parallelization #5022
Changes from 4 commits
12e4836
f7a8434
6da6c72
8ec5a04
035e4ec
f6a27be
b2398c8
0237f91
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
--- | ||
jupytext: | ||
cell_metadata_filter: all | ||
formats: md:myst | ||
main_language: python | ||
notebook_metadata_filter: all | ||
text_representation: | ||
extension: .md | ||
format_name: myst | ||
format_version: 0.13 | ||
jupytext_version: 1.16.1 | ||
kernelspec: | ||
display_name: Python 3 | ||
language: python | ||
name: python3 | ||
--- | ||
|
||
+++ {"lines_to_next_cell": 0} | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
(nested_parallelization)= | ||
|
||
# Nested Parallelization | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
```{eval-rst} | ||
.. tags:: Advanced | ||
``` | ||
|
||
For exceptionally large or complicated workflows where dynamic workflows or map tasks aren't enough, it can be benficial to have multiple levels of parallelization. | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
This is useful for multiple reasons: | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
1. Better code organization | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
2. Better code reuse | ||
3. Better testing | ||
4. Better debugging | ||
5. Better monitoring - each subworkflow can be run independently and monitored independently | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
6. Better performance and scale - each subworkflow is executed as a separate workflow and thus can land on different flytepropeller workers and shards. This allows for better parallelism and scale. | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
## Nested Dynamics | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
This example shows how to break down a large workflow into smaller workflows and then compose them together to form a hierarchy. | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
```python | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
The structure for 6 items and a chunk size of 2, the workflow will be broken down as follows: | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
multi_wf -> level1 -> level2 -> core_wf -> step1 -> step2 | ||
-> core_wf -> step1 -> step2 | ||
level2 -> core_wf -> step1 -> step2 | ||
-> core_wf -> step1 -> step2 | ||
level2 -> core_wf -> step1 -> step2 | ||
-> core_wf -> step1 -> step2 | ||
""" | ||
|
||
import typing | ||
from flytekit import task, workflow, dynamic, LaunchPlan | ||
|
||
|
||
@task | ||
def step1(a: int) -> int: | ||
return a + 1 | ||
|
||
|
||
@task | ||
def step2(a: int) -> int: | ||
return a + 2 | ||
|
||
|
||
@workflow | ||
def core_wf(a: int) -> int: | ||
return step2(a=step1(a=a)) | ||
|
||
|
||
core_wf_lp = LaunchPlan.get_or_create(core_wf) | ||
|
||
|
||
@dynamic | ||
def level2(l: typing.List[int]) -> typing.List[int]: | ||
return [core_wf_lp(a=a) for a in l] | ||
|
||
|
||
@task | ||
def reduce(l: typing.List[typing.List[int]]) -> typing.List[int]: | ||
f = [] | ||
for i in l: | ||
f.extend(i) | ||
return f | ||
|
||
|
||
@dynamic | ||
def level1(l: typing.List[int], chunk: int) -> typing.List[int]: | ||
v = [] | ||
for i in range(0, len(l), chunk): | ||
v.append(level2(l=l[i:i + chunk])) | ||
return reduce(l=v) | ||
|
||
|
||
@workflow | ||
def multi_wf(l: typing.List[int], chunk: int) -> typing.List[int]: | ||
return level1(l=l, chunk=chunk) | ||
``` | ||
|
||
+++ {"lines_to_next_cell": 0} | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
This shows a top-level workflow which uses 2 levels of dynamic workflows to process a list through some simple addition tasks and then flatten it again. Here is a visual representation of the execution in a Flyte console: | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
:::{figure} https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/user_guide/nested_parallel_top_level.png?raw=true | ||
:alt: Nested Parallelization UI View | ||
:class: with-shadow | ||
::: | ||
|
||
For each node in that top-level we can see 2 sub-workflows being run in parallel. | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
:::{figure} https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/user_guide/nested_parallel_inner_dynamic.png?raw=true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be possible to add a "level2" annotation between the inner and outer dotted-line boxes, similar to the "level1" annotation outside the top-level box? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
:alt: Inner Dynamic | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
:class: with-shadow | ||
::: | ||
|
||
Finally, drilling into each sub-workflow, we'll see both those tasks being executed in series. | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
:::{figure} https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/user_guide/nested_parallel_subworkflow.png?raw=true | ||
:alt: Sub-Workflow | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
:class: with-shadow | ||
::: | ||
|
||
## Mixed Parallelism | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
This example is similar to the above but instead of using a dynamic to parallelize a 2-task serial workflow, we're using that workflow to call a map task which processes both inputs in parallel. This workflow has one fewer layers of parallelism so the outputs won't be the same, but it does still demonstrate how you can mix these different approaches to achieving concurrency. | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
```python | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
The structure for 6 items and a chunk size of 2, the workflow will be broken down as follows: | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
multi_wf -> level1 -> level2 -> mappable | ||
-> mappable | ||
level2 -> mappable | ||
-> mappable | ||
level2 -> mappable | ||
-> mappable | ||
""" | ||
import typing | ||
from flytekit import task, workflow, dynamic, map_task | ||
|
||
|
||
@task | ||
def mappable(a: int) -> int: | ||
return a + 2 | ||
|
||
|
||
@workflow | ||
def level2(l: typing.List[int]) -> typing.List[int]: | ||
return map_task(mappable)(a=l) | ||
|
||
|
||
@task | ||
def reduce(l: typing.List[typing.List[int]]) -> typing.List[int]: | ||
f = [] | ||
for i in l: | ||
f.extend(i) | ||
return f | ||
|
||
|
||
@dynamic | ||
def level1(l: typing.List[int], chunk: int) -> typing.List[int]: | ||
v = [] | ||
for i in range(0, len(l), chunk): | ||
v.append(level2(l=l[i : i + chunk])) | ||
return reduce(l=v) | ||
|
||
|
||
@workflow | ||
def multi_wf(l: typing.List[int], chunk: int) -> typing.List[int]: | ||
return level1(l=l, chunk=chunk) | ||
|
||
``` | ||
+++ {"lines_to_next_cell": 0} | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
While the top-level dynamic will be exactly the same as the previous example, here you can see the inner map task nodes as well as links in the sidebar. | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
:::{figure} https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/user_guide/nested_parallel_inner_map.png?raw=true | ||
:alt: Inner Map Task | ||
:class: with-shadow | ||
::: | ||
|
||
## Design Considerations | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
You can nest even further if needed, or incorporate map tasks if your inputs are all the same type. The design of your workflow should of course be informed by the actual data you're processing. For example if you have a big library of music that you'd like to extract the lyrics for, the first level can loop through all the albums and the second level can go through each song. | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
If you're just churning through an enormous list of the same input, it's typically best to keep the code simple and let the scheduler handle optimizing the execution. Additionally, unless you need the features of a dynamic workflow like mixing and matching inputs and outputs, it's usually most efficient to use a map task. This has the added benefit of keeping the UI clean. | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
You can also choose to limit the scale of parallel execution at a couple levels. The `max_parallelism` attribute can be applied at the workflow level and will limit the number of parallel tasks being executed, this is set to 25 by default. Within map tasks specifically, you can indicate a `concurrency` argument which will limit the number of mapped tasks within running at any given time. | ||
pryce-turner marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can remove this entire block of frontmatter -- the contents of the Python sections will render correctly without it.