From c149d0a44e21fe6b56099d1d12be3c3e0e6b26f3 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Tue, 13 Aug 2024 02:48:34 +0800 Subject: [PATCH 01/13] refactor: let flyte backend generate ID Signed-off-by: wayner0628 --- flytekit/clients/friendly.py | 16 +-- flytekit/clis/flyte_cli/main.py | 6 +- flytekit/remote/remote.py | 139 +++++----------------- tests/flytekit/unit/remote/test_remote.py | 53 +-------- 4 files changed, 42 insertions(+), 172 deletions(-) diff --git a/flytekit/clients/friendly.py b/flytekit/clients/friendly.py index 58038d12ec..24bc3b3c8c 100644 --- a/flytekit/clients/friendly.py +++ b/flytekit/clients/friendly.py @@ -537,12 +537,11 @@ def update_named_entity(self, resource_type, id, metadata): # #################################################################################################################### - def create_execution(self, project, domain, name, execution_spec, inputs): + def create_execution(self, project, domain, execution_spec, inputs): """ This will create an execution for the given execution spec. :param Text project: :param Text domain: - :param Text name: :param flytekit.models.execution.ExecutionSpec execution_spec: This is the specification for the execution. :param flytekit.models.literals.LiteralMap inputs: The inputs for the execution :returns: The unique identifier for the execution. @@ -554,7 +553,7 @@ def create_execution(self, project, domain, name, execution_spec, inputs): _execution_pb2.ExecutionCreateRequest( project=project, domain=domain, - name=name, + name="", spec=execution_spec.to_flyte_idl(), inputs=inputs.to_flyte_idl(), ) @@ -562,16 +561,15 @@ def create_execution(self, project, domain, name, execution_spec, inputs): .id ) - def recover_execution(self, id, name: str = None): + def recover_execution(self, id): """ Recreates a previously-run workflow execution that will only start executing from the last known failure point. :param flytekit.models.core.identifier.WorkflowExecutionIdentifier id: - :param name str: Optional name to assign to the newly created execution. :rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier """ return _identifier.WorkflowExecutionIdentifier.from_flyte_idl( super(SynchronousFlyteClient, self) - .recover_execution(_execution_pb2.ExecutionRecoverRequest(id=id.to_flyte_idl(), name=name)) + .recover_execution(_execution_pb2.ExecutionRecoverRequest(id=id.to_flyte_idl(), name="")) .id ) @@ -650,17 +648,15 @@ def terminate_execution(self, id, cause): _execution_pb2.ExecutionTerminateRequest(id=id.to_flyte_idl(), cause=cause) ) - def relaunch_execution(self, id, name=None): + def relaunch_execution(self, id): """ :param flytekit.models.core.identifier.WorkflowExecutionIdentifier id: - :param Text name: [Optional] name for the new execution. If not specified, a randomly generated name will be - used :returns: The unique identifier for the new execution. :rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier """ return _identifier.WorkflowExecutionIdentifier.from_flyte_idl( super(SynchronousFlyteClient, self) - .relaunch_execution(_execution_pb2.ExecutionRelaunchRequest(id=id.to_flyte_idl(), name=name)) + .relaunch_execution(_execution_pb2.ExecutionRelaunchRequest(id=id.to_flyte_idl(), name="")) .id ) diff --git a/flytekit/clis/flyte_cli/main.py b/flytekit/clis/flyte_cli/main.py index c201bc5b57..9089407b0a 100644 --- a/flytekit/clis/flyte_cli/main.py +++ b/flytekit/clis/flyte_cli/main.py @@ -1088,7 +1088,7 @@ def update_launch_plan(state, host, insecure, urn=None): @_optional_name_option @_host_option @_insecure_option -def recover_execution(urn, name, host, insecure): +def recover_execution(urn, host, insecure): """ Recreates a previously-run workflow execution that will only start executing from the last known failure point. @@ -1100,8 +1100,6 @@ def recover_execution(urn, name, host, insecure): - downstream system failures (downstream services) - or simply to recover executions that failed because of retry exhaustion and should complete if tried again. - You can optionally assign a name to the recreated execution you trigger or let the system assign one. - Usage: $ flyte-cli recover-execution -u ex:flyteexamples:development:some-workflow:abc123 -n my_retry_name @@ -1115,7 +1113,7 @@ def recover_execution(urn, name, host, insecure): original_workflow_execution_identifier = cli_identifiers.WorkflowExecutionIdentifier.from_python_std(urn) - execution_identifier_resp = client.recover_execution(id=original_workflow_execution_identifier, name=name) + execution_identifier_resp = client.recover_execution(id=original_workflow_execution_identifier) execution_identifier = cli_identifiers.WorkflowExecutionIdentifier.promote_from_model(execution_identifier_resp) click.secho("Launched execution: {}".format(execution_identifier), fg="blue") click.echo("") diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 005f2e4d4f..444c62d342 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -1112,8 +1112,6 @@ def _execute( inputs: typing.Dict[str, typing.Any], project: str = None, domain: str = None, - execution_name: typing.Optional[str] = None, - execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1129,7 +1127,6 @@ def _execute( :param inputs: dictionary mapping argument names to values :param project: project on which to execute the entity referenced by flyte_id :param domain: domain on which to execute the entity referenced by flyte_id - :param execution_name: name of the execution :param wait: if True, waits for execution to complete :param type_hints: map of python types to inputs so that the TypeEngine knows how to convert the input values into Flyte Literals. @@ -1142,10 +1139,6 @@ def _execute( :param execution_cluster_label: Specify label of cluster(s) on which newly created execution should be placed. :returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution` """ - if execution_name is not None and execution_name_prefix is not None: - raise ValueError("Only one of execution_name and execution_name_prefix can be set, but got both set") - execution_name_prefix = execution_name_prefix + "-" if execution_name_prefix is not None else None - execution_name = execution_name or (execution_name_prefix or "f") + uuid.uuid4().hex[:19] if not options: options = Options() if options.disable_notifications is not None: @@ -1188,49 +1181,40 @@ def _execute( literal_inputs = literal_models.LiteralMap(literals=literal_map) - try: - # Currently, this will only execute the flyte entity referenced by - # flyte_id in the same project and domain. However, it is possible to execute it in a different project - # and domain, which is specified in the first two arguments of client.create_execution. This is useful - # in the case that I want to use a flyte entity from e.g. project "A" but actually execute the entity on a - # different project "B". For now, this method doesn't support this use case. - exec_id = self.client.create_execution( - project or self.default_project, - domain or self.default_domain, - execution_name, - ExecutionSpec( - entity.id, - ExecutionMetadata( - ExecutionMetadata.ExecutionMode.MANUAL, - "placeholder", # Admin replaces this from oidc token if auth is enabled. - 0, - ), - overwrite_cache=overwrite_cache, - notifications=notifications, - disable_all=options.disable_notifications, - labels=options.labels, - annotations=options.annotations, - raw_output_data_config=options.raw_output_data_config, - auth_role=None, - max_parallelism=options.max_parallelism, - security_context=options.security_context, - envs=common_models.Envs(envs) if envs else None, - tags=tags, - cluster_assignment=ClusterAssignment(cluster_pool=cluster_pool) if cluster_pool else None, - execution_cluster_label=ExecutionClusterLabel(execution_cluster_label) - if execution_cluster_label - else None, + # Currently, this will only execute the flyte entity referenced by + # flyte_id in the same project and domain. However, it is possible to execute it in a different project + # and domain, which is specified in the first two arguments of client.create_execution. This is useful + # in the case that I want to use a flyte entity from e.g. project "A" but actually execute the entity on a + # different project "B". For now, this method doesn't support this use case. + exec_id = self.client.create_execution( + project or self.default_project, + domain or self.default_domain, + ExecutionSpec( + entity.id, + ExecutionMetadata( + ExecutionMetadata.ExecutionMode.MANUAL, + "placeholder", # Admin replaces this from oidc token if auth is enabled. + 0, ), - literal_inputs, - ) - except user_exceptions.FlyteEntityAlreadyExistsException: - logger.warning( - f"Execution with Execution ID {execution_name} already exists. " - f"Assuming this is the same execution, returning!" - ) - exec_id = WorkflowExecutionIdentifier( - project=project or self.default_project, domain=domain or self.default_domain, name=execution_name - ) + overwrite_cache=overwrite_cache, + notifications=notifications, + disable_all=options.disable_notifications, + labels=options.labels, + annotations=options.annotations, + raw_output_data_config=options.raw_output_data_config, + auth_role=None, + max_parallelism=options.max_parallelism, + security_context=options.security_context, + envs=common_models.Envs(envs) if envs else None, + tags=tags, + cluster_assignment=ClusterAssignment(cluster_pool=cluster_pool) if cluster_pool else None, + execution_cluster_label=ExecutionClusterLabel(execution_cluster_label) + if execution_cluster_label + else None, + ), + literal_inputs, + ) + execution = FlyteWorkflowExecution.promote_from_model(self.client.get_execution(exec_id)) if wait: @@ -1270,8 +1254,6 @@ def execute( domain: str = None, name: str = None, version: str = None, - execution_name: typing.Optional[str] = None, - execution_name_prefix: typing.Optional[str] = None, image_config: typing.Optional[ImageConfig] = None, options: typing.Optional[Options] = None, wait: bool = False, @@ -1306,7 +1288,6 @@ def execute( first before executing. :param name: execute entity using this name. If not None, use this value instead of ``entity.name`` :param version: execute entity using this version. If None, uses auto-generated value. - :param execution_name: name of the execution. If None, uses auto-generated value. :param image_config: :param wait: if True, waits for execution to complete :param type_hints: Python types to be passed to the TypeEngine so that it knows how to properly convert the @@ -1335,8 +1316,6 @@ def execute( inputs=inputs, project=project, domain=domain, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1352,8 +1331,6 @@ def execute( inputs=inputs, project=project, domain=domain, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1367,8 +1344,6 @@ def execute( return self.execute_reference_task( entity=entity, inputs=inputs, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1382,8 +1357,6 @@ def execute( return self.execute_reference_workflow( entity=entity, inputs=inputs, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1397,8 +1370,6 @@ def execute( return self.execute_reference_launch_plan( entity=entity, inputs=inputs, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1416,8 +1387,6 @@ def execute( domain=domain, name=name, version=version, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, image_config=image_config, wait=wait, overwrite_cache=overwrite_cache, @@ -1434,8 +1403,6 @@ def execute( domain=domain, name=name, version=version, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, image_config=image_config, options=options, wait=wait, @@ -1453,8 +1420,6 @@ def execute( project=project, domain=domain, name=name, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, options=options, wait=wait, overwrite_cache=overwrite_cache, @@ -1474,8 +1439,6 @@ def execute_remote_task_lp( inputs: typing.Dict[str, typing.Any], project: str = None, domain: str = None, - execution_name: typing.Optional[str] = None, - execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1494,8 +1457,6 @@ def execute_remote_task_lp( inputs, project=project, domain=domain, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, wait=wait, options=options, type_hints=type_hints, @@ -1512,8 +1473,6 @@ def execute_remote_wf( inputs: typing.Dict[str, typing.Any], project: str = None, domain: str = None, - execution_name: typing.Optional[str] = None, - execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1533,8 +1492,6 @@ def execute_remote_wf( inputs, project=project, domain=domain, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1551,8 +1508,6 @@ def execute_reference_task( self, entity: ReferenceTask, inputs: typing.Dict[str, typing.Any], - execution_name: typing.Optional[str] = None, - execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1582,8 +1537,6 @@ def execute_reference_task( inputs, project=resolved_identifiers.project, domain=resolved_identifiers.domain, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1598,8 +1551,6 @@ def execute_reference_workflow( self, entity: ReferenceWorkflow, inputs: typing.Dict[str, typing.Any], - execution_name: typing.Optional[str] = None, - execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1643,8 +1594,6 @@ def execute_reference_workflow( inputs, project=resolved_identifiers.project, domain=resolved_identifiers.domain, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, wait=wait, options=options, type_hints=type_hints, @@ -1659,8 +1608,6 @@ def execute_reference_launch_plan( self, entity: ReferenceLaunchPlan, inputs: typing.Dict[str, typing.Any], - execution_name: typing.Optional[str] = None, - execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1690,8 +1637,6 @@ def execute_reference_launch_plan( inputs, project=resolved_identifiers.project, domain=resolved_identifiers.domain, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1713,8 +1658,6 @@ def execute_local_task( domain: str = None, name: str = None, version: str = None, - execution_name: typing.Optional[str] = None, - execution_name_prefix: typing.Optional[str] = None, image_config: typing.Optional[ImageConfig] = None, wait: bool = False, overwrite_cache: typing.Optional[bool] = None, @@ -1732,7 +1675,6 @@ def execute_local_task( :param domain: The execution domain, will default to the Remote's default domain. :param name: specific name of the task to run. :param version: specific version of the task to run. - :param execution_name: If provided, will use this name for the execution. :param image_config: If provided, will use this image config in the pod. :param wait: If True, will wait for the execution to complete before returning. :param overwrite_cache: If True, will overwrite the cache. @@ -1763,8 +1705,6 @@ def execute_local_task( inputs, project=resolved_identifiers.project, domain=resolved_identifiers.domain, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, wait=wait, type_hints=entity.python_interface.inputs, overwrite_cache=overwrite_cache, @@ -1782,8 +1722,6 @@ def execute_local_workflow( domain: str = None, name: str = None, version: str = None, - execution_name: typing.Optional[str] = None, - execution_name_prefix: typing.Optional[str] = None, image_config: typing.Optional[ImageConfig] = None, options: typing.Optional[Options] = None, wait: bool = False, @@ -1801,7 +1739,6 @@ def execute_local_workflow( :param domain: :param name: :param version: - :param execution_name: :param image_config: :param options: :param wait: @@ -1849,8 +1786,6 @@ def execute_local_workflow( inputs, project=project, domain=domain, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, wait=wait, options=options, type_hints=entity.python_interface.inputs, @@ -1869,8 +1804,6 @@ def execute_local_launch_plan( project: typing.Optional[str] = None, domain: typing.Optional[str] = None, name: typing.Optional[str] = None, - execution_name: typing.Optional[str] = None, - execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, overwrite_cache: typing.Optional[bool] = None, @@ -1887,7 +1820,6 @@ def execute_local_launch_plan( :param project: The same as version, but will default to the Remote object's project :param domain: The same as version, but will default to the Remote object's domain :param name: The same as version, but will default to the entity's name - :param execution_name: If specified, will be used as the execution name instead of randomly generating. :param options: Options to be passed into the execution. :param wait: If True, will wait for the execution to complete before returning. :param overwrite_cache: If True, will overwrite the cache. @@ -1915,8 +1847,6 @@ def execute_local_launch_plan( inputs, project=project, domain=domain, - execution_name=execution_name, - execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=entity.python_interface.inputs, @@ -2328,7 +2258,6 @@ def launch_backfill( to_date: datetime, launchplan: str, launchplan_version: str = None, - execution_name: str = None, version: str = None, dry_run: bool = False, execute: bool = True, @@ -2355,7 +2284,6 @@ def launch_backfill( :param to_date: datetime generate a backfill ending at this datetime (inclusive) :param launchplan: str launchplan name in the flyte backend :param launchplan_version: str (optional) version for the launchplan. If not specified the most recent will be retrieved - :param execution_name: str (optional) the generated execution will be named so. this can help in ensuring idempotency :param version: str (optional) version to be used for the newly created workflow. :param dry_run: bool do not register or execute the workflow :param execute: bool Register and execute the wwkflow. @@ -2396,7 +2324,6 @@ def launch_backfill( inputs={}, project=project, domain=domain, - execution_name=execution_name, overwrite_cache=overwrite_cache, ) diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index 3852da9a31..dee84059e9 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -533,8 +533,7 @@ def wf(name: str = "union"): name="flytesnacks.examples.basics.basics.workflow.slope", version="v1", ) - def ref_basic(x: typing.List[int], y: typing.List[int]) -> float: - ... + def ref_basic(x: typing.List[int], y: typing.List[int]) -> float: ... @workflow def wf1(name: str = "union") -> float: @@ -562,56 +561,6 @@ def test_local_server(mock_client): assert lr.get("hello", int) == 55 -@mock.patch("flytekit.remote.remote.uuid") -@mock.patch("flytekit.remote.remote.FlyteRemote.client") -def test_execution_name(mock_client, mock_uuid): - test_uuid = uuid.UUID("16fd2706-8baf-433b-82eb-8c7fada847da") - mock_uuid.uuid4.return_value = test_uuid - remote = FlyteRemote(config=Config.auto(), default_project="project", default_domain="domain") - - default_img = Image(name="default", fqn="test", tag="tag") - serialization_settings = SerializationSettings( - project="project", - domain="domain", - version="version", - env=None, - image_config=ImageConfig(default_image=default_img, images=[default_img]), - ) - tk_spec = get_serializable(OrderedDict(), serialization_settings, tk) - ft = FlyteTask.promote_from_model(tk_spec.template) - - remote._execute( - entity=ft, - inputs={"t": datetime.now(), "v": 0}, - execution_name="execution-test", - ) - remote._execute( - entity=ft, - inputs={"t": datetime.now(), "v": 0}, - execution_name_prefix="execution-test", - ) - remote._execute( - entity=ft, - inputs={"t": datetime.now(), "v": 0}, - ) - mock_client.create_execution.assert_has_calls( - [ - mock.call(ANY, ANY, "execution-test", ANY, ANY), - mock.call(ANY, ANY, "execution-test-" + test_uuid.hex[:19], ANY, ANY), - mock.call(ANY, ANY, "f" + test_uuid.hex[:19], ANY, ANY), - ] - ) - with pytest.raises( - ValueError, match="Only one of execution_name and execution_name_prefix can be set, but got both set" - ): - remote._execute( - entity=ft, - inputs={"t": datetime.now(), "v": 0}, - execution_name="execution-test", - execution_name_prefix="execution-test", - ) - - @pytest.mark.parametrize( "url, host", [ From 79486a061699ef6fd9477fabbe9584c9cacf90b2 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Tue, 13 Aug 2024 04:42:21 +0800 Subject: [PATCH 02/13] fix: fix arg order Signed-off-by: wayner0628 --- tests/flytekit/unit/remote/test_remote.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index dee84059e9..09d358d61c 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -134,7 +134,7 @@ def test_underscore_execute_uses_launch_plan_attributes(remote, mock_wf_exec): remote._client = mock_client def local_assertions(*args, **kwargs): - execution_spec = args[3] + execution_spec = args[2] assert execution_spec.security_context.run_as.k8s_service_account == "svc" assert execution_spec.labels == common_models.Labels({"a": "my_label_value"}) assert execution_spec.annotations == common_models.Annotations({"b": "my_annotation_value"}) @@ -163,7 +163,7 @@ def test_execution_cluster_label_attributes(remote, mock_wf_exec): remote._client = mock_client def local_assertions(*args, **kwargs): - execution_spec = args[3] + execution_spec = args[2] assert execution_spec.execution_cluster_label.value == "label" mock_client.create_execution.side_effect = local_assertions @@ -190,7 +190,7 @@ def test_underscore_execute_fall_back_remote_attributes(remote, mock_wf_exec): ) def local_assertions(*args, **kwargs): - execution_spec = args[3] + execution_spec = args[2] assert execution_spec.security_context.run_as.iam_role == "iam:some:role" assert execution_spec.raw_output_data_config.output_location_prefix == "raw_output" From 44a6c229e2d7aa8302e6b798baf4a6e95e7830bf Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Tue, 13 Aug 2024 04:42:38 +0800 Subject: [PATCH 03/13] fix: remove unused package Signed-off-by: wayner0628 --- flytekit/remote/remote.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 444c62d342..f3c550bd60 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -16,7 +16,6 @@ import tempfile import time import typing -import uuid from base64 import b64encode from collections import OrderedDict from dataclasses import asdict, dataclass From cc4e6a8d021a318f3c8baaa5b75f7d0b06d02a8b Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Tue, 13 Aug 2024 05:29:05 +0800 Subject: [PATCH 04/13] fix: modify parameter Signed-off-by: wayner0628 --- flytekit/clis/sdk_in_container/run.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index d8c215a598..2594b6ba29 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -457,7 +457,6 @@ def run_remote( inputs=inputs, project=project, domain=domain, - execution_name=run_level_params.name, wait=run_level_params.wait_execution, options=options_from_run_params(run_level_params), type_hints=type_hints, From 41d655629a75bc1c167c3fa4b75f93677846bb11 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Wed, 14 Aug 2024 02:36:25 +0800 Subject: [PATCH 05/13] fix: send empty name when nothing is specified Signed-off-by: wayner0628 --- flytekit/clients/friendly.py | 16 ++- flytekit/clis/flyte_cli/main.py | 6 +- flytekit/clis/sdk_in_container/run.py | 1 + flytekit/remote/remote.py | 142 +++++++++++++++++----- tests/flytekit/unit/remote/test_remote.py | 59 ++++++++- 5 files changed, 179 insertions(+), 45 deletions(-) diff --git a/flytekit/clients/friendly.py b/flytekit/clients/friendly.py index 24bc3b3c8c..58038d12ec 100644 --- a/flytekit/clients/friendly.py +++ b/flytekit/clients/friendly.py @@ -537,11 +537,12 @@ def update_named_entity(self, resource_type, id, metadata): # #################################################################################################################### - def create_execution(self, project, domain, execution_spec, inputs): + def create_execution(self, project, domain, name, execution_spec, inputs): """ This will create an execution for the given execution spec. :param Text project: :param Text domain: + :param Text name: :param flytekit.models.execution.ExecutionSpec execution_spec: This is the specification for the execution. :param flytekit.models.literals.LiteralMap inputs: The inputs for the execution :returns: The unique identifier for the execution. @@ -553,7 +554,7 @@ def create_execution(self, project, domain, execution_spec, inputs): _execution_pb2.ExecutionCreateRequest( project=project, domain=domain, - name="", + name=name, spec=execution_spec.to_flyte_idl(), inputs=inputs.to_flyte_idl(), ) @@ -561,15 +562,16 @@ def create_execution(self, project, domain, execution_spec, inputs): .id ) - def recover_execution(self, id): + def recover_execution(self, id, name: str = None): """ Recreates a previously-run workflow execution that will only start executing from the last known failure point. :param flytekit.models.core.identifier.WorkflowExecutionIdentifier id: + :param name str: Optional name to assign to the newly created execution. :rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier """ return _identifier.WorkflowExecutionIdentifier.from_flyte_idl( super(SynchronousFlyteClient, self) - .recover_execution(_execution_pb2.ExecutionRecoverRequest(id=id.to_flyte_idl(), name="")) + .recover_execution(_execution_pb2.ExecutionRecoverRequest(id=id.to_flyte_idl(), name=name)) .id ) @@ -648,15 +650,17 @@ def terminate_execution(self, id, cause): _execution_pb2.ExecutionTerminateRequest(id=id.to_flyte_idl(), cause=cause) ) - def relaunch_execution(self, id): + def relaunch_execution(self, id, name=None): """ :param flytekit.models.core.identifier.WorkflowExecutionIdentifier id: + :param Text name: [Optional] name for the new execution. If not specified, a randomly generated name will be + used :returns: The unique identifier for the new execution. :rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier """ return _identifier.WorkflowExecutionIdentifier.from_flyte_idl( super(SynchronousFlyteClient, self) - .relaunch_execution(_execution_pb2.ExecutionRelaunchRequest(id=id.to_flyte_idl(), name="")) + .relaunch_execution(_execution_pb2.ExecutionRelaunchRequest(id=id.to_flyte_idl(), name=name)) .id ) diff --git a/flytekit/clis/flyte_cli/main.py b/flytekit/clis/flyte_cli/main.py index 9089407b0a..c201bc5b57 100644 --- a/flytekit/clis/flyte_cli/main.py +++ b/flytekit/clis/flyte_cli/main.py @@ -1088,7 +1088,7 @@ def update_launch_plan(state, host, insecure, urn=None): @_optional_name_option @_host_option @_insecure_option -def recover_execution(urn, host, insecure): +def recover_execution(urn, name, host, insecure): """ Recreates a previously-run workflow execution that will only start executing from the last known failure point. @@ -1100,6 +1100,8 @@ def recover_execution(urn, host, insecure): - downstream system failures (downstream services) - or simply to recover executions that failed because of retry exhaustion and should complete if tried again. + You can optionally assign a name to the recreated execution you trigger or let the system assign one. + Usage: $ flyte-cli recover-execution -u ex:flyteexamples:development:some-workflow:abc123 -n my_retry_name @@ -1113,7 +1115,7 @@ def recover_execution(urn, host, insecure): original_workflow_execution_identifier = cli_identifiers.WorkflowExecutionIdentifier.from_python_std(urn) - execution_identifier_resp = client.recover_execution(id=original_workflow_execution_identifier) + execution_identifier_resp = client.recover_execution(id=original_workflow_execution_identifier, name=name) execution_identifier = cli_identifiers.WorkflowExecutionIdentifier.promote_from_model(execution_identifier_resp) click.secho("Launched execution: {}".format(execution_identifier), fg="blue") click.echo("") diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index 2594b6ba29..d8c215a598 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -457,6 +457,7 @@ def run_remote( inputs=inputs, project=project, domain=domain, + execution_name=run_level_params.name, wait=run_level_params.wait_execution, options=options_from_run_params(run_level_params), type_hints=type_hints, diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index f3c550bd60..28ee9d4748 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -16,6 +16,7 @@ import tempfile import time import typing +import uuid from base64 import b64encode from collections import OrderedDict from dataclasses import asdict, dataclass @@ -1111,6 +1112,8 @@ def _execute( inputs: typing.Dict[str, typing.Any], project: str = None, domain: str = None, + execution_name: typing.Optional[str] = None, + execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1126,6 +1129,7 @@ def _execute( :param inputs: dictionary mapping argument names to values :param project: project on which to execute the entity referenced by flyte_id :param domain: domain on which to execute the entity referenced by flyte_id + :param execution_name: name of the execution :param wait: if True, waits for execution to complete :param type_hints: map of python types to inputs so that the TypeEngine knows how to convert the input values into Flyte Literals. @@ -1138,6 +1142,12 @@ def _execute( :param execution_cluster_label: Specify label of cluster(s) on which newly created execution should be placed. :returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution` """ + if execution_name is not None and execution_name_prefix is not None: + raise ValueError("Only one of execution_name and execution_name_prefix can be set, but got both set") + if execution_name_prefix is not None: + execution_name = execution_name_prefix + "-" + uuid.uuid4().hex[:19] + if execution_name is None and execution_name_prefix is None: + execution_name = "" if not options: options = Options() if options.disable_notifications is not None: @@ -1180,40 +1190,49 @@ def _execute( literal_inputs = literal_models.LiteralMap(literals=literal_map) - # Currently, this will only execute the flyte entity referenced by - # flyte_id in the same project and domain. However, it is possible to execute it in a different project - # and domain, which is specified in the first two arguments of client.create_execution. This is useful - # in the case that I want to use a flyte entity from e.g. project "A" but actually execute the entity on a - # different project "B". For now, this method doesn't support this use case. - exec_id = self.client.create_execution( - project or self.default_project, - domain or self.default_domain, - ExecutionSpec( - entity.id, - ExecutionMetadata( - ExecutionMetadata.ExecutionMode.MANUAL, - "placeholder", # Admin replaces this from oidc token if auth is enabled. - 0, + try: + # Currently, this will only execute the flyte entity referenced by + # flyte_id in the same project and domain. However, it is possible to execute it in a different project + # and domain, which is specified in the first two arguments of client.create_execution. This is useful + # in the case that I want to use a flyte entity from e.g. project "A" but actually execute the entity on a + # different project "B". For now, this method doesn't support this use case. + exec_id = self.client.create_execution( + project or self.default_project, + domain or self.default_domain, + execution_name, + ExecutionSpec( + entity.id, + ExecutionMetadata( + ExecutionMetadata.ExecutionMode.MANUAL, + "placeholder", # Admin replaces this from oidc token if auth is enabled. + 0, + ), + overwrite_cache=overwrite_cache, + notifications=notifications, + disable_all=options.disable_notifications, + labels=options.labels, + annotations=options.annotations, + raw_output_data_config=options.raw_output_data_config, + auth_role=None, + max_parallelism=options.max_parallelism, + security_context=options.security_context, + envs=common_models.Envs(envs) if envs else None, + tags=tags, + cluster_assignment=ClusterAssignment(cluster_pool=cluster_pool) if cluster_pool else None, + execution_cluster_label=ExecutionClusterLabel(execution_cluster_label) + if execution_cluster_label + else None, ), - overwrite_cache=overwrite_cache, - notifications=notifications, - disable_all=options.disable_notifications, - labels=options.labels, - annotations=options.annotations, - raw_output_data_config=options.raw_output_data_config, - auth_role=None, - max_parallelism=options.max_parallelism, - security_context=options.security_context, - envs=common_models.Envs(envs) if envs else None, - tags=tags, - cluster_assignment=ClusterAssignment(cluster_pool=cluster_pool) if cluster_pool else None, - execution_cluster_label=ExecutionClusterLabel(execution_cluster_label) - if execution_cluster_label - else None, - ), - literal_inputs, - ) - + literal_inputs, + ) + except user_exceptions.FlyteEntityAlreadyExistsException: + logger.warning( + f"Execution with Execution ID {execution_name} already exists. " + f"Assuming this is the same execution, returning!" + ) + exec_id = WorkflowExecutionIdentifier( + project=project or self.default_project, domain=domain or self.default_domain, name=execution_name + ) execution = FlyteWorkflowExecution.promote_from_model(self.client.get_execution(exec_id)) if wait: @@ -1253,6 +1272,8 @@ def execute( domain: str = None, name: str = None, version: str = None, + execution_name: typing.Optional[str] = None, + execution_name_prefix: typing.Optional[str] = None, image_config: typing.Optional[ImageConfig] = None, options: typing.Optional[Options] = None, wait: bool = False, @@ -1287,6 +1308,7 @@ def execute( first before executing. :param name: execute entity using this name. If not None, use this value instead of ``entity.name`` :param version: execute entity using this version. If None, uses auto-generated value. + :param execution_name: name of the execution. If None, uses auto-generated value. :param image_config: :param wait: if True, waits for execution to complete :param type_hints: Python types to be passed to the TypeEngine so that it knows how to properly convert the @@ -1315,6 +1337,8 @@ def execute( inputs=inputs, project=project, domain=domain, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1330,6 +1354,8 @@ def execute( inputs=inputs, project=project, domain=domain, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1343,6 +1369,8 @@ def execute( return self.execute_reference_task( entity=entity, inputs=inputs, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1356,6 +1384,8 @@ def execute( return self.execute_reference_workflow( entity=entity, inputs=inputs, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1369,6 +1399,8 @@ def execute( return self.execute_reference_launch_plan( entity=entity, inputs=inputs, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1386,6 +1418,8 @@ def execute( domain=domain, name=name, version=version, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, image_config=image_config, wait=wait, overwrite_cache=overwrite_cache, @@ -1402,6 +1436,8 @@ def execute( domain=domain, name=name, version=version, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, image_config=image_config, options=options, wait=wait, @@ -1419,6 +1455,8 @@ def execute( project=project, domain=domain, name=name, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, options=options, wait=wait, overwrite_cache=overwrite_cache, @@ -1438,6 +1476,8 @@ def execute_remote_task_lp( inputs: typing.Dict[str, typing.Any], project: str = None, domain: str = None, + execution_name: typing.Optional[str] = None, + execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1456,6 +1496,8 @@ def execute_remote_task_lp( inputs, project=project, domain=domain, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, wait=wait, options=options, type_hints=type_hints, @@ -1472,6 +1514,8 @@ def execute_remote_wf( inputs: typing.Dict[str, typing.Any], project: str = None, domain: str = None, + execution_name: typing.Optional[str] = None, + execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1491,6 +1535,8 @@ def execute_remote_wf( inputs, project=project, domain=domain, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1507,6 +1553,8 @@ def execute_reference_task( self, entity: ReferenceTask, inputs: typing.Dict[str, typing.Any], + execution_name: typing.Optional[str] = None, + execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1536,6 +1584,8 @@ def execute_reference_task( inputs, project=resolved_identifiers.project, domain=resolved_identifiers.domain, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1550,6 +1600,8 @@ def execute_reference_workflow( self, entity: ReferenceWorkflow, inputs: typing.Dict[str, typing.Any], + execution_name: typing.Optional[str] = None, + execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1593,6 +1645,8 @@ def execute_reference_workflow( inputs, project=resolved_identifiers.project, domain=resolved_identifiers.domain, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, wait=wait, options=options, type_hints=type_hints, @@ -1607,6 +1661,8 @@ def execute_reference_launch_plan( self, entity: ReferenceLaunchPlan, inputs: typing.Dict[str, typing.Any], + execution_name: typing.Optional[str] = None, + execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, @@ -1636,6 +1692,8 @@ def execute_reference_launch_plan( inputs, project=resolved_identifiers.project, domain=resolved_identifiers.domain, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=type_hints, @@ -1657,6 +1715,8 @@ def execute_local_task( domain: str = None, name: str = None, version: str = None, + execution_name: typing.Optional[str] = None, + execution_name_prefix: typing.Optional[str] = None, image_config: typing.Optional[ImageConfig] = None, wait: bool = False, overwrite_cache: typing.Optional[bool] = None, @@ -1674,6 +1734,7 @@ def execute_local_task( :param domain: The execution domain, will default to the Remote's default domain. :param name: specific name of the task to run. :param version: specific version of the task to run. + :param execution_name: If provided, will use this name for the execution. :param image_config: If provided, will use this image config in the pod. :param wait: If True, will wait for the execution to complete before returning. :param overwrite_cache: If True, will overwrite the cache. @@ -1704,6 +1765,8 @@ def execute_local_task( inputs, project=resolved_identifiers.project, domain=resolved_identifiers.domain, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, wait=wait, type_hints=entity.python_interface.inputs, overwrite_cache=overwrite_cache, @@ -1721,6 +1784,8 @@ def execute_local_workflow( domain: str = None, name: str = None, version: str = None, + execution_name: typing.Optional[str] = None, + execution_name_prefix: typing.Optional[str] = None, image_config: typing.Optional[ImageConfig] = None, options: typing.Optional[Options] = None, wait: bool = False, @@ -1738,6 +1803,7 @@ def execute_local_workflow( :param domain: :param name: :param version: + :param execution_name: :param image_config: :param options: :param wait: @@ -1785,6 +1851,8 @@ def execute_local_workflow( inputs, project=project, domain=domain, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, wait=wait, options=options, type_hints=entity.python_interface.inputs, @@ -1803,6 +1871,8 @@ def execute_local_launch_plan( project: typing.Optional[str] = None, domain: typing.Optional[str] = None, name: typing.Optional[str] = None, + execution_name: typing.Optional[str] = None, + execution_name_prefix: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, overwrite_cache: typing.Optional[bool] = None, @@ -1819,6 +1889,7 @@ def execute_local_launch_plan( :param project: The same as version, but will default to the Remote object's project :param domain: The same as version, but will default to the Remote object's domain :param name: The same as version, but will default to the entity's name + :param execution_name: If specified, will be used as the execution name instead of randomly generating. :param options: Options to be passed into the execution. :param wait: If True, will wait for the execution to complete before returning. :param overwrite_cache: If True, will overwrite the cache. @@ -1846,6 +1917,8 @@ def execute_local_launch_plan( inputs, project=project, domain=domain, + execution_name=execution_name, + execution_name_prefix=execution_name_prefix, options=options, wait=wait, type_hints=entity.python_interface.inputs, @@ -2257,6 +2330,7 @@ def launch_backfill( to_date: datetime, launchplan: str, launchplan_version: str = None, + execution_name: str = None, version: str = None, dry_run: bool = False, execute: bool = True, @@ -2283,6 +2357,7 @@ def launch_backfill( :param to_date: datetime generate a backfill ending at this datetime (inclusive) :param launchplan: str launchplan name in the flyte backend :param launchplan_version: str (optional) version for the launchplan. If not specified the most recent will be retrieved + :param execution_name: str (optional) the generated execution will be named so. this can help in ensuring idempotency :param version: str (optional) version to be used for the newly created workflow. :param dry_run: bool do not register or execute the workflow :param execute: bool Register and execute the wwkflow. @@ -2323,6 +2398,7 @@ def launch_backfill( inputs={}, project=project, domain=domain, + execution_name=execution_name, overwrite_cache=overwrite_cache, ) diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index 09d358d61c..68580f4f87 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -134,7 +134,7 @@ def test_underscore_execute_uses_launch_plan_attributes(remote, mock_wf_exec): remote._client = mock_client def local_assertions(*args, **kwargs): - execution_spec = args[2] + execution_spec = args[3] assert execution_spec.security_context.run_as.k8s_service_account == "svc" assert execution_spec.labels == common_models.Labels({"a": "my_label_value"}) assert execution_spec.annotations == common_models.Annotations({"b": "my_annotation_value"}) @@ -163,7 +163,7 @@ def test_execution_cluster_label_attributes(remote, mock_wf_exec): remote._client = mock_client def local_assertions(*args, **kwargs): - execution_spec = args[2] + execution_spec = args[3] assert execution_spec.execution_cluster_label.value == "label" mock_client.create_execution.side_effect = local_assertions @@ -190,7 +190,7 @@ def test_underscore_execute_fall_back_remote_attributes(remote, mock_wf_exec): ) def local_assertions(*args, **kwargs): - execution_spec = args[2] + execution_spec = args[3] assert execution_spec.security_context.run_as.iam_role == "iam:some:role" assert execution_spec.raw_output_data_config.output_location_prefix == "raw_output" @@ -533,7 +533,8 @@ def wf(name: str = "union"): name="flytesnacks.examples.basics.basics.workflow.slope", version="v1", ) - def ref_basic(x: typing.List[int], y: typing.List[int]) -> float: ... + def ref_basic(x: typing.List[int], y: typing.List[int]) -> float: + ... @workflow def wf1(name: str = "union") -> float: @@ -561,6 +562,56 @@ def test_local_server(mock_client): assert lr.get("hello", int) == 55 +@mock.patch("flytekit.remote.remote.uuid") +@mock.patch("flytekit.remote.remote.FlyteRemote.client") +def test_execution_name(mock_client, mock_uuid): + test_uuid = uuid.UUID("16fd2706-8baf-433b-82eb-8c7fada847da") + mock_uuid.uuid4.return_value = test_uuid + remote = FlyteRemote(config=Config.auto(), default_project="project", default_domain="domain") + + default_img = Image(name="default", fqn="test", tag="tag") + serialization_settings = SerializationSettings( + project="project", + domain="domain", + version="version", + env=None, + image_config=ImageConfig(default_image=default_img, images=[default_img]), + ) + tk_spec = get_serializable(OrderedDict(), serialization_settings, tk) + ft = FlyteTask.promote_from_model(tk_spec.template) + + remote._execute( + entity=ft, + inputs={"t": datetime.now(), "v": 0}, + execution_name="execution-test", + ) + remote._execute( + entity=ft, + inputs={"t": datetime.now(), "v": 0}, + execution_name_prefix="execution-test", + ) + remote._execute( + entity=ft, + inputs={"t": datetime.now(), "v": 0}, + ) + mock_client.create_execution.assert_has_calls( + [ + mock.call(ANY, ANY, "execution-test", ANY, ANY), + mock.call(ANY, ANY, "execution-test-" + test_uuid.hex[:19], ANY, ANY), + mock.call(ANY, ANY, "", ANY, ANY), + ] + ) + with pytest.raises( + ValueError, match="Only one of execution_name and execution_name_prefix can be set, but got both set" + ): + remote._execute( + entity=ft, + inputs={"t": datetime.now(), "v": 0}, + execution_name="execution-test", + execution_name_prefix="execution-test", + ) + + @pytest.mark.parametrize( "url, host", [ From 9808265c647a098337b4d7bc40adf05678d2b590 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 22 Aug 2024 15:52:24 +0800 Subject: [PATCH 06/13] remove empty string and add todo Signed-off-by: wayner0628 --- flytekit/remote/remote.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 28ee9d4748..bad1a0512f 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -1144,10 +1144,9 @@ def _execute( """ if execution_name is not None and execution_name_prefix is not None: raise ValueError("Only one of execution_name and execution_name_prefix can be set, but got both set") + # todo: The prefix should be passed to the backend if execution_name_prefix is not None: execution_name = execution_name_prefix + "-" + uuid.uuid4().hex[:19] - if execution_name is None and execution_name_prefix is None: - execution_name = "" if not options: options = Options() if options.disable_notifications is not None: From 9915cd338631b4f7356720ee15c2731e18fbace3 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 22 Aug 2024 16:08:54 +0800 Subject: [PATCH 07/13] fix: fix no specification assertion Signed-off-by: wayner0628 --- tests/flytekit/unit/remote/test_remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index 68580f4f87..81e70e0a21 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -598,7 +598,7 @@ def test_execution_name(mock_client, mock_uuid): [ mock.call(ANY, ANY, "execution-test", ANY, ANY), mock.call(ANY, ANY, "execution-test-" + test_uuid.hex[:19], ANY, ANY), - mock.call(ANY, ANY, "", ANY, ANY), + mock.call(ANY, ANY, None, ANY, ANY), ] ) with pytest.raises( From a70b9f8f728c4d4bd5099902e451a0dd519e50ac Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 22 Aug 2024 18:14:30 +0800 Subject: [PATCH 08/13] add explicit test for execution name from prefix Signed-off-by: wayner0628 --- tests/flytekit/unit/remote/test_remote.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index 81e70e0a21..ed01f2ddcc 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -601,6 +601,10 @@ def test_execution_name(mock_client, mock_uuid): mock.call(ANY, ANY, None, ANY, ANY), ] ) + assert ( + mock_client.create_execution.call_args_list[1][1][2] + == "execution-test-" + test_uuid.hex[:19] + ) with pytest.raises( ValueError, match="Only one of execution_name and execution_name_prefix can be set, but got both set" ): From 1bcf905bf6340134915f96838fdc59d020782c72 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 22 Aug 2024 18:19:21 +0800 Subject: [PATCH 09/13] fix: fix lint error Signed-off-by: wayner0628 --- tests/flytekit/unit/remote/test_remote.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index ed01f2ddcc..2d324c8eb1 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -533,8 +533,7 @@ def wf(name: str = "union"): name="flytesnacks.examples.basics.basics.workflow.slope", version="v1", ) - def ref_basic(x: typing.List[int], y: typing.List[int]) -> float: - ... + def ref_basic(x: typing.List[int], y: typing.List[int]) -> float: ... @workflow def wf1(name: str = "union") -> float: @@ -601,10 +600,7 @@ def test_execution_name(mock_client, mock_uuid): mock.call(ANY, ANY, None, ANY, ANY), ] ) - assert ( - mock_client.create_execution.call_args_list[1][1][2] - == "execution-test-" + test_uuid.hex[:19] - ) + assert mock_client.create_execution.call_args_list[1][1][2] == "execution-test-" + test_uuid.hex[:19] with pytest.raises( ValueError, match="Only one of execution_name and execution_name_prefix can be set, but got both set" ): From 27b14aafe3b68a35dd6d004eae4214bf9bde246a Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 22 Aug 2024 18:29:39 +0800 Subject: [PATCH 10/13] fix call args tuple index Signed-off-by: wayner0628 --- tests/flytekit/unit/remote/test_remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index 2d324c8eb1..1ffbc275fc 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -600,7 +600,7 @@ def test_execution_name(mock_client, mock_uuid): mock.call(ANY, ANY, None, ANY, ANY), ] ) - assert mock_client.create_execution.call_args_list[1][1][2] == "execution-test-" + test_uuid.hex[:19] + assert mock_client.create_execution.call_args_list[1][0][2] == "execution-test-" + test_uuid.hex[:19] with pytest.raises( ValueError, match="Only one of execution_name and execution_name_prefix can be set, but got both set" ): From 0ada3d576fcef71ef54d2c8301b792f1c4f5e0ae Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 25 Aug 2024 01:43:02 -0700 Subject: [PATCH 11/13] nit Signed-off-by: Kevin Su --- tests/flytekit/unit/remote/test_remote.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index 1ffbc275fc..b8a371e2a0 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -584,7 +584,7 @@ def test_execution_name(mock_client, mock_uuid): inputs={"t": datetime.now(), "v": 0}, execution_name="execution-test", ) - remote._execute( + exe = remote._execute( entity=ft, inputs={"t": datetime.now(), "v": 0}, execution_name_prefix="execution-test", @@ -600,7 +600,6 @@ def test_execution_name(mock_client, mock_uuid): mock.call(ANY, ANY, None, ANY, ANY), ] ) - assert mock_client.create_execution.call_args_list[1][0][2] == "execution-test-" + test_uuid.hex[:19] with pytest.raises( ValueError, match="Only one of execution_name and execution_name_prefix can be set, but got both set" ): From e70aec6cb8d057761ad10f654d07044efeb25f33 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 25 Aug 2024 01:43:14 -0700 Subject: [PATCH 12/13] nit Signed-off-by: Kevin Su --- tests/flytekit/unit/remote/test_remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index b8a371e2a0..0a0e2ba545 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -584,7 +584,7 @@ def test_execution_name(mock_client, mock_uuid): inputs={"t": datetime.now(), "v": 0}, execution_name="execution-test", ) - exe = remote._execute( + remote._execute( entity=ft, inputs={"t": datetime.now(), "v": 0}, execution_name_prefix="execution-test", From dc705b217c2b4976367863c71153bc2ca2900a7e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 25 Aug 2024 01:43:40 -0700 Subject: [PATCH 13/13] nit Signed-off-by: Kevin Su --- tests/flytekit/unit/remote/test_remote.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index 0a0e2ba545..81e70e0a21 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -533,7 +533,8 @@ def wf(name: str = "union"): name="flytesnacks.examples.basics.basics.workflow.slope", version="v1", ) - def ref_basic(x: typing.List[int], y: typing.List[int]) -> float: ... + def ref_basic(x: typing.List[int], y: typing.List[int]) -> float: + ... @workflow def wf1(name: str = "union") -> float: