From cea137828ffd95086249c8c76b70588563e48574 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Mon, 18 Nov 2024 21:10:47 -0800 Subject: [PATCH 1/6] add support for toggling data mode for array node Signed-off-by: Paul Dittamo --- flytekit/core/array_node.py | 14 ++++++++++---- flytekit/core/array_node_map_task.py | 21 ++++++++++----------- flytekit/models/core/workflow.py | 10 +++++++++- flytekit/tools/translator.py | 1 + 4 files changed, 30 insertions(+), 16 deletions(-) diff --git a/flytekit/core/array_node.py b/flytekit/core/array_node.py index c83da43f87..f50d3ee9c7 100644 --- a/flytekit/core/array_node.py +++ b/flytekit/core/array_node.py @@ -62,6 +62,7 @@ def __init__( self.id = target.name self._bindings = bindings or [] self.metadata = metadata + self._data_mode = None if min_successes is not None: self._min_successes = min_successes @@ -93,10 +94,12 @@ def __init__( raise ValueError("No interface found for the target entity.") if isinstance(target, LaunchPlan) or isinstance(target, FlyteLaunchPlan): + self._data_mode = _core_workflow.ArrayNode.SINGLE_INPUT_FILE if self._execution_mode != _core_workflow.ArrayNode.FULL_STATE: raise ValueError("Only execution version 1 is supported for LaunchPlans.") else: - raise ValueError(f"Only LaunchPlans are supported for now, but got {type(target)}") + self._data_mode = _core_workflow.ArrayNode.INDIVIDUAL_INPUT_FILES + # raise ValueError(f"Only LaunchPlans are supported for now, but got {type(target)}") def construct_node_metadata(self) -> _workflow_model.NodeMetadata: # Part of SupportsNodeCreation interface @@ -133,6 +136,10 @@ def upstream_nodes(self) -> List[Node]: def flyte_entity(self) -> Any: return self.target + @property + def data_mode(self) -> _core_workflow.ArrayNode.DataMode: + return self._data_mode + def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Promise, VoidPromise]: if self._remote_interface: raise ValueError("Mapping over remote entities is not supported in local execution.") @@ -269,10 +276,9 @@ def array_node( :return: A callable function that takes in keyword arguments and returns a Promise created by flyte_entity_call_handler """ - from flytekit.remote import FlyteLaunchPlan - if not isinstance(target, LaunchPlan) and not isinstance(target, FlyteLaunchPlan): - raise ValueError("Only LaunchPlans are supported for now.") + # if not isinstance(target, LaunchPlan) and not isinstance(target, FlyteLaunchPlan): + # raise ValueError("Only LaunchPlans are supported for now.") node = ArrayNode( target=target, diff --git a/flytekit/core/array_node_map_task.py b/flytekit/core/array_node_map_task.py index 5fd184e7fd..32671d5d59 100644 --- a/flytekit/core/array_node_map_task.py +++ b/flytekit/core/array_node_map_task.py @@ -367,22 +367,21 @@ def map_task( :param min_successes: The minimum number of successful executions :param min_success_ratio: The minimum ratio of successful executions """ - from flytekit.remote import FlyteLaunchPlan - if isinstance(target, LaunchPlan) or isinstance(target, FlyteLaunchPlan): - return array_node( - target=target, - concurrency=concurrency, - min_successes=min_successes, - min_success_ratio=min_success_ratio, - ) - return array_node_map_task( - task_function=target, + # if isinstance(target, LaunchPlan) or isinstance(target, FlyteLaunchPlan): + return array_node( + target=target, concurrency=concurrency, min_successes=min_successes, min_success_ratio=min_success_ratio, - **kwargs, ) + # return array_node_map_task( + # task_function=target, + # concurrency=concurrency, + # min_successes=min_successes, + # min_success_ratio=min_success_ratio, + # **kwargs, + # ) def array_node_map_task( diff --git a/flytekit/models/core/workflow.py b/flytekit/models/core/workflow.py index cadb33a434..721eabf5cd 100644 --- a/flytekit/models/core/workflow.py +++ b/flytekit/models/core/workflow.py @@ -382,7 +382,13 @@ def from_flyte_idl(cls, pb2_object: _core_workflow.GateNode) -> "GateNode": class ArrayNode(_common.FlyteIdlEntity): def __init__( - self, node: "Node", parallelism=None, min_successes=None, min_success_ratio=None, execution_mode=None + self, + node: "Node", + parallelism=None, + min_successes=None, + min_success_ratio=None, + execution_mode=None, + data_mode=None, ) -> None: """ TODO: docstring @@ -393,6 +399,7 @@ def __init__( self._min_successes = min_successes self._min_success_ratio = min_success_ratio self._execution_mode = execution_mode + self._data_mode = data_mode @property def node(self) -> "Node": @@ -405,6 +412,7 @@ def to_flyte_idl(self) -> _core_workflow.ArrayNode: min_successes=self._min_successes, min_success_ratio=self._min_success_ratio, execution_mode=self._execution_mode, + data_mode=self._data_mode, ) @classmethod diff --git a/flytekit/tools/translator.py b/flytekit/tools/translator.py index 5521909810..d28e7bed92 100644 --- a/flytekit/tools/translator.py +++ b/flytekit/tools/translator.py @@ -595,6 +595,7 @@ def get_serializable_array_node( min_successes=array_node.min_successes, min_success_ratio=array_node.min_success_ratio, execution_mode=array_node.execution_mode, + data_mode=array_node.data_mode, ) From a3dded1201d54b862488b32032e40a044e324ab5 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Wed, 18 Dec 2024 19:56:48 -0800 Subject: [PATCH 2/6] clean up Signed-off-by: Paul Dittamo --- flytekit/core/array_node.py | 20 ++++++++++---------- flytekit/core/array_node_map_task.py | 22 ++++++++++++---------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/flytekit/core/array_node.py b/flytekit/core/array_node.py index 0f71a01020..848d999a77 100644 --- a/flytekit/core/array_node.py +++ b/flytekit/core/array_node.py @@ -19,6 +19,7 @@ flyte_entity_call_handler, translate_inputs_to_literals, ) +from flytekit.core.task import ReferenceTask from flytekit.loggers import logger from flytekit.models import interface as _interface_models from flytekit.models import literals as _literal_models @@ -34,8 +35,7 @@ class ArrayNode: def __init__( self, - target: Union[LaunchPlan, "FlyteLaunchPlan"], - execution_mode: _core_workflow.ArrayNode.ExecutionMode = _core_workflow.ArrayNode.FULL_STATE, + target: Union[LaunchPlan, ReferenceTask, "FlyteLaunchPlan"], bindings: Optional[List[_literal_models.Binding]] = None, concurrency: Optional[int] = None, min_successes: Optional[int] = None, @@ -51,14 +51,12 @@ def __init__( :param min_successes: The minimum number of successful executions. If set, this takes precedence over min_success_ratio :param min_success_ratio: The minimum ratio of successful executions. - :param execution_mode: The execution mode for propeller to use when handling ArrayNode :param metadata: The metadata for the underlying node """ from flytekit.remote import FlyteLaunchPlan self.target = target self._concurrency = concurrency - self._execution_mode = execution_mode self.id = target.name self._bindings = bindings or [] self.metadata = metadata @@ -93,13 +91,15 @@ def __init__( else: raise ValueError("No interface found for the target entity.") - if isinstance(target, LaunchPlan) or isinstance(target, FlyteLaunchPlan): + self._execution_mode = _core_workflow.ArrayNode.FULL_STATE + + if isinstance(target, (LaunchPlan, FlyteLaunchPlan)): self._data_mode = _core_workflow.ArrayNode.SINGLE_INPUT_FILE - if self._execution_mode != _core_workflow.ArrayNode.FULL_STATE: - raise ValueError("Only execution version 1 is supported for LaunchPlans.") - else: + elif isinstance(target, ReferenceTask): self._data_mode = _core_workflow.ArrayNode.INDIVIDUAL_INPUT_FILES - # raise ValueError(f"Only LaunchPlans are supported for now, but got {type(target)}") + self._execution_mode = _core_workflow.ArrayNode.MINIMAL_STATE + else: + raise ValueError(f"Only LaunchPlans are supported for now, but got {type(target)}") def construct_node_metadata(self) -> _workflow_model.NodeMetadata: # Part of SupportsNodeCreation interface @@ -261,7 +261,7 @@ def __call__(self, *args, **kwargs): def array_node( - target: Union[LaunchPlan, "FlyteLaunchPlan"], + target: Union[LaunchPlan, ReferenceTask, "FlyteLaunchPlan"], concurrency: Optional[int] = None, min_success_ratio: Optional[float] = None, min_successes: Optional[int] = None, diff --git a/flytekit/core/array_node_map_task.py b/flytekit/core/array_node_map_task.py index e960800477..78b9611651 100644 --- a/flytekit/core/array_node_map_task.py +++ b/flytekit/core/array_node_map_task.py @@ -18,6 +18,7 @@ from flytekit.core.interface import transform_interface_to_list_interface from flytekit.core.launch_plan import LaunchPlan from flytekit.core.python_function_task import PythonFunctionTask, PythonInstanceTask +from flytekit.core.task import ReferenceTask from flytekit.core.type_engine import TypeEngine from flytekit.core.utils import timeit from flytekit.loggers import logger @@ -388,21 +389,22 @@ def map_task( :param min_successes: The minimum number of successful executions :param min_success_ratio: The minimum ratio of successful executions """ + from flytekit.remote import FlyteLaunchPlan - # if isinstance(target, LaunchPlan) or isinstance(target, FlyteLaunchPlan): - return array_node( - target=target, + if isinstance(target, (LaunchPlan, FlyteLaunchPlan, ReferenceTask)): + return array_node( + target=target, + concurrency=concurrency, + min_successes=min_successes, + min_success_ratio=min_success_ratio, + ) + return array_node_map_task( + task_function=target, concurrency=concurrency, min_successes=min_successes, min_success_ratio=min_success_ratio, + **kwargs, ) - # return array_node_map_task( - # task_function=target, - # concurrency=concurrency, - # min_successes=min_successes, - # min_success_ratio=min_success_ratio, - # **kwargs, - # ) def array_node_map_task( From b12819baeea33d6e0662c6af5c1850fdc09af29f Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Thu, 19 Dec 2024 00:54:19 -0800 Subject: [PATCH 3/6] clean up Signed-off-by: Paul Dittamo --- flytekit/core/array_node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/core/array_node.py b/flytekit/core/array_node.py index 848d999a77..ca7c630e52 100644 --- a/flytekit/core/array_node.py +++ b/flytekit/core/array_node.py @@ -281,8 +281,8 @@ def array_node( flyte_entity_call_handler """ - # if not isinstance(target, LaunchPlan) and not isinstance(target, FlyteLaunchPlan): - # raise ValueError("Only LaunchPlans are supported for now.") + if not isinstance(target, (LaunchPlan, FlyteLaunchPlan, ReferenceTask)): + raise ValueError("Only LaunchPlans and ReferenceTasks are supported for now.") node = ArrayNode( target=target, From 036d8c447e4be1d3cc4a0a249b8f71440be91eec Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Thu, 19 Dec 2024 10:16:02 -0800 Subject: [PATCH 4/6] cleanup Signed-off-by: Paul Dittamo --- flytekit/core/array_node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/core/array_node.py b/flytekit/core/array_node.py index ca7c630e52..5d8cb21cb3 100644 --- a/flytekit/core/array_node.py +++ b/flytekit/core/array_node.py @@ -61,6 +61,7 @@ def __init__( self._bindings = bindings or [] self.metadata = metadata self._data_mode = None + self._execution_mode = None if min_successes is not None: self._min_successes = min_successes @@ -91,10 +92,9 @@ def __init__( else: raise ValueError("No interface found for the target entity.") - self._execution_mode = _core_workflow.ArrayNode.FULL_STATE - if isinstance(target, (LaunchPlan, FlyteLaunchPlan)): self._data_mode = _core_workflow.ArrayNode.SINGLE_INPUT_FILE + self._execution_mode = _core_workflow.ArrayNode.FULL_STATE elif isinstance(target, ReferenceTask): self._data_mode = _core_workflow.ArrayNode.INDIVIDUAL_INPUT_FILES self._execution_mode = _core_workflow.ArrayNode.MINIMAL_STATE From 1ea5bb7438159e9c166edd96d40f520b47594b9d Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Fri, 20 Dec 2024 17:59:52 -0500 Subject: [PATCH 5/6] Bump flyteidl lower-bound to 1.14.1 Signed-off-by: Eduardo Apolinario --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 58c107cdc3..3dc782c507 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ "diskcache>=5.2.1", "docker>=4.0.0", "docstring-parser>=0.9.0", - "flyteidl>=1.13.9", + "flyteidl>=1.14.1", "fsspec>=2023.3.0", "gcsfs>=2023.3.0", "googleapis-common-protos>=1.57", From 4733edd99894af530b9524623f4b7e404981925c Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Fri, 20 Dec 2024 18:09:22 -0500 Subject: [PATCH 6/6] Add import of FlyteLaunchPlan back Signed-off-by: Eduardo Apolinario --- flytekit/core/array_node.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/core/array_node.py b/flytekit/core/array_node.py index 5d8cb21cb3..466058a791 100644 --- a/flytekit/core/array_node.py +++ b/flytekit/core/array_node.py @@ -280,6 +280,7 @@ def array_node( :return: A callable function that takes in keyword arguments and returns a Promise created by flyte_entity_call_handler """ + from flytekit.remote import FlyteLaunchPlan if not isinstance(target, (LaunchPlan, FlyteLaunchPlan, ReferenceTask)): raise ValueError("Only LaunchPlans and ReferenceTasks are supported for now.")