Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/array node workflow parallelism #2268

Merged
merged 5 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions flytekit/core/array_node_map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def _raw_execute(self, **kwargs) -> Any:

def map_task(
task_function: PythonFunctionTask,
concurrency: int = 0,
concurrency: Optional[int] = None,
# TODO why no min_successes?
min_success_ratio: float = 1.0,
**kwargs,
Expand All @@ -328,7 +328,8 @@ def map_task(
:param task_function: This argument is implicitly passed and represents the repeatable function
:param task_function: This argument is implicitly passed and represents the repeatable function
:param concurrency: If specified, this limits the number of mapped tasks that can run in parallel to the given batch
size. If the size of the input exceeds the concurrency value, then multiple batches will be run serially until
all inputs are processed. If left unspecified, this means unbounded concurrency.
all inputs are processed. If set to 0, this means unbounded concurrency. If left unspecified, this means the
array node will inherit parallelism from the workflow.
:param min_success_ratio: If specified, this determines the minimum fraction of total jobs which can complete
successfully before terminating this task and marking it successful.
"""
Expand Down
8 changes: 4 additions & 4 deletions tests/flytekit/unit/core/test_array_node_map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def many_inputs(a: int, b: str, c: float) -> str:
assert m.python_interface.inputs == {"a": List[int], "b": List[str], "c": List[float]}
assert (
m.name
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_bf51001578d0ae197a52c0af0a99dd89-arraynode"
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_6b3bd0353da5de6e84d7982921ead2b3-arraynode"
)
r_m = ArrayNodeMapTask(many_inputs)
assert str(r_m.python_interface) == str(m.python_interface)
Expand All @@ -194,7 +194,7 @@ def many_inputs(a: int, b: str, c: float) -> str:
assert m.python_interface.inputs == {"a": List[int], "b": List[str], "c": float}
assert (
m.name
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_cb470e880fabd6265ec80e29fe60250d-arraynode"
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_7df6892fe8ce5343c76197a0b6127e80-arraynode"
)
r_m = ArrayNodeMapTask(many_inputs, bound_inputs=set("c"))
assert str(r_m.python_interface) == str(m.python_interface)
Expand All @@ -204,7 +204,7 @@ def many_inputs(a: int, b: str, c: float) -> str:
assert m.python_interface.inputs == {"a": List[int], "b": str, "c": float}
assert (
m.name
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_316e10eb97f5d2abd585951048b807b9-arraynode"
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_80fd21f14571026755b99d6b1c045089-arraynode"
)
r_m = ArrayNodeMapTask(many_inputs, bound_inputs={"c", "b"})
assert str(r_m.python_interface) == str(m.python_interface)
Expand All @@ -214,7 +214,7 @@ def many_inputs(a: int, b: str, c: float) -> str:
assert m.python_interface.inputs == {"a": int, "b": str, "c": float}
assert (
m.name
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_758022acd59ad1c8b81670378d4de4f6-arraynode"
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_5d2500dc176052a030efda3b8c283f96-arraynode"
)
r_m = ArrayNodeMapTask(many_inputs, bound_inputs={"a", "c", "b"})
assert str(r_m.python_interface) == str(m.python_interface)
Expand Down
Loading