From f3e742f61e82141d388388d7162438c51f6be53d Mon Sep 17 00:00:00 2001 From: Chi-Sheng Liu Date: Thu, 16 May 2024 23:35:05 +0800 Subject: [PATCH 1/3] feat(replicaspec): Update files with respect to common ReplicaSpec refactor Resolves: flyteorg/flyte#4408 Signed-off-by: Chi-Sheng Liu --- .../flytekitplugins/kfmpi/task.py | 15 +- .../flytekit-kf-mpi/tests/test_mpi_task.py | 86 +++++++----- .../flytekitplugins/kfpytorch/task.py | 15 +- .../tests/test_pytorch_task.py | 60 +++++--- .../flytekitplugins/kftensorflow/task.py | 19 +-- .../tests/test_tensorflow_task.py | 132 +++++++++++------- 6 files changed, 204 insertions(+), 123 deletions(-) diff --git a/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py b/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py index 7c8416d007..b911506bea 100644 --- a/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py +++ b/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py @@ -7,6 +7,7 @@ from enum import Enum from typing import Any, Callable, Dict, List, Optional, Union +from flyteidl.plugins import common_pb2 as plugins_common from flyteidl.plugins.kubeflow import common_pb2 as kubeflow_common from flyteidl.plugins.kubeflow import mpi_pb2 as mpi_task from google.protobuf.json_format import MessageToDict @@ -171,11 +172,13 @@ def _convert_replica_spec( ) -> mpi_task.DistributedMPITrainingReplicaSpec: resources = convert_resources_to_resource_model(requests=replica_config.requests, limits=replica_config.limits) return mpi_task.DistributedMPITrainingReplicaSpec( + common=plugins_common.CommonReplicaSpec( + replicas=replica_config.replicas, + image=replica_config.image, + resources=resources.to_flyte_idl() if resources else None, + restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, + ), command=replica_config.command, - replicas=replica_config.replicas, - image=replica_config.image, - resources=resources.to_flyte_idl() if resources else None, - 
restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, ) def _convert_run_policy(self, run_policy: RunPolicy) -> kubeflow_common.RunPolicy: @@ -203,11 +206,11 @@ def get_command(self, settings: SerializationSettings) -> List[str]: def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: worker = self._convert_replica_spec(self.task_config.worker) if self.task_config.num_workers: - worker.replicas = self.task_config.num_workers + worker.common.replicas = self.task_config.num_workers launcher = self._convert_replica_spec(self.task_config.launcher) if self.task_config.num_launcher_replicas: - launcher.replicas = self.task_config.num_launcher_replicas + launcher.common.replicas = self.task_config.num_launcher_replicas run_policy = self._convert_run_policy(self.task_config.run_policy) if self.task_config.run_policy else None mpi_job = mpi_task.DistributedMPITrainingTask( diff --git a/plugins/flytekit-kf-mpi/tests/test_mpi_task.py b/plugins/flytekit-kf-mpi/tests/test_mpi_task.py index deec3ff385..0b8b8353c1 100644 --- a/plugins/flytekit-kf-mpi/tests/test_mpi_task.py +++ b/plugins/flytekit-kf-mpi/tests/test_mpi_task.py @@ -34,8 +34,18 @@ def my_mpi_task(x: int, y: str) -> int: assert my_mpi_task.task_config is not None assert my_mpi_task.get_custom(serialization_settings) == { - "launcherReplicas": {"replicas": 10, "resources": {}}, - "workerReplicas": {"replicas": 10, "resources": {}}, + "launcherReplicas": { + "common": { + "replicas": 10, + "resources": {}, + }, + }, + "workerReplicas": { + "common": { + "replicas": 10, + "resources": {}, + }, + }, "slots": 1, } assert my_mpi_task.task_type == "mpi" @@ -69,12 +79,16 @@ def my_mpi_task(x: int, y: str) -> int: expected_dict = { "launcherReplicas": { - "replicas": 1, - "resources": {}, + "common": { + "replicas": 1, + "resources": {}, + }, }, "workerReplicas": { - "replicas": 1, - "resources": {}, + "common": { + "replicas": 1, + "resources": {}, + }, }, "slots": 1, 
} @@ -124,25 +138,29 @@ def my_mpi_task(x: int, y: str) -> int: expected_custom_dict = { "launcherReplicas": { - "replicas": 1, - "image": "launcher:latest", - "resources": { - "requests": [{"name": "CPU", "value": "1"}], - "limits": [{"name": "CPU", "value": "2"}], + "common": { + "replicas": 1, + "image": "launcher:latest", + "resources": { + "requests": [{"name": "CPU", "value": "1"}], + "limits": [{"name": "CPU", "value": "2"}], + }, }, }, "workerReplicas": { - "replicas": 5, - "image": "worker:latest", - "resources": { - "requests": [ - {"name": "CPU", "value": "2"}, - {"name": "MEMORY", "value": "2Gi"}, - ], - "limits": [ - {"name": "CPU", "value": "4"}, - {"name": "MEMORY", "value": "2Gi"}, - ], + "common": { + "replicas": 5, + "image": "worker:latest", + "resources": { + "requests": [ + {"name": "CPU", "value": "2"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + "limits": [ + {"name": "CPU", "value": "4"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + }, }, }, "slots": 2, @@ -185,19 +203,23 @@ def my_horovod_task(): ... 
# CleanPodPolicy.NONE is the default, so it should not be in the output dictionary expected_dict = { "launcherReplicas": { - "replicas": 1, - "resources": { - "requests": [ - {"name": "CPU", "value": "1"}, - ], - "limits": [ - {"name": "CPU", "value": "2"}, - ], + "common": { + "replicas": 1, + "resources": { + "requests": [ + {"name": "CPU", "value": "1"}, + ], + "limits": [ + {"name": "CPU", "value": "2"}, + ], + }, }, }, "workerReplicas": { - "replicas": 1, - "resources": {}, + "common": { + "replicas": 1, + "resources": {}, + }, "command": ["/usr/sbin/sshd", "-De", "-f", "/home/jobuser/.sshd_config"], }, "slots": 2, diff --git a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py index 95477726e1..57fa6d8f3a 100644 --- a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py +++ b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py @@ -8,6 +8,7 @@ from enum import Enum from typing import Any, Callable, Dict, List, NamedTuple, Optional, Union +from flyteidl.plugins import common_pb2 as plugins_common from flyteidl.plugins.kubeflow import common_pb2 as kubeflow_common from flyteidl.plugins.kubeflow import pytorch_pb2 as pytorch_task from google.protobuf.json_format import MessageToDict @@ -177,17 +178,19 @@ def _convert_replica_spec( if not isinstance(replica_config, Master): replicas = replica_config.replicas return pytorch_task.DistributedPyTorchTrainingReplicaSpec( - replicas=replicas, - image=replica_config.image, - resources=resources.to_flyte_idl() if resources else None, - restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, + common=plugins_common.CommonReplicaSpec( + replicas=replicas, + image=replica_config.image, + resources=resources.to_flyte_idl() if resources else None, + restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, + ) ) def get_custom(self, settings: 
SerializationSettings) -> Dict[str, Any]: worker = self._convert_replica_spec(self.task_config.worker) # support v0 config for backwards compatibility if self.task_config.num_workers: - worker.replicas = self.task_config.num_workers + worker.common.replicas = self.task_config.num_workers run_policy = ( _convert_run_policy_to_flyte_idl(self.task_config.run_policy) if self.task_config.run_policy else None @@ -455,7 +458,7 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] ) job = pytorch_task.DistributedPyTorchTrainingTask( worker_replicas=pytorch_task.DistributedPyTorchTrainingReplicaSpec( - replicas=self.max_nodes, + common=plugins_common.CommonReplicaSpec(replicas=self.max_nodes), ), elastic_config=elastic_config, run_policy=run_policy, diff --git a/plugins/flytekit-kf-pytorch/tests/test_pytorch_task.py b/plugins/flytekit-kf-pytorch/tests/test_pytorch_task.py index 0c15886c3a..1bf9408e57 100644 --- a/plugins/flytekit-kf-pytorch/tests/test_pytorch_task.py +++ b/plugins/flytekit-kf-pytorch/tests/test_pytorch_task.py @@ -33,8 +33,18 @@ def my_pytorch_task(x: int, y: str) -> int: assert my_pytorch_task.task_config is not None assert my_pytorch_task.get_custom(serialization_settings) == { - "workerReplicas": {"replicas": 10, "resources": {}}, - "masterReplicas": {"replicas": 1, "resources": {}}, + "workerReplicas": { + "common": { + "replicas": 10, + "resources": {}, + }, + }, + "masterReplicas": { + "common": { + "replicas": 1, + "resources": {}, + }, + }, } assert my_pytorch_task.resources.limits == Resources() assert my_pytorch_task.resources.requests == Resources(cpu="1") @@ -64,12 +74,16 @@ def my_pytorch_task(x: int, y: str) -> int: expected_dict = { "masterReplicas": { - "replicas": 1, - "resources": {}, + "common": { + "replicas": 1, + "resources": {}, + }, }, "workerReplicas": { - "replicas": 1, - "resources": {}, + "common": { + "replicas": 1, + "resources": {}, + }, }, } assert 
my_pytorch_task.get_custom(serialization_settings) == expected_dict @@ -114,24 +128,28 @@ def my_pytorch_task(x: int, y: str) -> int: expected_custom_dict = { "workerReplicas": { - "replicas": 5, - "image": "worker:latest", - "resources": { - "requests": [ - {"name": "CPU", "value": "2"}, - {"name": "MEMORY", "value": "2Gi"}, - ], - "limits": [ - {"name": "CPU", "value": "4"}, - {"name": "MEMORY", "value": "2Gi"}, - ], + "common": { + "replicas": 5, + "image": "worker:latest", + "resources": { + "requests": [ + {"name": "CPU", "value": "2"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + "limits": [ + {"name": "CPU", "value": "4"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + }, + "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, - "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, "masterReplicas": { - "resources": {}, - "replicas": 1, - "restartPolicy": "RESTART_POLICY_ALWAYS", + "common": { + "resources": {}, + "replicas": 1, + "restartPolicy": "RESTART_POLICY_ALWAYS", + }, }, "runPolicy": { "cleanPodPolicy": "CLEANPOD_POLICY_ALL", diff --git a/plugins/flytekit-kf-tensorflow/flytekitplugins/kftensorflow/task.py b/plugins/flytekit-kf-tensorflow/flytekitplugins/kftensorflow/task.py index 62cd482416..8bab5bb5ff 100644 --- a/plugins/flytekit-kf-tensorflow/flytekitplugins/kftensorflow/task.py +++ b/plugins/flytekit-kf-tensorflow/flytekitplugins/kftensorflow/task.py @@ -7,6 +7,7 @@ from enum import Enum from typing import Any, Callable, Dict, Optional, Union +from flyteidl.plugins import common_pb2 as plugins_common from flyteidl.plugins.kubeflow import common_pb2 as kubeflow_common from flyteidl.plugins.kubeflow import tensorflow_pb2 as tensorflow_task from google.protobuf.json_format import MessageToDict @@ -174,10 +175,12 @@ def _convert_replica_spec( ) -> tensorflow_task.DistributedTensorflowTrainingReplicaSpec: resources = convert_resources_to_resource_model(requests=replica_config.requests, limits=replica_config.limits) return 
tensorflow_task.DistributedTensorflowTrainingReplicaSpec( - replicas=replica_config.replicas, - image=replica_config.image, - resources=resources.to_flyte_idl() if resources else None, - restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, + common=plugins_common.CommonReplicaSpec( + replicas=replica_config.replicas, + image=replica_config.image, + resources=resources.to_flyte_idl() if resources else None, + restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, + ) ) def _convert_run_policy(self, run_policy: RunPolicy) -> kubeflow_common.RunPolicy: @@ -191,19 +194,19 @@ def _convert_run_policy(self, run_policy: RunPolicy) -> kubeflow_common.RunPolic def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: chief = self._convert_replica_spec(self.task_config.chief) if self.task_config.num_chief_replicas: - chief.replicas = self.task_config.num_chief_replicas + chief.common.replicas = self.task_config.num_chief_replicas worker = self._convert_replica_spec(self.task_config.worker) if self.task_config.num_workers: - worker.replicas = self.task_config.num_workers + worker.common.replicas = self.task_config.num_workers ps = self._convert_replica_spec(self.task_config.ps) if self.task_config.num_ps_replicas: - ps.replicas = self.task_config.num_ps_replicas + ps.common.replicas = self.task_config.num_ps_replicas evaluator = self._convert_replica_spec(self.task_config.evaluator) if self.task_config.num_evaluator_replicas: - evaluator.replicas = self.task_config.num_evaluator_replicas + evaluator.common.replicas = self.task_config.num_evaluator_replicas run_policy = self._convert_run_policy(self.task_config.run_policy) if self.task_config.run_policy else None training_task = tensorflow_task.DistributedTensorflowTrainingTask( diff --git a/plugins/flytekit-kf-tensorflow/tests/test_tensorflow_task.py b/plugins/flytekit-kf-tensorflow/tests/test_tensorflow_task.py index 
0ae32439d7..fe7fa22466 100644 --- a/plugins/flytekit-kf-tensorflow/tests/test_tensorflow_task.py +++ b/plugins/flytekit-kf-tensorflow/tests/test_tensorflow_task.py @@ -44,17 +44,25 @@ def my_tensorflow_task(x: int, y: str) -> int: expected_dict = { "chiefReplicas": { - "resources": {}, + "common": { + "resources": {}, + }, }, "workerReplicas": { - "replicas": 1, - "resources": {}, + "common": { + "replicas": 1, + "resources": {}, + }, }, "psReplicas": { - "resources": {}, + "common": { + "resources": {}, + }, }, "evaluatorReplicas": { - "resources": {}, + "common": { + "resources": {}, + }, }, } assert my_tensorflow_task.get_custom(serialization_settings) == expected_dict @@ -106,47 +114,55 @@ def my_tensorflow_task(x: int, y: str) -> int: expected_custom_dict = { "chiefReplicas": { - "replicas": 1, - "image": "chief:latest", - "resources": { - "requests": [{"name": "CPU", "value": "1"}], - "limits": [{"name": "CPU", "value": "2"}], + "common": { + "replicas": 1, + "image": "chief:latest", + "resources": { + "requests": [{"name": "CPU", "value": "1"}], + "limits": [{"name": "CPU", "value": "2"}], + }, }, }, "workerReplicas": { - "replicas": 5, - "image": "worker:latest", - "resources": { - "requests": [ - {"name": "CPU", "value": "2"}, - {"name": "MEMORY", "value": "2Gi"}, - ], - "limits": [ - {"name": "CPU", "value": "4"}, - {"name": "MEMORY", "value": "2Gi"}, - ], + "common": { + "replicas": 5, + "image": "worker:latest", + "resources": { + "requests": [ + {"name": "CPU", "value": "2"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + "limits": [ + {"name": "CPU", "value": "4"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + }, + "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, - "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, "psReplicas": { - "resources": {}, - "replicas": 2, - "restartPolicy": "RESTART_POLICY_ALWAYS", + "common": { + "resources": {}, + "replicas": 2, + "restartPolicy": "RESTART_POLICY_ALWAYS", + }, }, "evaluatorReplicas": { - "replicas": 5, - 
"image": "evaluator:latest", - "resources": { - "requests": [ - {"name": "CPU", "value": "2"}, - {"name": "MEMORY", "value": "2Gi"}, - ], - "limits": [ - {"name": "CPU", "value": "4"}, - {"name": "MEMORY", "value": "2Gi"}, - ], + "common": { + "replicas": 5, + "image": "evaluator:latest", + "resources": { + "requests": [ + {"name": "CPU", "value": "2"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + "limits": [ + {"name": "CPU", "value": "4"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + }, + "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, - "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, } @@ -185,17 +201,25 @@ def my_tensorflow_task(x: int, y: str) -> int: expected_dict = { "chiefReplicas": { - "resources": {}, + "common": { + "resources": {}, + }, }, "workerReplicas": { - "replicas": 1, - "resources": {}, + "common": { + "replicas": 1, + "resources": {}, + }, }, "psReplicas": { - "resources": {}, + "common": { + "resources": {}, + }, }, "evaluatorReplicas": { - "resources": {}, + "common": { + "resources": {}, + }, }, "runPolicy": { "cleanPodPolicy": "CLEANPOD_POLICY_RUNNING", @@ -233,20 +257,28 @@ def my_tensorflow_task(x: int, y: str) -> int: expected_dict = { "chiefReplicas": { - "replicas": 1, - "resources": {}, + "common": { + "replicas": 1, + "resources": {}, + }, }, "workerReplicas": { - "replicas": 10, - "resources": {}, + "common": { + "replicas": 10, + "resources": {}, + }, }, "psReplicas": { - "replicas": 1, - "resources": {}, + "common": { + "replicas": 1, + "resources": {}, + }, }, "evaluatorReplicas": { - "replicas": 1, - "resources": {}, + "common": { + "replicas": 1, + "resources": {}, + }, }, } From 9100245d8f13c31e0edb75fed1adc16217a7d1bd Mon Sep 17 00:00:00 2001 From: Chi-Sheng Liu Date: Fri, 14 Jun 2024 16:40:11 +0800 Subject: [PATCH 2/3] fix(ci): Fix CI error Resolves: flyteorg/flyte#4408 Signed-off-by: Chi-Sheng Liu --- plugins/flytekit-kf-mpi/setup.py | 2 +- plugins/flytekit-kf-pytorch/setup.py | 2 +- 
plugins/flytekit-kf-tensorflow/setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/flytekit-kf-mpi/setup.py b/plugins/flytekit-kf-mpi/setup.py index 05efff84b0..6b1cb0e762 100644 --- a/plugins/flytekit-kf-mpi/setup.py +++ b/plugins/flytekit-kf-mpi/setup.py @@ -4,7 +4,7 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["flytekit>=1.6.1,<2.0.0"] +plugin_requires = ["flyteidl>1.12.2", "flytekit>=1.6.1"] __version__ = "0.0.0+develop" diff --git a/plugins/flytekit-kf-pytorch/setup.py b/plugins/flytekit-kf-pytorch/setup.py index cc90e0b299..7be0a07b19 100644 --- a/plugins/flytekit-kf-pytorch/setup.py +++ b/plugins/flytekit-kf-pytorch/setup.py @@ -4,7 +4,7 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["cloudpickle", "flyteidl>=1.5.1", "flytekit>=1.6.1"] +plugin_requires = ["cloudpickle", "flyteidl>1.12.2", "flytekit>=1.6.1"] __version__ = "0.0.0+develop" diff --git a/plugins/flytekit-kf-tensorflow/setup.py b/plugins/flytekit-kf-tensorflow/setup.py index 25ffe19eec..c3983cec50 100644 --- a/plugins/flytekit-kf-tensorflow/setup.py +++ b/plugins/flytekit-kf-tensorflow/setup.py @@ -4,7 +4,7 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["flyteidl>=1.10.0", "flytekit>=1.6.1"] +plugin_requires = ["flyteidl>1.12.2", "flytekit>=1.6.1"] __version__ = "0.0.0+develop" From a972abbe9a9512a7a757886d9ee86bae79916e3a Mon Sep 17 00:00:00 2001 From: Chi-Sheng Liu Date: Sun, 16 Jun 2024 11:34:03 +0800 Subject: [PATCH 3/3] feat(replicaspec): Keeps old fields for backward-compatibility Resolves: flyteorg/flyte#4408 Signed-off-by: Chi-Sheng Liu --- .../flytekitplugins/kfmpi/task.py | 10 ++++ .../flytekit-kf-mpi/tests/test_mpi_task.py | 37 +++++++++++++ .../flytekitplugins/kfpytorch/task.py | 11 +++- .../tests/test_pytorch_task.py | 24 +++++++++ .../flytekitplugins/kftensorflow/task.py | 15 +++++- .../tests/test_tensorflow_task.py | 53 +++++++++++++++++++ 6 files changed, 148 
insertions(+), 2 deletions(-) diff --git a/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py b/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py index b911506bea..8e577542d7 100644 --- a/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py +++ b/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py @@ -179,6 +179,12 @@ def _convert_replica_spec( restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, ), command=replica_config.command, + # The following fields are deprecated. They are kept for backwards compatibility. + # The following fields are deprecated and will be removed in the future + replicas=replica_config.replicas, + image=replica_config.image, + resources=resources.to_flyte_idl() if resources else None, + restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, ) def _convert_run_policy(self, run_policy: RunPolicy) -> kubeflow_common.RunPolicy: @@ -207,10 +213,14 @@ def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: worker = self._convert_replica_spec(self.task_config.worker) if self.task_config.num_workers: worker.common.replicas = self.task_config.num_workers + # Deprecated. Only kept for backwards compatibility. + worker.replicas = self.task_config.num_workers launcher = self._convert_replica_spec(self.task_config.launcher) if self.task_config.num_launcher_replicas: launcher.common.replicas = self.task_config.num_launcher_replicas + # Deprecated. Only kept for backwards compatibility. 
+ launcher.replicas = self.task_config.num_launcher_replicas run_policy = self._convert_run_policy(self.task_config.run_policy) if self.task_config.run_policy else None mpi_job = mpi_task.DistributedMPITrainingTask( diff --git a/plugins/flytekit-kf-mpi/tests/test_mpi_task.py b/plugins/flytekit-kf-mpi/tests/test_mpi_task.py index 0b8b8353c1..fbf2b4d33d 100644 --- a/plugins/flytekit-kf-mpi/tests/test_mpi_task.py +++ b/plugins/flytekit-kf-mpi/tests/test_mpi_task.py @@ -39,12 +39,16 @@ def my_mpi_task(x: int, y: str) -> int: "replicas": 10, "resources": {}, }, + "replicas": 10, + "resources": {}, }, "workerReplicas": { "common": { "replicas": 10, "resources": {}, }, + "replicas": 10, + "resources": {}, }, "slots": 1, } @@ -83,12 +87,16 @@ def my_mpi_task(x: int, y: str) -> int: "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, }, "workerReplicas": { "common": { "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, }, "slots": 1, } @@ -146,6 +154,12 @@ def my_mpi_task(x: int, y: str) -> int: "limits": [{"name": "CPU", "value": "2"}], }, }, + "replicas": 1, + "image": "launcher:latest", + "resources": { + "requests": [{"name": "CPU", "value": "1"}], + "limits": [{"name": "CPU", "value": "2"}], + }, }, "workerReplicas": { "common": { @@ -162,6 +176,18 @@ def my_mpi_task(x: int, y: str) -> int: ], }, }, + "replicas": 5, + "image": "worker:latest", + "resources": { + "requests": [ + {"name": "CPU", "value": "2"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + "limits": [ + {"name": "CPU", "value": "4"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + }, }, "slots": 2, "runPolicy": {"cleanPodPolicy": "CLEANPOD_POLICY_ALL"}, @@ -214,12 +240,23 @@ def my_horovod_task(): ... 
], }, }, + "replicas": 1, + "resources": { + "requests": [ + {"name": "CPU", "value": "1"}, + ], + "limits": [ + {"name": "CPU", "value": "2"}, + ], + }, }, "workerReplicas": { "common": { "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, "command": ["/usr/sbin/sshd", "-De", "-f", "/home/jobuser/.sshd_config"], }, "slots": 2, diff --git a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py index 57fa6d8f3a..68107a8f9a 100644 --- a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py +++ b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py @@ -183,7 +183,12 @@ def _convert_replica_spec( image=replica_config.image, resources=resources.to_flyte_idl() if resources else None, restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, - ) + ), + # The following fields are deprecated. They are kept for backwards compatibility. + replicas=replicas, + image=replica_config.image, + resources=resources.to_flyte_idl() if resources else None, + restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, ) def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: @@ -191,6 +196,8 @@ def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: # support v0 config for backwards compatibility if self.task_config.num_workers: worker.common.replicas = self.task_config.num_workers + # Deprecated. Only kept for backwards compatibility. 
+ worker.replicas = self.task_config.num_workers run_policy = ( _convert_run_policy_to_flyte_idl(self.task_config.run_policy) if self.task_config.run_policy else None @@ -459,6 +466,8 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] job = pytorch_task.DistributedPyTorchTrainingTask( worker_replicas=pytorch_task.DistributedPyTorchTrainingReplicaSpec( common=plugins_common.CommonReplicaSpec(replicas=self.max_nodes), + # The following fields are deprecated. They are kept for backwards compatibility. + replicas=self.max_nodes, ), elastic_config=elastic_config, run_policy=run_policy, diff --git a/plugins/flytekit-kf-pytorch/tests/test_pytorch_task.py b/plugins/flytekit-kf-pytorch/tests/test_pytorch_task.py index 1bf9408e57..7714cc583f 100644 --- a/plugins/flytekit-kf-pytorch/tests/test_pytorch_task.py +++ b/plugins/flytekit-kf-pytorch/tests/test_pytorch_task.py @@ -38,12 +38,16 @@ def my_pytorch_task(x: int, y: str) -> int: "replicas": 10, "resources": {}, }, + "replicas": 10, + "resources": {}, }, "masterReplicas": { "common": { "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, }, } assert my_pytorch_task.resources.limits == Resources() @@ -78,12 +82,16 @@ def my_pytorch_task(x: int, y: str) -> int: "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, }, "workerReplicas": { "common": { "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, }, } assert my_pytorch_task.get_custom(serialization_settings) == expected_dict @@ -143,6 +151,19 @@ def my_pytorch_task(x: int, y: str) -> int: }, "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, + "replicas": 5, + "image": "worker:latest", + "resources": { + "requests": [ + {"name": "CPU", "value": "2"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + "limits": [ + {"name": "CPU", "value": "4"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + }, + "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, "masterReplicas": { "common": { @@ -150,6 +171,9 @@ 
def my_pytorch_task(x: int, y: str) -> int: "replicas": 1, "restartPolicy": "RESTART_POLICY_ALWAYS", }, + "resources": {}, + "replicas": 1, + "restartPolicy": "RESTART_POLICY_ALWAYS", }, "runPolicy": { "cleanPodPolicy": "CLEANPOD_POLICY_ALL", diff --git a/plugins/flytekit-kf-tensorflow/flytekitplugins/kftensorflow/task.py b/plugins/flytekit-kf-tensorflow/flytekitplugins/kftensorflow/task.py index 8bab5bb5ff..bf07f2bab0 100644 --- a/plugins/flytekit-kf-tensorflow/flytekitplugins/kftensorflow/task.py +++ b/plugins/flytekit-kf-tensorflow/flytekitplugins/kftensorflow/task.py @@ -180,7 +180,12 @@ def _convert_replica_spec( image=replica_config.image, resources=resources.to_flyte_idl() if resources else None, restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, - ) + ), + # The following fields are deprecated. They are kept for backwards compatibility. + replicas=replica_config.replicas, + image=replica_config.image, + resources=resources.to_flyte_idl() if resources else None, + restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None, ) def _convert_run_policy(self, run_policy: RunPolicy) -> kubeflow_common.RunPolicy: @@ -195,18 +200,26 @@ def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: chief = self._convert_replica_spec(self.task_config.chief) if self.task_config.num_chief_replicas: chief.common.replicas = self.task_config.num_chief_replicas + # Deprecated. Only kept for backwards compatibility. + chief.replicas = self.task_config.num_chief_replicas worker = self._convert_replica_spec(self.task_config.worker) if self.task_config.num_workers: worker.common.replicas = self.task_config.num_workers + # Deprecated. Only kept for backwards compatibility. + worker.replicas = self.task_config.num_workers ps = self._convert_replica_spec(self.task_config.ps) if self.task_config.num_ps_replicas: ps.common.replicas = self.task_config.num_ps_replicas + # Deprecated. 
Only kept for backwards compatibility. + ps.replicas = self.task_config.num_ps_replicas evaluator = self._convert_replica_spec(self.task_config.evaluator) if self.task_config.num_evaluator_replicas: evaluator.common.replicas = self.task_config.num_evaluator_replicas + # Deprecated. Only kept for backwards compatibility. + evaluator.replicas = self.task_config.num_evaluator_replicas run_policy = self._convert_run_policy(self.task_config.run_policy) if self.task_config.run_policy else None training_task = tensorflow_task.DistributedTensorflowTrainingTask( diff --git a/plugins/flytekit-kf-tensorflow/tests/test_tensorflow_task.py b/plugins/flytekit-kf-tensorflow/tests/test_tensorflow_task.py index fe7fa22466..f6fe8c4d6b 100644 --- a/plugins/flytekit-kf-tensorflow/tests/test_tensorflow_task.py +++ b/plugins/flytekit-kf-tensorflow/tests/test_tensorflow_task.py @@ -47,22 +47,27 @@ def my_tensorflow_task(x: int, y: str) -> int: "common": { "resources": {}, }, + "resources": {}, }, "workerReplicas": { "common": { "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, }, "psReplicas": { "common": { "resources": {}, }, + "resources": {}, }, "evaluatorReplicas": { "common": { "resources": {}, }, + "resources": {}, }, } assert my_tensorflow_task.get_custom(serialization_settings) == expected_dict @@ -122,6 +127,12 @@ def my_tensorflow_task(x: int, y: str) -> int: "limits": [{"name": "CPU", "value": "2"}], }, }, + "replicas": 1, + "image": "chief:latest", + "resources": { + "requests": [{"name": "CPU", "value": "1"}], + "limits": [{"name": "CPU", "value": "2"}], + }, }, "workerReplicas": { "common": { @@ -139,6 +150,19 @@ def my_tensorflow_task(x: int, y: str) -> int: }, "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, + "replicas": 5, + "image": "worker:latest", + "resources": { + "requests": [ + {"name": "CPU", "value": "2"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + "limits": [ + {"name": "CPU", "value": "4"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + }, 
+ "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, "psReplicas": { "common": { @@ -146,6 +170,9 @@ def my_tensorflow_task(x: int, y: str) -> int: "replicas": 2, "restartPolicy": "RESTART_POLICY_ALWAYS", }, + "resources": {}, + "replicas": 2, + "restartPolicy": "RESTART_POLICY_ALWAYS", }, "evaluatorReplicas": { "common": { @@ -163,6 +190,19 @@ def my_tensorflow_task(x: int, y: str) -> int: }, "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, + "replicas": 5, + "image": "evaluator:latest", + "resources": { + "requests": [ + {"name": "CPU", "value": "2"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + "limits": [ + {"name": "CPU", "value": "4"}, + {"name": "MEMORY", "value": "2Gi"}, + ], + }, + "restartPolicy": "RESTART_POLICY_ON_FAILURE", }, } @@ -204,22 +244,27 @@ def my_tensorflow_task(x: int, y: str) -> int: "common": { "resources": {}, }, + "resources": {}, }, "workerReplicas": { "common": { "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, }, "psReplicas": { "common": { "resources": {}, }, + "resources": {}, }, "evaluatorReplicas": { "common": { "resources": {}, }, + "resources": {}, }, "runPolicy": { "cleanPodPolicy": "CLEANPOD_POLICY_RUNNING", @@ -261,24 +306,32 @@ def my_tensorflow_task(x: int, y: str) -> int: "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, }, "workerReplicas": { "common": { "replicas": 10, "resources": {}, }, + "replicas": 10, + "resources": {}, }, "psReplicas": { "common": { "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, }, "evaluatorReplicas": { "common": { "replicas": 1, "resources": {}, }, + "replicas": 1, + "resources": {}, }, }