From b2225d283ed12e16cb056ed082eb307dbd9a431e Mon Sep 17 00:00:00 2001 From: Chi-Sheng Liu Date: Sun, 12 May 2024 03:20:26 +0800 Subject: [PATCH] feat(replica-spec): Update corresponding golang files based on protobuf changes Resolves: flyteorg/flyte#4408 Signed-off-by: Chi-Sheng Liu --- .../k8s/kfoperators/common/common_operator.go | 26 ++- .../plugins/k8s/kfoperators/mpi/mpi_test.go | 102 +++++----- .../k8s/kfoperators/pytorch/pytorch_test.go | 116 +++++++----- .../k8s/kfoperators/tensorflow/tensorflow.go | 2 +- .../kfoperators/tensorflow/tensorflow_test.go | 176 ++++++++++-------- 5 files changed, 234 insertions(+), 188 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go index 7e4b32e25e1..608683ebd45 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go @@ -11,6 +11,7 @@ import ( meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/plugins" kfplugins "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/plugins/kubeflow" flyteerr "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyte/flyteplugins/go/tasks/logs" @@ -241,11 +242,11 @@ func ParseCleanPodPolicy(flyteCleanPodPolicy kfplugins.CleanPodPolicy) commonOp. } // Get k8s restart policy from flyte kubeflow plugins restart policy. -func ParseRestartPolicy(flyteRestartPolicy kfplugins.RestartPolicy) commonOp.RestartPolicy { - restartPolicyMap := map[kfplugins.RestartPolicy]commonOp.RestartPolicy{ - kfplugins.RestartPolicy_RESTART_POLICY_NEVER: commonOp.RestartPolicyNever, - kfplugins.RestartPolicy_RESTART_POLICY_ON_FAILURE: commonOp.RestartPolicyOnFailure, - kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS: commonOp.RestartPolicyAlways, +func ParseRestartPolicy(flyteRestartPolicy plugins.RestartPolicy) commonOp.RestartPolicy { + restartPolicyMap := map[plugins.RestartPolicy]commonOp.RestartPolicy{ + plugins.RestartPolicy_RESTART_POLICY_NEVER: commonOp.RestartPolicyNever, + plugins.RestartPolicy_RESTART_POLICY_ON_FAILURE: commonOp.RestartPolicyOnFailure, + plugins.RestartPolicy_RESTART_POLICY_ALWAYS: commonOp.RestartPolicyAlways, } return restartPolicyMap[flyteRestartPolicy] } @@ -290,10 +291,7 @@ func ToReplicaSpec(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext } type kfDistributedReplicaSpec interface { - GetReplicas() int32 - GetImage() string - GetResources() *core.Resources - GetRestartPolicy() kfplugins.RestartPolicy + GetCommon() *plugins.CommonReplicaSpec } type allowsCommandOverride interface { @@ -302,8 +300,8 @@ type allowsCommandOverride interface { func ToReplicaSpecWithOverrides(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, rs kfDistributedReplicaSpec, primaryContainerName string, isMaster bool) (*commonOp.ReplicaSpec, error) { taskCtxOptions := []flytek8s.PluginTaskExecutionContextOption{} - if rs != nil && rs.GetResources() != nil { - resources, err := flytek8s.ToK8sResourceRequirements(rs.GetResources()) + if rs != nil && rs.GetCommon().GetResources() != nil { + resources, err := flytek8s.ToK8sResourceRequirements(rs.GetCommon().GetResources()) if err != nil { return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "invalid TaskSpecification on Resources [%v], Err: [%v]", resources, err.Error()) } @@ -329,16 +327,16 @@ func ToReplicaSpecWithOverrides(ctx context.Context, taskCtx pluginsCore.TaskExe if err := OverrideContainerSpec( &replicaSpec.Template.Spec, primaryContainerName, - rs.GetImage(), + rs.GetCommon().GetImage(), command, ); err != nil { return nil, err } - replicaSpec.RestartPolicy = ParseRestartPolicy(rs.GetRestartPolicy()) + replicaSpec.RestartPolicy = ParseRestartPolicy(rs.GetCommon().GetRestartPolicy()) if !isMaster { - replicas := rs.GetReplicas() + replicas := rs.GetCommon().GetReplicas() replicaSpec.Replicas = &replicas } } diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go index 3c823510aca..28b6aa2720e 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go @@ -596,29 +596,33 @@ func TestBuildResourceMPIV1(t *testing.T) { workerCommand := []string{"/usr/sbin/sshd", "/.sshd_config"} taskConfig := &kfplugins.DistributedMPITrainingTask{ LauncherReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ - Image: testImage, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "250Mi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "500Mi"}, + Common: &kfplugins.CommonReplicaSpec{ + Image: testImage, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, }, }, Command: launcherCommand, }, WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, }, Command: workerCommand, @@ -673,15 +677,17 @@ func TestBuildResourceMPIV1WithOnlyWorkerReplica(t *testing.T) { taskConfig := &kfplugins.DistributedMPITrainingTask{ WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, }, Command: []string{"/usr/sbin/sshd", "/.sshd_config"}, @@ -735,29 +741,33 @@ func TestBuildResourceMPIV1ResourceTolerations(t *testing.T) { taskConfig := &kfplugins.DistributedMPITrainingTask{ LauncherReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "250Mi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "500Mi"}, + Common: &kfplugins.CommonReplicaSpec{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, }, }, }, WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - {Name: core.Resources_GPU, Value: "1"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, - {Name: core.Resources_GPU, Value: "1"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, }, }, }, diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go index aea096e4ba8..6442dd44642 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go @@ -729,29 +729,33 @@ func TestReplicaCounts(t *testing.T) { func TestBuildResourcePytorchV1(t *testing.T) { taskConfig := &kfplugins.DistributedPyTorchTrainingTask{ MasterReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Image: testImageMaster, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "250Mi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "500Mi"}, + Common: &kfplugins.CommonReplicaSpec{ + Image: testImageMaster, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, }, + RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS, }, - RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS, }, WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, }, }, @@ -814,7 +818,9 @@ func TestBuildResourcePytorchV1(t *testing.T) { func TestBuildResourcePytorchV1WithRunPolicy(t *testing.T) { taskConfig := &kfplugins.DistributedPyTorchTrainingTask{ WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 100, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + }, }, RunPolicy: &kfplugins.RunPolicy{ CleanPodPolicy: kfplugins.CleanPodPolicy_CLEANPOD_POLICY_ALL, @@ -845,15 +851,17 @@ func TestBuildResourcePytorchV1WithRunPolicy(t *testing.T) { func TestBuildResourcePytorchV1WithOnlyWorkerSpec(t *testing.T) { taskConfig := &kfplugins.DistributedPyTorchTrainingTask{ WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, }, }, @@ -926,29 +934,33 @@ func TestBuildResourcePytorchV1ResourceTolerations(t *testing.T) { taskConfig := &kfplugins.DistributedPyTorchTrainingTask{ MasterReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "250Mi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "500Mi"}, + Common: &kfplugins.CommonReplicaSpec{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, }, }, }, WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - {Name: core.Resources_GPU, Value: "1"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, - {Name: core.Resources_GPU, Value: "1"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, }, }, }, @@ -973,7 +985,9 @@ func TestBuildResourcePytorchV1ResourceTolerations(t *testing.T) { func TestBuildResourcePytorchV1WithElastic(t *testing.T) { taskConfig := &kfplugins.DistributedPyTorchTrainingTask{ WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 2, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 2, + }, }, ElasticConfig: &kfplugins.ElasticConfig{MinReplicas: 1, MaxReplicas: 2, NprocPerNode: 4, RdzvBackend: "c10d"}, } @@ -1011,7 +1025,9 @@ func TestBuildResourcePytorchV1WithElastic(t *testing.T) { func TestBuildResourcePytorchV1WithZeroWorker(t *testing.T) { taskConfig := &kfplugins.DistributedPyTorchTrainingTask{ WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 0, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 0, + }, }, } pytorchResourceHandler := pytorchOperatorResourceHandler{} diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go index db5fe6a83a8..7c15c3d3443 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go @@ -100,7 +100,7 @@ func (tensorflowOperatorResourceHandler) BuildResource(ctx context.Context, task for t, cfg := range replicaSpecCfgMap { // Short circuit if replica set has no replicas to avoid unnecessarily // generating pod specs - if cfg.GetReplicas() <= 0 { + if cfg.GetCommon().GetReplicas() <= 0 { continue } rs, err := common.ToReplicaSpecWithOverrides(ctx, taskCtx, cfg, kubeflowv1.TFJobDefaultContainerName, false) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go index 80e95871d15..830d8744eb7 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go @@ -462,16 +462,24 @@ func TestBuildResourceTensorFlowExtendedResources(t *testing.T) { v0TaskTemplate := dummyTensorFlowTaskTemplate("v0", dummyTensorFlowCustomObj(100, 50, 1, 1)) v1TaskTemplate := dummyTensorFlowTaskTemplate("v1", &kfplugins.DistributedTensorflowTrainingTask{ ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 1, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 1, + }, }, WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 100, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + }, }, PsReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 50, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 50, + }, }, EvaluatorReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 1, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 1, + }, }, }) v1TaskTemplate.TaskTypeVersion = 1 @@ -640,62 +648,70 @@ func TestReplicaCounts(t *testing.T) { func TestBuildResourceTensorFlowV1(t *testing.T) { taskConfig := &kfplugins.DistributedTensorflowTrainingTask{ ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 1, - Image: testImage, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 1, + Image: testImage, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, + RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS, }, - RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS, }, WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - {Name: core.Resources_GPU, Value: "1"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, - {Name: core.Resources_GPU, Value: "1"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, }, }, }, PsReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 50, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 50, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, }, }, EvaluatorReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 1, - Image: testImage, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 1, + Image: testImage, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, + RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS, }, - RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS, }, RunPolicy: &kfplugins.RunPolicy{ CleanPodPolicy: kfplugins.CleanPodPolicy_CLEANPOD_POLICY_ALL, @@ -783,17 +799,19 @@ func TestBuildResourceTensorFlowV1(t *testing.T) { func TestBuildResourceTensorFlowV1WithOnlyWorker(t *testing.T) { taskConfig := &kfplugins.DistributedTensorflowTrainingTask{ WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - {Name: core.Resources_GPU, Value: "1"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, - {Name: core.Resources_GPU, Value: "1"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, }, }, }, @@ -859,30 +877,34 @@ func TestBuildResourceTensorFlowV1ResourceTolerations(t *testing.T) { taskConfig := &kfplugins.DistributedTensorflowTrainingTask{ ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 1, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 1, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, }, }, WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - {Name: core.Resources_GPU, Value: "1"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, - {Name: core.Resources_GPU, Value: "1"}, + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, }, }, },