From bd4ef5bcf8773b963503bb3841a66d0c048aeaca Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Fri, 15 Mar 2024 00:37:23 -0700 Subject: [PATCH 1/4] default array node concurrency to -1 Signed-off-by: Paul Dittamo --- flytekit/core/array_node_map_task.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/flytekit/core/array_node_map_task.py b/flytekit/core/array_node_map_task.py index 7e7dbaff39..1275b020ee 100644 --- a/flytekit/core/array_node_map_task.py +++ b/flytekit/core/array_node_map_task.py @@ -314,21 +314,22 @@ def _raw_execute(self, **kwargs) -> Any: def map_task( task_function: PythonFunctionTask, - concurrency: int = 0, + concurrency: int = -1, # TODO why no min_successes? min_success_ratio: float = 1.0, **kwargs, ): - """Map task that uses the ``ArrayNode`` construct.. + """Map task that uses the ``ArrayNode`` construct... - .. important:: + ... important:: This is an experimental drop-in replacement for :py:func:`~flytekit.map_task`. :param task_function: This argument is implicitly passed and represents the repeatable function :param concurrency: If specified, this limits the number of mapped tasks than can run in parallel to the given batch size. If the size of the input exceeds the concurrency value, then multiple batches will be run serially until - all inputs are processed. If left unspecified, this means unbounded concurrency. + all inputs are processed. If set to 0, this means unbounded concurrency. If left unspecified, this means the + array node will inherit parallelism from the workflow :param min_success_ratio: If specified, this determines the minimum fraction of total jobs which can complete successfully before terminating this task and marking it successful. """ From e4a437fd673afdf4744dab5a3bc8cadcbb1f738d Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Fri, 15 Mar 2024 00:38:50 -0700 Subject: [PATCH 2/4] typo Signed-off-by: Paul Dittamo --- flytekit/core/array_node_map_task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/core/array_node_map_task.py b/flytekit/core/array_node_map_task.py index 1275b020ee..34d785525f 100644 --- a/flytekit/core/array_node_map_task.py +++ b/flytekit/core/array_node_map_task.py @@ -319,9 +319,9 @@ def map_task( min_success_ratio: float = 1.0, **kwargs, ): - """Map task that uses the ``ArrayNode`` construct... + """Map task that uses the ``ArrayNode`` construct.. - ... important:: + .. important:: This is an experimental drop-in replacement for :py:func:`~flytekit.map_task`. From 8a8f4fd6d274548aedc99f36c45a7acf5fbb6013 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Wed, 10 Apr 2024 14:21:53 -0700 Subject: [PATCH 3/4] set default concurrency to None for backwards compatibility Signed-off-by: Paul Dittamo --- flytekit/core/array_node_map_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/array_node_map_task.py b/flytekit/core/array_node_map_task.py index 69b250647d..94cba1426c 100644 --- a/flytekit/core/array_node_map_task.py +++ b/flytekit/core/array_node_map_task.py @@ -314,7 +314,7 @@ def _raw_execute(self, **kwargs) -> Any: def map_task( task_function: PythonFunctionTask, - concurrency: int = -1, + concurrency: Optional[int] = None, # TODO why no min_successes? min_success_ratio: float = 1.0, **kwargs, From 3f596c2108de89eaddf865c4b45c9b840caa857a Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Thu, 18 Apr 2024 12:03:26 -0700 Subject: [PATCH 4/4] update unit test - hashed name Signed-off-by: Paul Dittamo --- tests/flytekit/unit/core/test_array_node_map_task.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/flytekit/unit/core/test_array_node_map_task.py b/tests/flytekit/unit/core/test_array_node_map_task.py index 8b078cdf2a..5c84c60984 100644 --- a/tests/flytekit/unit/core/test_array_node_map_task.py +++ b/tests/flytekit/unit/core/test_array_node_map_task.py @@ -184,7 +184,7 @@ def many_inputs(a: int, b: str, c: float) -> str: assert m.python_interface.inputs == {"a": List[int], "b": List[str], "c": List[float]} assert ( m.name - == "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_bf51001578d0ae197a52c0af0a99dd89-arraynode" + == "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_6b3bd0353da5de6e84d7982921ead2b3-arraynode" ) r_m = ArrayNodeMapTask(many_inputs) assert str(r_m.python_interface) == str(m.python_interface) @@ -194,7 +194,7 @@ def many_inputs(a: int, b: str, c: float) -> str: assert m.python_interface.inputs == {"a": List[int], "b": List[str], "c": float} assert ( m.name - == "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_cb470e880fabd6265ec80e29fe60250d-arraynode" + == "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_7df6892fe8ce5343c76197a0b6127e80-arraynode" ) r_m = ArrayNodeMapTask(many_inputs, bound_inputs=set("c")) assert str(r_m.python_interface) == str(m.python_interface) @@ -204,7 +204,7 @@ def many_inputs(a: int, b: str, c: float) -> str: assert m.python_interface.inputs == {"a": List[int], "b": str, "c": float} assert ( m.name - == "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_316e10eb97f5d2abd585951048b807b9-arraynode" + == "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_80fd21f14571026755b99d6b1c045089-arraynode" ) r_m = ArrayNodeMapTask(many_inputs, bound_inputs={"c", "b"}) assert str(r_m.python_interface) == str(m.python_interface) @@ -214,7 +214,7 @@ def many_inputs(a: int, b: str, c: float) -> str: assert m.python_interface.inputs == {"a": int, "b": str, "c": float} assert ( m.name - == "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_758022acd59ad1c8b81670378d4de4f6-arraynode" + == "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_5d2500dc176052a030efda3b8c283f96-arraynode" ) r_m = ArrayNodeMapTask(many_inputs, bound_inputs={"a", "c", "b"}) assert str(r_m.python_interface) == str(m.python_interface)