From 946ca709e846675c079d861e36b9244176dfd846 Mon Sep 17 00:00:00 2001
From: Chi-Sheng Liu
Date: Sat, 18 May 2024 19:07:15 +0800
Subject: [PATCH] feat(replica-spec): Update corresponding golang files based on protobuf changes

Resolves: flyteorg/flyte#4408
Signed-off-by: Chi-Sheng Liu
---
 .../k8s/kfoperators/common/common_operator.go |  58 +-
 .../plugins/k8s/kfoperators/mpi/mpi_test.go   | 345 ++++++----
 .../k8s/kfoperators/pytorch/pytorch_test.go   | 534 +++++++++------
 .../k8s/kfoperators/tensorflow/tensorflow.go  |  11 +-
 .../kfoperators/tensorflow/tensorflow_test.go | 634 +++++++++++-------
 5 files changed, 1006 insertions(+), 576 deletions(-)

diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go
index 7e4b32e25e1..9d2e4a5aec7 100644
--- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go
+++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go
@@ -294,6 +294,7 @@ type kfDistributedReplicaSpec interface {
 	GetImage() string
 	GetResources() *core.Resources
 	GetRestartPolicy() kfplugins.RestartPolicy
+	GetCommon() *kfplugins.CommonReplicaSpec
 }
 
 type allowsCommandOverride interface {
@@ -301,9 +302,29 @@ type allowsCommandOverride interface {
 }
 
 func ToReplicaSpecWithOverrides(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, rs kfDistributedReplicaSpec, primaryContainerName string, isMaster bool) (*commonOp.ReplicaSpec, error) {
+	var replicas int32
+	var image string
+	var resources *core.Resources
+	var restartPolicy kfplugins.RestartPolicy
+
+	// replicas, image, resources, and restartPolicy are deprecated now that the common replica spec has been introduced.
+	// Therefore, if the common replica spec is set, use it to populate these fields.
+	common := rs.GetCommon()
+	if common != nil {
+		replicas = common.GetReplicas()
+		image = common.GetImage()
+		resources = common.GetResources()
+		restartPolicy = common.GetRestartPolicy()
+	} else {
+		replicas = rs.GetReplicas()
+		image = rs.GetImage()
+		resources = rs.GetResources()
+		restartPolicy = rs.GetRestartPolicy()
+	}
+
 	taskCtxOptions := []flytek8s.PluginTaskExecutionContextOption{}
-	if rs != nil && rs.GetResources() != nil {
-		resources, err := flytek8s.ToK8sResourceRequirements(rs.GetResources())
+	if resources != nil {
+		resources, err := flytek8s.ToK8sResourceRequirements(resources)
 		if err != nil {
 			return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "invalid TaskSpecification on Resources [%v], Err: [%v]", resources, err.Error())
 		}
@@ -321,26 +342,23 @@ func ToReplicaSpecWithOverrides(ctx context.Context, taskCtx pluginsCore.TaskExe
 		replicaSpec.Replicas = &replicas
 	}
 
-	if rs != nil {
-		var command []string
-		if v, ok := rs.(allowsCommandOverride); ok {
-			command = v.GetCommand()
-		}
-		if err := OverrideContainerSpec(
-			&replicaSpec.Template.Spec,
-			primaryContainerName,
-			rs.GetImage(),
-			command,
-		); err != nil {
-			return nil, err
-		}
+	var command []string
+	if v, ok := rs.(allowsCommandOverride); ok {
+		command = v.GetCommand()
+	}
+	if err := OverrideContainerSpec(
+		&replicaSpec.Template.Spec,
+		primaryContainerName,
+		image,
+		command,
+	); err != nil {
+		return nil, err
+	}
 
-		replicaSpec.RestartPolicy = ParseRestartPolicy(rs.GetRestartPolicy())
+	replicaSpec.RestartPolicy = ParseRestartPolicy(restartPolicy)
 
-		if !isMaster {
-			replicas := rs.GetReplicas()
-			replicaSpec.Replicas = &replicas
-		}
+	if !isMaster {
+		replicaSpec.Replicas = &replicas
 	}
 
 	return replicaSpec, nil
diff --git 
a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go index 3c823510aca..900091f78a3 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go @@ -594,129 +594,191 @@ func TestReplicaCounts(t *testing.T) { func TestBuildResourceMPIV1(t *testing.T) { launcherCommand := []string{"python", "launcher.py"} workerCommand := []string{"/usr/sbin/sshd", "/.sshd_config"} - taskConfig := &kfplugins.DistributedMPITrainingTask{ - LauncherReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ - Image: testImage, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "250Mi"}, + taskConfigs := []*kfplugins.DistributedMPITrainingTask{ + { + LauncherReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ + Image: testImage, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "500Mi"}, + Command: launcherCommand, + }, + WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, + Command: workerCommand, }, - Command: launcherCommand, + Slots: int32(1), }, - WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, + { + LauncherReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Image: testImage, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, + }, }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + Command: launcherCommand, + }, + WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, + }, }, + Command: workerCommand, }, - Command: workerCommand, + Slots: int32(1), }, - Slots: int32(1), } - launcherResourceRequirements := &corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("250m"), - corev1.ResourceMemory: 
resource.MustParse("250Mi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("500m"), - corev1.ResourceMemory: resource.MustParse("500Mi"), - }, - } + for _, taskConfig := range taskConfigs { + launcherResourceRequirements := &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("250m"), + corev1.ResourceMemory: resource.MustParse("250Mi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("500Mi"), + }, + } - workerResourceRequirements := &corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1024m"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2048m"), - corev1.ResourceMemory: resource.MustParse("2Gi"), - }, - } + workerResourceRequirements := &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1024m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2048m"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + } - mpiResourceHandler := mpiOperatorResourceHandler{} + mpiResourceHandler := mpiOperatorResourceHandler{} - taskTemplate := dummyMPITaskTemplate(mpiID2, taskConfig) - taskTemplate.TaskTypeVersion = 1 + taskTemplate := dummyMPITaskTemplate(mpiID2, taskConfig) + taskTemplate.TaskTypeVersion = 1 - resource, err := mpiResourceHandler.BuildResource(context.TODO(), dummyMPITaskContext(taskTemplate, resourceRequirements, nil)) - assert.NoError(t, err) - assert.NotNil(t, resource) + resource, err := mpiResourceHandler.BuildResource(context.TODO(), dummyMPITaskContext(taskTemplate, resourceRequirements, nil)) + assert.NoError(t, err) + assert.NotNil(t, resource) - mpiJob, ok := resource.(*kubeflowv1.MPIJob) - assert.True(t, ok) - assert.Equal(t, int32(1), *mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Replicas) - assert.Equal(t, int32(100), *mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Replicas) - assert.Equal(t, int32(1), *mpiJob.Spec.SlotsPerWorker) - assert.Equal(t, *launcherResourceRequirements, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Template.Spec.Containers[0].Resources) - assert.Equal(t, *workerResourceRequirements, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Resources) - assert.Equal(t, launcherCommand, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Template.Spec.Containers[0].Args) - assert.Equal(t, workerCommand, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Args) + mpiJob, ok := resource.(*kubeflowv1.MPIJob) + assert.True(t, ok) + assert.Equal(t, int32(1), *mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Replicas) + assert.Equal(t, int32(100), *mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Replicas) + assert.Equal(t, int32(1), *mpiJob.Spec.SlotsPerWorker) + assert.Equal(t, *launcherResourceRequirements, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Template.Spec.Containers[0].Resources) + assert.Equal(t, *workerResourceRequirements, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Resources) + assert.Equal(t, launcherCommand, 
mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Template.Spec.Containers[0].Args) + assert.Equal(t, workerCommand, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Args) + } } func TestBuildResourceMPIV1WithOnlyWorkerReplica(t *testing.T) { workerCommand := []string{"/usr/sbin/sshd", "/.sshd_config"} - taskConfig := &kfplugins.DistributedMPITrainingTask{ - WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, + taskConfigs := []*kfplugins.DistributedMPITrainingTask{ + { + WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + Command: []string{"/usr/sbin/sshd", "/.sshd_config"}, + }, + Slots: int32(1), + }, + { + WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, + }, }, + Command: []string{"/usr/sbin/sshd", "/.sshd_config"}, }, - Command: []string{"/usr/sbin/sshd", "/.sshd_config"}, + Slots: int32(1), }, - Slots: int32(1), } - workerResourceRequirements := &corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1024m"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2048m"), - corev1.ResourceMemory: resource.MustParse("2Gi"), - }, - } + for _, taskConfig := range taskConfigs { + workerResourceRequirements := &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1024m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2048m"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + } - mpiResourceHandler := mpiOperatorResourceHandler{} + mpiResourceHandler := mpiOperatorResourceHandler{} - taskTemplate := dummyMPITaskTemplate(mpiID2, taskConfig) - taskTemplate.TaskTypeVersion = 1 + taskTemplate := dummyMPITaskTemplate(mpiID2, taskConfig) + taskTemplate.TaskTypeVersion = 1 - resource, err := mpiResourceHandler.BuildResource(context.TODO(), dummyMPITaskContext(taskTemplate, resourceRequirements, nil)) - assert.NoError(t, err) - assert.NotNil(t, resource) + resource, err := mpiResourceHandler.BuildResource(context.TODO(), dummyMPITaskContext(taskTemplate, resourceRequirements, nil)) + assert.NoError(t, err) + assert.NotNil(t, resource) - mpiJob, ok := resource.(*kubeflowv1.MPIJob) - assert.True(t, ok) - assert.Equal(t, int32(1), *mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Replicas) - assert.Equal(t, int32(100), 
*mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Replicas) - assert.Equal(t, int32(1), *mpiJob.Spec.SlotsPerWorker) - assert.Equal(t, *workerResourceRequirements, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Resources) - assert.Equal(t, testArgs, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Template.Spec.Containers[0].Args) - assert.Equal(t, workerCommand, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Args) + mpiJob, ok := resource.(*kubeflowv1.MPIJob) + assert.True(t, ok) + assert.Equal(t, int32(1), *mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Replicas) + assert.Equal(t, int32(100), *mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Replicas) + assert.Equal(t, int32(1), *mpiJob.Spec.SlotsPerWorker) + assert.Equal(t, *workerResourceRequirements, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Resources) + assert.Equal(t, testArgs, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Template.Spec.Containers[0].Args) + assert.Equal(t, workerCommand, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Args) + } } func TestBuildResourceMPIV1ResourceTolerations(t *testing.T) { @@ -733,50 +795,87 @@ func TestBuildResourceMPIV1ResourceTolerations(t *testing.T) { }, })) - taskConfig := &kfplugins.DistributedMPITrainingTask{ - LauncherReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "250Mi"}, + taskConfigs := []*kfplugins.DistributedMPITrainingTask{ + { + LauncherReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, + WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, }, }, }, - WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - {Name: core.Resources_GPU, Value: "1"}, + { + LauncherReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, + }, }, - Limits: 
[]*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, - {Name: core.Resources_GPU, Value: "1"}, + }, + WorkerReplicas: &kfplugins.DistributedMPITrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + }, }, }, }, } - mpiResourceHandler := mpiOperatorResourceHandler{} + for _, taskConfig := range taskConfigs { + mpiResourceHandler := mpiOperatorResourceHandler{} - taskTemplate := dummyMPITaskTemplate(mpiID2, taskConfig) - taskTemplate.TaskTypeVersion = 1 + taskTemplate := dummyMPITaskTemplate(mpiID2, taskConfig) + taskTemplate.TaskTypeVersion = 1 - resource, err := mpiResourceHandler.BuildResource(context.TODO(), dummyMPITaskContext(taskTemplate, resourceRequirements, nil)) - assert.NoError(t, err) - assert.NotNil(t, resource) + resource, err := mpiResourceHandler.BuildResource(context.TODO(), dummyMPITaskContext(taskTemplate, resourceRequirements, nil)) + assert.NoError(t, err) + assert.NotNil(t, resource) - mpiJob, ok := resource.(*kubeflowv1.MPIJob) - assert.True(t, ok) + mpiJob, ok := resource.(*kubeflowv1.MPIJob) + assert.True(t, ok) - assert.NotContains(t, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Template.Spec.Tolerations, gpuToleration) - assert.Contains(t, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Template.Spec.Tolerations, gpuToleration) + assert.NotContains(t, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Template.Spec.Tolerations, gpuToleration) + assert.Contains(t, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Template.Spec.Tolerations, gpuToleration) + } } func TestGetReplicaCount(t *testing.T) { diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go index aea096e4ba8..f40f80e1f74 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go @@ -727,187 +727,264 @@ func TestReplicaCounts(t *testing.T) { } func TestBuildResourcePytorchV1(t *testing.T) { - taskConfig := &kfplugins.DistributedPyTorchTrainingTask{ - MasterReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Image: testImageMaster, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "250Mi"}, + taskConfigs := []*kfplugins.DistributedPyTorchTrainingTask{ + { + MasterReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Image: testImageMaster, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "500Mi"}, + RestartPolicy: 
kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS, + }, + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, }, - RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS, }, - WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, + { + MasterReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Image: testImageMaster, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, + }, + RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS, }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, + }, }, }, }, } - masterResourceRequirements := &corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("250m"), - corev1.ResourceMemory: resource.MustParse("250Mi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("500m"), - corev1.ResourceMemory: resource.MustParse("500Mi"), - }, - } + for _, taskConfig := range taskConfigs { + masterResourceRequirements := &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("250m"), + corev1.ResourceMemory: resource.MustParse("250Mi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("500Mi"), + }, + } - workerResourceRequirements := &corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1024m"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2048m"), - corev1.ResourceMemory: resource.MustParse("2Gi"), - }, - } + workerResourceRequirements := &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1024m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2048m"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + } - pytorchResourceHandler := pytorchOperatorResourceHandler{} + pytorchResourceHandler := pytorchOperatorResourceHandler{} - taskTemplate := dummyPytorchTaskTemplate("job4", taskConfig) - taskTemplate.TaskTypeVersion = 1 + 
taskTemplate := dummyPytorchTaskTemplate("job4", taskConfig) + taskTemplate.TaskTypeVersion = 1 - res, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) - assert.NoError(t, err) - assert.NotNil(t, res) + res, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) + assert.NoError(t, err) + assert.NotNil(t, res) - pytorchJob, ok := res.(*kubeflowv1.PyTorchJob) - assert.True(t, ok) + pytorchJob, ok := res.(*kubeflowv1.PyTorchJob) + assert.True(t, ok) - assert.Equal(t, int32(100), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas) - assert.Equal(t, int32(1), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Replicas) + assert.Equal(t, int32(100), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas) + assert.Equal(t, int32(1), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Replicas) - assert.Equal(t, testImageMaster, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Image) - assert.Equal(t, testImage, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Image) + assert.Equal(t, testImageMaster, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Image) + assert.Equal(t, testImage, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Image) - assert.Equal(t, *masterResourceRequirements, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Resources) - assert.Equal(t, *workerResourceRequirements, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Resources) + assert.Equal(t, *masterResourceRequirements, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Resources) + assert.Equal(t, *workerResourceRequirements, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Resources) - assert.Equal(t, commonOp.RestartPolicyAlways, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].RestartPolicy) - assert.Equal(t, commonOp.RestartPolicyNever, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].RestartPolicy) + assert.Equal(t, commonOp.RestartPolicyAlways, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].RestartPolicy) + assert.Equal(t, commonOp.RestartPolicyNever, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].RestartPolicy) - assert.Nil(t, pytorchJob.Spec.RunPolicy.CleanPodPolicy) - assert.Nil(t, pytorchJob.Spec.RunPolicy.BackoffLimit) - assert.Nil(t, pytorchJob.Spec.RunPolicy.TTLSecondsAfterFinished) - assert.Nil(t, pytorchJob.Spec.RunPolicy.ActiveDeadlineSeconds) + assert.Nil(t, pytorchJob.Spec.RunPolicy.CleanPodPolicy) + assert.Nil(t, pytorchJob.Spec.RunPolicy.BackoffLimit) + assert.Nil(t, pytorchJob.Spec.RunPolicy.TTLSecondsAfterFinished) + assert.Nil(t, pytorchJob.Spec.RunPolicy.ActiveDeadlineSeconds) - assert.Nil(t, pytorchJob.Spec.ElasticPolicy) + assert.Nil(t, pytorchJob.Spec.ElasticPolicy) + } } func TestBuildResourcePytorchV1WithRunPolicy(t *testing.T) { - taskConfig := 
&kfplugins.DistributedPyTorchTrainingTask{ - WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 100, + taskConfigs := []*kfplugins.DistributedPyTorchTrainingTask{ + { + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Replicas: 100, + }, + RunPolicy: &kfplugins.RunPolicy{ + CleanPodPolicy: kfplugins.CleanPodPolicy_CLEANPOD_POLICY_ALL, + BackoffLimit: 100, + ActiveDeadlineSeconds: 1000, + TtlSecondsAfterFinished: 10000, + }, }, - RunPolicy: &kfplugins.RunPolicy{ - CleanPodPolicy: kfplugins.CleanPodPolicy_CLEANPOD_POLICY_ALL, - BackoffLimit: 100, - ActiveDeadlineSeconds: 1000, - TtlSecondsAfterFinished: 10000, + { + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + }, + }, + RunPolicy: &kfplugins.RunPolicy{ + CleanPodPolicy: kfplugins.CleanPodPolicy_CLEANPOD_POLICY_ALL, + BackoffLimit: 100, + ActiveDeadlineSeconds: 1000, + TtlSecondsAfterFinished: 10000, + }, }, } - pytorchResourceHandler := pytorchOperatorResourceHandler{} - taskTemplate := dummyPytorchTaskTemplate("job5", taskConfig) - taskTemplate.TaskTypeVersion = 1 + for _, taskConfig := range taskConfigs { + pytorchResourceHandler := pytorchOperatorResourceHandler{} - res, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) - assert.NoError(t, err) - assert.NotNil(t, res) + taskTemplate := dummyPytorchTaskTemplate("job5", taskConfig) + taskTemplate.TaskTypeVersion = 1 - pytorchJob, ok := res.(*kubeflowv1.PyTorchJob) - assert.True(t, ok) - assert.Equal(t, int32(100), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas) - assert.Equal(t, int32(1), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Replicas) - assert.Equal(t, commonOp.CleanPodPolicyAll, *pytorchJob.Spec.RunPolicy.CleanPodPolicy) - assert.Equal(t, int32(100), *pytorchJob.Spec.RunPolicy.BackoffLimit) - assert.Equal(t, int64(1000), *pytorchJob.Spec.RunPolicy.ActiveDeadlineSeconds) - assert.Equal(t, int32(10000), *pytorchJob.Spec.RunPolicy.TTLSecondsAfterFinished) + res, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) + assert.NoError(t, err) + assert.NotNil(t, res) + + pytorchJob, ok := res.(*kubeflowv1.PyTorchJob) + assert.True(t, ok) + assert.Equal(t, int32(100), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas) + assert.Equal(t, int32(1), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Replicas) + assert.Equal(t, commonOp.CleanPodPolicyAll, *pytorchJob.Spec.RunPolicy.CleanPodPolicy) + assert.Equal(t, int32(100), *pytorchJob.Spec.RunPolicy.BackoffLimit) + assert.Equal(t, int64(1000), *pytorchJob.Spec.RunPolicy.ActiveDeadlineSeconds) + assert.Equal(t, int32(10000), *pytorchJob.Spec.RunPolicy.TTLSecondsAfterFinished) + } } func TestBuildResourcePytorchV1WithOnlyWorkerSpec(t *testing.T) { - taskConfig := &kfplugins.DistributedPyTorchTrainingTask{ - WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, + taskConfigs := 
[]*kfplugins.DistributedPyTorchTrainingTask{ + { + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, }, }, }, - } - // Master Replica should use resource from task override if not set - taskOverrideResourceRequirements := &corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1000m"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - flytek8s.ResourceNvidiaGPU: resource.MustParse("1"), - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("100m"), - corev1.ResourceMemory: resource.MustParse("512Mi"), - flytek8s.ResourceNvidiaGPU: resource.MustParse("1"), + { + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + }, + }, + }, + }, }, } - workerResourceRequirements := &corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1024m"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2048m"), - corev1.ResourceMemory: resource.MustParse("2Gi"), - }, - } + for _, taskConfig := range taskConfigs { + // Master Replica should use resource from task override if not set + taskOverrideResourceRequirements := &corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1000m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + flytek8s.ResourceNvidiaGPU: resource.MustParse("1"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("512Mi"), + flytek8s.ResourceNvidiaGPU: resource.MustParse("1"), + }, + } - pytorchResourceHandler := pytorchOperatorResourceHandler{} + workerResourceRequirements := &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1024m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2048m"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + } - taskTemplate := dummyPytorchTaskTemplate("job5", taskConfig) - taskTemplate.TaskTypeVersion = 1 + pytorchResourceHandler := pytorchOperatorResourceHandler{} - res, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) - assert.NoError(t, err) - assert.NotNil(t, res) + taskTemplate := dummyPytorchTaskTemplate("job5", taskConfig) + taskTemplate.TaskTypeVersion = 1 - pytorchJob, ok := res.(*kubeflowv1.PyTorchJob) - assert.True(t, ok) + res, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) + assert.NoError(t, err) + assert.NotNil(t, res) - assert.Equal(t, int32(100), 
*pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas) - assert.Equal(t, int32(1), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Replicas) + pytorchJob, ok := res.(*kubeflowv1.PyTorchJob) + assert.True(t, ok) - assert.Equal(t, testImage, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Image) - assert.Equal(t, testImage, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Image) + assert.Equal(t, int32(100), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas) + assert.Equal(t, int32(1), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Replicas) - assert.Equal(t, *taskOverrideResourceRequirements, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Resources) - assert.Equal(t, *workerResourceRequirements, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Resources) + assert.Equal(t, testImage, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Image) + assert.Equal(t, testImage, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Image) - assert.Equal(t, commonOp.RestartPolicyNever, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].RestartPolicy) - assert.Equal(t, commonOp.RestartPolicyNever, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].RestartPolicy) + assert.Equal(t, *taskOverrideResourceRequirements, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Resources) + assert.Equal(t, *workerResourceRequirements, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Resources) - assert.Nil(t, pytorchJob.Spec.ElasticPolicy) + assert.Equal(t, commonOp.RestartPolicyNever, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].RestartPolicy) + assert.Equal(t, commonOp.RestartPolicyNever, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].RestartPolicy) + + assert.Nil(t, pytorchJob.Spec.ElasticPolicy) + } } func TestBuildResourcePytorchV1ResourceTolerations(t *testing.T) { @@ -924,101 +1001,164 @@ func TestBuildResourcePytorchV1ResourceTolerations(t *testing.T) { }, })) - taskConfig := &kfplugins.DistributedPyTorchTrainingTask{ - MasterReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "250m"}, - {Name: core.Resources_MEMORY, Value: "250Mi"}, + taskConfigs := []*kfplugins.DistributedPyTorchTrainingTask{ + { + MasterReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "500m"}, - {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + 
Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, }, }, }, - WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 100, - Resources: &core.Resources{ - Requests: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "1024m"}, - {Name: core.Resources_MEMORY, Value: "1Gi"}, - {Name: core.Resources_GPU, Value: "1"}, + { + MasterReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "250m"}, + {Name: core.Resources_MEMORY, Value: "250Mi"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "500m"}, + {Name: core.Resources_MEMORY, Value: "500Mi"}, + }, + }, }, - Limits: []*core.Resources_ResourceEntry{ - {Name: core.Resources_CPU, Value: "2048m"}, - {Name: core.Resources_MEMORY, Value: "2Gi"}, - {Name: core.Resources_GPU, Value: "1"}, + }, + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "1024m"}, + {Name: core.Resources_MEMORY, Value: "1Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + Limits: []*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "2048m"}, + {Name: core.Resources_MEMORY, Value: "2Gi"}, + {Name: core.Resources_GPU, Value: "1"}, + }, + }, }, }, }, } - pytorchResourceHandler := pytorchOperatorResourceHandler{} + for _, taskConfig := range taskConfigs { + pytorchResourceHandler := pytorchOperatorResourceHandler{} - taskTemplate := dummyPytorchTaskTemplate("job4", taskConfig) - taskTemplate.TaskTypeVersion = 1 + taskTemplate := dummyPytorchTaskTemplate("job4", taskConfig) + taskTemplate.TaskTypeVersion = 1 - res, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) - assert.NoError(t, err) - assert.NotNil(t, res) + res, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) + assert.NoError(t, err) + assert.NotNil(t, res) - pytorchJob, ok := res.(*kubeflowv1.PyTorchJob) - assert.True(t, ok) + pytorchJob, ok := res.(*kubeflowv1.PyTorchJob) + assert.True(t, ok) - assert.NotContains(t, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Template.Spec.Tolerations, gpuToleration) - assert.Contains(t, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Tolerations, gpuToleration) + assert.NotContains(t, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster].Template.Spec.Tolerations, gpuToleration) + assert.Contains(t, pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Tolerations, gpuToleration) + } } func TestBuildResourcePytorchV1WithElastic(t *testing.T) { - taskConfig := &kfplugins.DistributedPyTorchTrainingTask{ - WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 2, + taskConfigs := []*kfplugins.DistributedPyTorchTrainingTask{ + { + WorkerReplicas: 
&kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Replicas: 2, + }, + ElasticConfig: &kfplugins.ElasticConfig{MinReplicas: 1, MaxReplicas: 2, NprocPerNode: 4, RdzvBackend: "c10d"}, + }, + { + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 2, + }, + }, + ElasticConfig: &kfplugins.ElasticConfig{MinReplicas: 1, MaxReplicas: 2, NprocPerNode: 4, RdzvBackend: "c10d"}, }, - ElasticConfig: &kfplugins.ElasticConfig{MinReplicas: 1, MaxReplicas: 2, NprocPerNode: 4, RdzvBackend: "c10d"}, } - taskTemplate := dummyPytorchTaskTemplate("job5", taskConfig) - taskTemplate.TaskTypeVersion = 1 - pytorchResourceHandler := pytorchOperatorResourceHandler{} - resource, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) - assert.NoError(t, err) - assert.NotNil(t, resource) + for _, taskConfig := range taskConfigs { + taskTemplate := dummyPytorchTaskTemplate("job5", taskConfig) + taskTemplate.TaskTypeVersion = 1 - pytorchJob, ok := resource.(*kubeflowv1.PyTorchJob) - assert.True(t, ok) - assert.Equal(t, int32(2), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas) - assert.NotNil(t, pytorchJob.Spec.ElasticPolicy) - assert.Equal(t, int32(1), *pytorchJob.Spec.ElasticPolicy.MinReplicas) - assert.Equal(t, int32(2), *pytorchJob.Spec.ElasticPolicy.MaxReplicas) - assert.Equal(t, int32(4), *pytorchJob.Spec.ElasticPolicy.NProcPerNode) - assert.Equal(t, kubeflowv1.RDZVBackend("c10d"), *pytorchJob.Spec.ElasticPolicy.RDZVBackend) + pytorchResourceHandler := pytorchOperatorResourceHandler{} + resource, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) + assert.NoError(t, err) + assert.NotNil(t, resource) - assert.Equal(t, 1, len(pytorchJob.Spec.PyTorchReplicaSpecs)) - assert.Contains(t, pytorchJob.Spec.PyTorchReplicaSpecs, kubeflowv1.PyTorchJobReplicaTypeWorker) + pytorchJob, ok := resource.(*kubeflowv1.PyTorchJob) + assert.True(t, ok) + assert.Equal(t, int32(2), *pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas) + assert.NotNil(t, pytorchJob.Spec.ElasticPolicy) + assert.Equal(t, int32(1), *pytorchJob.Spec.ElasticPolicy.MinReplicas) + assert.Equal(t, int32(2), *pytorchJob.Spec.ElasticPolicy.MaxReplicas) + assert.Equal(t, int32(4), *pytorchJob.Spec.ElasticPolicy.NProcPerNode) + assert.Equal(t, kubeflowv1.RDZVBackend("c10d"), *pytorchJob.Spec.ElasticPolicy.RDZVBackend) + + assert.Equal(t, 1, len(pytorchJob.Spec.PyTorchReplicaSpecs)) + assert.Contains(t, pytorchJob.Spec.PyTorchReplicaSpecs, kubeflowv1.PyTorchJobReplicaTypeWorker) - var hasContainerWithDefaultPytorchName = false + var hasContainerWithDefaultPytorchName = false - for _, container := range pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers { - if container.Name == kubeflowv1.PytorchJobDefaultContainerName { - hasContainerWithDefaultPytorchName = true + for _, container := range pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers { + if container.Name == kubeflowv1.PytorchJobDefaultContainerName { + hasContainerWithDefaultPytorchName = true + } } - } - assert.True(t, hasContainerWithDefaultPytorchName) + assert.True(t, hasContainerWithDefaultPytorchName) + } } func TestBuildResourcePytorchV1WithZeroWorker(t *testing.T) { - taskConfig := 
&kfplugins.DistributedPyTorchTrainingTask{ - WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ - Replicas: 0, + taskConfigs := []*kfplugins.DistributedPyTorchTrainingTask{ + { + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Replicas: 0, + }, + }, + { + WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 0, + }, + }, }, } - pytorchResourceHandler := pytorchOperatorResourceHandler{} - taskTemplate := dummyPytorchTaskTemplate("job5", taskConfig) - taskTemplate.TaskTypeVersion = 1 - _, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) - assert.Error(t, err) + + for _, taskConfig := range taskConfigs { + pytorchResourceHandler := pytorchOperatorResourceHandler{} + + taskTemplate := dummyPytorchTaskTemplate("job5", taskConfig) + taskTemplate.TaskTypeVersion = 1 + _, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "")) + assert.Error(t, err) + } } func TestParseElasticConfig(t *testing.T) { diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go index db5fe6a83a8..d69fd30b014 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go @@ -100,9 +100,18 @@ func (tensorflowOperatorResourceHandler) BuildResource(ctx context.Context, task for t, cfg := range replicaSpecCfgMap { // Short circuit if replica set has no replicas to avoid unnecessarily // generating pod specs - if cfg.GetReplicas() <= 0 { + var replicas int32 + // replicas is deprecated since the common replica spec is introduced. 
+ // Therefore, if the common replica spec is set, use that to get the common fields + if cfg.GetCommon() != nil { + replicas = cfg.GetCommon().GetReplicas() + } else { + replicas = cfg.GetReplicas() + } + if replicas <= 0 { continue } + rs, err := common.ToReplicaSpecWithOverrides(ctx, taskCtx, cfg, kubeflowv1.TFJobDefaultContainerName, false) if err != nil { return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "Unable to create replica spec: [%v]", err.Error()) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go index 80e95871d15..a85ce8f8759 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go @@ -460,55 +460,81 @@ func TestBuildResourceTensorFlowExtendedResources(t *testing.T) { } v0TaskTemplate := dummyTensorFlowTaskTemplate("v0", dummyTensorFlowCustomObj(100, 50, 1, 1)) - v1TaskTemplate := dummyTensorFlowTaskTemplate("v1", &kfplugins.DistributedTensorflowTrainingTask{ - ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 1, - }, - WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 100, - }, - PsReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 50, - }, - EvaluatorReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ - Replicas: 1, - }, - }) - v1TaskTemplate.TaskTypeVersion = 1 - testConfigs := []struct { - name string - taskTemplate *core.TaskTemplate - }{ - {"v0", v0TaskTemplate}, - {"v1", v1TaskTemplate}, - } - - for _, tCfg := range testConfigs { - for _, f := range fixtures { - t.Run(tCfg.name+" "+f.name, func(t *testing.T) { - taskTemplate := *tCfg.taskTemplate - taskTemplate.ExtendedResources = f.extendedResourcesBase - tensorflowResourceHandler := tensorflowOperatorResourceHandler{} - taskContext := dummyTensorFlowTaskContext(&taskTemplate, f.resources, f.extendedResourcesOverride) - r, err := tensorflowResourceHandler.BuildResource(context.TODO(), taskContext) - assert.NoError(t, err) - assert.NotNil(t, r) - tensorflowJob, ok := r.(*kubeflowv1.TFJob) - assert.True(t, ok) - - for _, replicaSpec := range tensorflowJob.Spec.TFReplicaSpecs { - assert.EqualValues( - t, - f.expectedNsr, - replicaSpec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, - ) - assert.EqualValues( - t, - f.expectedTol, - replicaSpec.Template.Spec.Tolerations, - ) - } - }) + v1TaskTemplates := []*core.TaskTemplate{ + dummyTensorFlowTaskTemplate("v1", &kfplugins.DistributedTensorflowTrainingTask{ + ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ + Replicas: 1, + }, + WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ + Replicas: 100, + }, + PsReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ + Replicas: 50, + }, + EvaluatorReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ + Replicas: 1, + }, + }), + dummyTensorFlowTaskTemplate("v1", &kfplugins.DistributedTensorflowTrainingTask{ + ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 1, + }, + }, + WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 100, + }, + }, + PsReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{ + Common: &kfplugins.CommonReplicaSpec{ + Replicas: 50, + 
},
+			},
+			EvaluatorReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Common: &kfplugins.CommonReplicaSpec{
+					Replicas: 1,
+				},
+			},
+		}),
+	}
+	for _, v1TaskTemplate := range v1TaskTemplates {
+		v1TaskTemplate.TaskTypeVersion = 1
+		testConfigs := []struct {
+			name         string
+			taskTemplate *core.TaskTemplate
+		}{
+			{"v0", v0TaskTemplate},
+			{"v1", v1TaskTemplate},
+		}
+
+		for _, tCfg := range testConfigs {
+			for _, f := range fixtures {
+				t.Run(tCfg.name+" "+f.name, func(t *testing.T) {
+					taskTemplate := *tCfg.taskTemplate
+					taskTemplate.ExtendedResources = f.extendedResourcesBase
+					tensorflowResourceHandler := tensorflowOperatorResourceHandler{}
+					taskContext := dummyTensorFlowTaskContext(&taskTemplate, f.resources, f.extendedResourcesOverride)
+					r, err := tensorflowResourceHandler.BuildResource(context.TODO(), taskContext)
+					assert.NoError(t, err)
+					assert.NotNil(t, r)
+					tensorflowJob, ok := r.(*kubeflowv1.TFJob)
+					assert.True(t, ok)
+
+					for _, replicaSpec := range tensorflowJob.Spec.TFReplicaSpecs {
+						assert.EqualValues(
+							t,
+							f.expectedNsr,
+							replicaSpec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms,
+						)
+						assert.EqualValues(
+							t,
+							f.expectedTol,
+							replicaSpec.Template.Spec.Tolerations,
+						)
+					}
+				})
+			}
 		}
 	}
 }
@@ -638,208 +664,307 @@ func TestReplicaCounts(t *testing.T) {
 }
 
 func TestBuildResourceTensorFlowV1(t *testing.T) {
-	taskConfig := &kfplugins.DistributedTensorflowTrainingTask{
-		ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
-			Replicas: 1,
-			Image: testImage,
-			Resources: &core.Resources{
-				Requests: []*core.Resources_ResourceEntry{
-					{Name: core.Resources_CPU, Value: "250m"},
-					{Name: core.Resources_MEMORY, Value: "1Gi"},
+	taskConfigs := []*kfplugins.DistributedTensorflowTrainingTask{
+		{
+			ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Replicas: 1,
+				Image: testImage,
+				Resources: &core.Resources{
+					Requests: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "250m"},
+						{Name: core.Resources_MEMORY, Value: "1Gi"},
+					},
+					Limits: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "500m"},
+						{Name: core.Resources_MEMORY, Value: "2Gi"},
+					},
 				},
-				Limits: []*core.Resources_ResourceEntry{
-					{Name: core.Resources_CPU, Value: "500m"},
-					{Name: core.Resources_MEMORY, Value: "2Gi"},
+				RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS,
+			},
+			WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Replicas: 100,
+				Resources: &core.Resources{
+					Requests: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "1024m"},
+						{Name: core.Resources_MEMORY, Value: "1Gi"},
+						{Name: core.Resources_GPU, Value: "1"},
+					},
+					Limits: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "2048m"},
+						{Name: core.Resources_MEMORY, Value: "2Gi"},
+						{Name: core.Resources_GPU, Value: "1"},
+					},
+				},
 			},
-			RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS,
-		},
-		WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
-			Replicas: 100,
-			Resources: &core.Resources{
-				Requests: []*core.Resources_ResourceEntry{
-					{Name: core.Resources_CPU, Value: "1024m"},
-					{Name: core.Resources_MEMORY, Value: "1Gi"},
-					{Name: core.Resources_GPU, Value: "1"},
+			PsReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Replicas: 50,
+				Resources: &core.Resources{
+					Requests: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "250m"},
+						{Name: core.Resources_MEMORY, Value: "1Gi"},
+					},
+					Limits: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "500m"},
+						{Name: core.Resources_MEMORY, Value: "2Gi"},
+					},
 				},
-			Limits: []*core.Resources_ResourceEntry{
-				{Name: core.Resources_CPU, Value: "500m"},
-				{Name: core.Resources_MEMORY, Value: "2Gi"},
+			},
+			EvaluatorReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Replicas: 1,
+				Image: testImage,
+				Resources: &core.Resources{
+					Requests: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "250m"},
+						{Name: core.Resources_MEMORY, Value: "1Gi"},
+					},
+					Limits: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "500m"},
+						{Name: core.Resources_MEMORY, Value: "2Gi"},
+					},
 				},
+				RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS,
+			},
+			RunPolicy: &kfplugins.RunPolicy{
+				CleanPodPolicy: kfplugins.CleanPodPolicy_CLEANPOD_POLICY_ALL,
+				ActiveDeadlineSeconds: int32(100),
 			},
 		},
-		PsReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
-			Replicas: 50,
-			Resources: &core.Resources{
-				Requests: []*core.Resources_ResourceEntry{
-					{Name: core.Resources_CPU, Value: "250m"},
-					{Name: core.Resources_MEMORY, Value: "1Gi"},
+		{
+			ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Common: &kfplugins.CommonReplicaSpec{
+					Replicas: 1,
+					Image: testImage,
+					Resources: &core.Resources{
+						Requests: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "250m"},
+							{Name: core.Resources_MEMORY, Value: "1Gi"},
+						},
+						Limits: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "500m"},
+							{Name: core.Resources_MEMORY, Value: "2Gi"},
+						},
+					},
+					RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS,
 				},
-			Limits: []*core.Resources_ResourceEntry{
-				{Name: core.Resources_CPU, Value: "500m"},
-				{Name: core.Resources_MEMORY, Value: "2Gi"},
+			},
+			WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Common: &kfplugins.CommonReplicaSpec{
+					Replicas: 100,
+					Resources: &core.Resources{
+						Requests: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "1024m"},
+							{Name: core.Resources_MEMORY, Value: "1Gi"},
+							{Name: core.Resources_GPU, Value: "1"},
+						},
+						Limits: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "2048m"},
+							{Name: core.Resources_MEMORY, Value: "2Gi"},
+							{Name: core.Resources_GPU, Value: "1"},
+						},
+					},
+				},
 			},
 		},
-		EvaluatorReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
-			Replicas: 1,
-			Image: testImage,
-			Resources: &core.Resources{
-				Requests: []*core.Resources_ResourceEntry{
-					{Name: core.Resources_CPU, Value: "250m"},
-					{Name: core.Resources_MEMORY, Value: "1Gi"},
+			PsReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Common: &kfplugins.CommonReplicaSpec{
+					Replicas: 50,
+					Resources: &core.Resources{
+						Requests: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "250m"},
+							{Name: core.Resources_MEMORY, Value: "1Gi"},
+						},
+						Limits: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "500m"},
+							{Name: core.Resources_MEMORY, Value: "2Gi"},
+						},
+					},
 				},
-			Limits: []*core.Resources_ResourceEntry{
-				{Name: core.Resources_CPU, Value: "500m"},
-				{Name: core.Resources_MEMORY, Value: "2Gi"},
+			},
+			EvaluatorReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Common: &kfplugins.CommonReplicaSpec{
+					Replicas: 1,
+					Image: testImage,
+					Resources: &core.Resources{
+						Requests: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "250m"},
+							{Name: core.Resources_MEMORY, Value: "1Gi"},
+						},
+						Limits: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "500m"},
+							{Name: core.Resources_MEMORY, Value: "2Gi"},
+						},
+					},
+					RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS,
 				},
 			},
-			RestartPolicy: kfplugins.RestartPolicy_RESTART_POLICY_ALWAYS,
-		},
-		RunPolicy: &kfplugins.RunPolicy{
-			CleanPodPolicy: kfplugins.CleanPodPolicy_CLEANPOD_POLICY_ALL,
-			ActiveDeadlineSeconds: int32(100),
+			RunPolicy: &kfplugins.RunPolicy{
+				CleanPodPolicy: kfplugins.CleanPodPolicy_CLEANPOD_POLICY_ALL,
+				ActiveDeadlineSeconds: int32(100),
+			},
 		},
 	}
+	for _, taskConfig := range taskConfigs {
 
-	resourceRequirementsMap := map[commonOp.ReplicaType]*corev1.ResourceRequirements{
-		kubeflowv1.TFJobReplicaTypeChief: {
-			Requests: corev1.ResourceList{
-				corev1.ResourceCPU: resource.MustParse("250m"),
-				corev1.ResourceMemory: resource.MustParse("1Gi"),
-			},
-			Limits: corev1.ResourceList{
-				corev1.ResourceCPU: resource.MustParse("500m"),
-				corev1.ResourceMemory: resource.MustParse("2Gi"),
-			},
-		},
-		kubeflowv1.TFJobReplicaTypeWorker: {
-			Requests: corev1.ResourceList{
-				corev1.ResourceCPU: resource.MustParse("1024m"),
-				corev1.ResourceMemory: resource.MustParse("1Gi"),
-				flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
-			},
-			Limits: corev1.ResourceList{
-				corev1.ResourceCPU: resource.MustParse("2048m"),
-				corev1.ResourceMemory: resource.MustParse("2Gi"),
-				flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
-			},
-		},
-		kubeflowv1.TFJobReplicaTypePS: {
-			Requests: corev1.ResourceList{
-				corev1.ResourceCPU: resource.MustParse("250m"),
-				corev1.ResourceMemory: resource.MustParse("1Gi"),
+		resourceRequirementsMap := map[commonOp.ReplicaType]*corev1.ResourceRequirements{
+			kubeflowv1.TFJobReplicaTypeChief: {
+				Requests: corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("250m"),
+					corev1.ResourceMemory: resource.MustParse("1Gi"),
+				},
+				Limits: corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("500m"),
+					corev1.ResourceMemory: resource.MustParse("2Gi"),
+				},
 			},
-			Limits: corev1.ResourceList{
-				corev1.ResourceCPU: resource.MustParse("500m"),
-				corev1.ResourceMemory: resource.MustParse("2Gi"),
+			kubeflowv1.TFJobReplicaTypeWorker: {
+				Requests: corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("1024m"),
+					corev1.ResourceMemory: resource.MustParse("1Gi"),
+					flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
+				},
+				Limits: corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("2048m"),
+					corev1.ResourceMemory: resource.MustParse("2Gi"),
+					flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
+				},
 			},
-		},
-		kubeflowv1.TFJobReplicaTypeEval: {
-			Requests: corev1.ResourceList{
-				corev1.ResourceCPU: resource.MustParse("250m"),
-				corev1.ResourceMemory: resource.MustParse("1Gi"),
+			kubeflowv1.TFJobReplicaTypePS: {
+				Requests: corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("250m"),
+					corev1.ResourceMemory: resource.MustParse("1Gi"),
+				},
+				Limits: corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("500m"),
+					corev1.ResourceMemory: resource.MustParse("2Gi"),
+				},
 			},
-			Limits: corev1.ResourceList{
-				corev1.ResourceCPU: resource.MustParse("500m"),
-				corev1.ResourceMemory: resource.MustParse("2Gi"),
+			kubeflowv1.TFJobReplicaTypeEval: {
+				Requests: corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("250m"),
+					corev1.ResourceMemory: resource.MustParse("1Gi"),
+				},
+				Limits: corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("500m"),
+					corev1.ResourceMemory: resource.MustParse("2Gi"),
+				},
 			},
-		},
-	}
+		}
 
-	tensorflowResourceHandler := tensorflowOperatorResourceHandler{}
+		tensorflowResourceHandler := tensorflowOperatorResourceHandler{}
 
-	taskTemplate := dummyTensorFlowTaskTemplate("v1", taskConfig)
-	taskTemplate.TaskTypeVersion = 1
+		taskTemplate := dummyTensorFlowTaskTemplate("v1", taskConfig)
+		taskTemplate.TaskTypeVersion = 1
 
-	resource, err := tensorflowResourceHandler.BuildResource(context.TODO(), dummyTensorFlowTaskContext(taskTemplate, resourceRequirements, nil))
-	assert.NoError(t, err)
-	assert.NotNil(t, resource)
+		resource, err := tensorflowResourceHandler.BuildResource(context.TODO(), dummyTensorFlowTaskContext(taskTemplate, resourceRequirements, nil))
+		assert.NoError(t, err)
+		assert.NotNil(t, resource)
 
-	tensorflowJob, ok := resource.(*kubeflowv1.TFJob)
-	assert.True(t, ok)
-	assert.Equal(t, int32(100), *tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].Replicas)
-	assert.Equal(t, int32(50), *tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypePS].Replicas)
-	assert.Equal(t, int32(1), *tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeChief].Replicas)
-	assert.Equal(t, int32(1), *tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeEval].Replicas)
+		tensorflowJob, ok := resource.(*kubeflowv1.TFJob)
+		assert.True(t, ok)
+		assert.Equal(t, int32(100), *tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].Replicas)
+		assert.Equal(t, int32(50), *tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypePS].Replicas)
+		assert.Equal(t, int32(1), *tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeChief].Replicas)
+		assert.Equal(t, int32(1), *tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeEval].Replicas)
 
-	for replicaType, replicaSpec := range tensorflowJob.Spec.TFReplicaSpecs {
-		var hasContainerWithDefaultTensorFlowName = false
+		for replicaType, replicaSpec := range tensorflowJob.Spec.TFReplicaSpecs {
+			var hasContainerWithDefaultTensorFlowName = false
 
-		for _, container := range replicaSpec.Template.Spec.Containers {
-			if container.Name == kubeflowv1.TFJobDefaultContainerName {
-				hasContainerWithDefaultTensorFlowName = true
-				assert.Equal(t, *resourceRequirementsMap[replicaType], container.Resources)
+			for _, container := range replicaSpec.Template.Spec.Containers {
+				if container.Name == kubeflowv1.TFJobDefaultContainerName {
+					hasContainerWithDefaultTensorFlowName = true
+					assert.Equal(t, *resourceRequirementsMap[replicaType], container.Resources)
+				}
 			}
-		}
 
-		assert.True(t, hasContainerWithDefaultTensorFlowName)
+			assert.True(t, hasContainerWithDefaultTensorFlowName)
+		}
+		assert.Equal(t, commonOp.CleanPodPolicyAll, *tensorflowJob.Spec.RunPolicy.CleanPodPolicy)
+		assert.Equal(t, int64(100), *tensorflowJob.Spec.RunPolicy.ActiveDeadlineSeconds)
 	}
-	assert.Equal(t, commonOp.CleanPodPolicyAll, *tensorflowJob.Spec.RunPolicy.CleanPodPolicy)
-	assert.Equal(t, int64(100), *tensorflowJob.Spec.RunPolicy.ActiveDeadlineSeconds)
 }
 
 func TestBuildResourceTensorFlowV1WithOnlyWorker(t *testing.T) {
-	taskConfig := &kfplugins.DistributedTensorflowTrainingTask{
-		WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
-			Replicas: 100,
-			Resources: &core.Resources{
-				Requests: []*core.Resources_ResourceEntry{
-					{Name: core.Resources_CPU, Value: "1024m"},
-					{Name: core.Resources_MEMORY, Value: "1Gi"},
-					{Name: core.Resources_GPU, Value: "1"},
+	taskConfigs := []*kfplugins.DistributedTensorflowTrainingTask{
+		{
+			WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Replicas: 100,
+				Resources: &core.Resources{
+					Requests: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "1024m"},
+						{Name: core.Resources_MEMORY, Value: "1Gi"},
+						{Name: core.Resources_GPU, Value: "1"},
+					},
+					Limits: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "2048m"},
+						{Name: core.Resources_MEMORY, Value: "2Gi"},
+						{Name: core.Resources_GPU, Value: "1"},
+					},
 				},
-			Limits: []*core.Resources_ResourceEntry{
-				{Name: core.Resources_CPU, Value: "2048m"},
-				{Name: core.Resources_MEMORY, Value: "2Gi"},
-				{Name: core.Resources_GPU, Value: "1"},
+			},
+		},
+		{
+			WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Common: &kfplugins.CommonReplicaSpec{
+					Replicas: 100,
+					Resources: &core.Resources{
+						Requests: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "1024m"},
+							{Name: core.Resources_MEMORY, Value: "1Gi"},
+							{Name: core.Resources_GPU, Value: "1"},
+						},
+						Limits: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "2048m"},
+							{Name: core.Resources_MEMORY, Value: "2Gi"},
+							{Name: core.Resources_GPU, Value: "1"},
+						},
+					},
+				},
 			},
 		},
 	}
 
-	resourceRequirementsMap := map[commonOp.ReplicaType]*corev1.ResourceRequirements{
-		kubeflowv1.TFJobReplicaTypeWorker: {
-			Requests: corev1.ResourceList{
-				corev1.ResourceCPU: resource.MustParse("1024m"),
-				corev1.ResourceMemory: resource.MustParse("1Gi"),
-				flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
-			},
-			Limits: corev1.ResourceList{
-				corev1.ResourceCPU: resource.MustParse("2048m"),
-				corev1.ResourceMemory: resource.MustParse("2Gi"),
-				flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
+-	resourceRequirementsMap := map[commonOp.ReplicaType]*corev1.ResourceRequirements{
+-		kubeflowv1.TFJobReplicaTypeWorker: {
+-			Requests: corev1.ResourceList{
+-				corev1.ResourceCPU: resource.MustParse("1024m"),
+-				corev1.ResourceMemory: resource.MustParse("1Gi"),
+-				flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
+-			},
+-			Limits: corev1.ResourceList{
+-				corev1.ResourceCPU: resource.MustParse("2048m"),
+-				corev1.ResourceMemory: resource.MustParse("2Gi"),
+-				flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
+	for _, taskConfig := range taskConfigs {
+		resourceRequirementsMap := map[commonOp.ReplicaType]*corev1.ResourceRequirements{
+			kubeflowv1.TFJobReplicaTypeWorker: {
+				Requests: corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("1024m"),
+					corev1.ResourceMemory: resource.MustParse("1Gi"),
+					flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
+				},
+				Limits: corev1.ResourceList{
+					corev1.ResourceCPU: resource.MustParse("2048m"),
+					corev1.ResourceMemory: resource.MustParse("2Gi"),
+					flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
+				},
 			},
-		},
-	}
+		}
 
-	tensorflowResourceHandler := tensorflowOperatorResourceHandler{}
+		tensorflowResourceHandler := tensorflowOperatorResourceHandler{}
 
-	taskTemplate := dummyTensorFlowTaskTemplate("v1 with only worker replica", taskConfig)
-	taskTemplate.TaskTypeVersion = 1
+		taskTemplate := dummyTensorFlowTaskTemplate("v1 with only worker replica", taskConfig)
+		taskTemplate.TaskTypeVersion = 1
 
-	resource, err := tensorflowResourceHandler.BuildResource(context.TODO(), dummyTensorFlowTaskContext(taskTemplate, resourceRequirements, nil))
-	assert.NoError(t, err)
-	assert.NotNil(t, resource)
+		resource, err := tensorflowResourceHandler.BuildResource(context.TODO(), dummyTensorFlowTaskContext(taskTemplate, resourceRequirements, nil))
+		assert.NoError(t, err)
+		assert.NotNil(t, resource)
 
-	tensorflowJob, ok := resource.(*kubeflowv1.TFJob)
-	assert.True(t, ok)
-	assert.Equal(t, int32(100), *tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].Replicas)
-	assert.Nil(t, tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeChief])
-	assert.Nil(t, tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypePS])
+		tensorflowJob, ok := resource.(*kubeflowv1.TFJob)
+		assert.True(t, ok)
+		assert.Equal(t, int32(100), *tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].Replicas)
+		assert.Nil(t, tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeChief])
+		assert.Nil(t, tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypePS])
 
-	for replicaType, replicaSpec := range tensorflowJob.Spec.TFReplicaSpecs {
-		var hasContainerWithDefaultTensorFlowName = false
+		for replicaType, replicaSpec := range tensorflowJob.Spec.TFReplicaSpecs {
+			var hasContainerWithDefaultTensorFlowName = false
 
-		for _, container := range replicaSpec.Template.Spec.Containers {
-			if container.Name == kubeflowv1.TFJobDefaultContainerName {
-				hasContainerWithDefaultTensorFlowName = true
-				assert.Equal(t, *resourceRequirementsMap[replicaType], container.Resources)
+			for _, container := range replicaSpec.Template.Spec.Containers {
+				if container.Name == kubeflowv1.TFJobDefaultContainerName {
+					hasContainerWithDefaultTensorFlowName = true
+					assert.Equal(t, *resourceRequirementsMap[replicaType], container.Resources)
+				}
 			}
-		}
 
-		assert.True(t, hasContainerWithDefaultTensorFlowName)
+			assert.True(t, hasContainerWithDefaultTensorFlowName)
+		}
 	}
 }
@@ -857,49 +982,88 @@ func TestBuildResourceTensorFlowV1ResourceTolerations(t *testing.T) {
 		},
 	}))
 
-	taskConfig := &kfplugins.DistributedTensorflowTrainingTask{
-		ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
-			Replicas: 1,
-			Resources: &core.Resources{
-				Requests: []*core.Resources_ResourceEntry{
-					{Name: core.Resources_CPU, Value: "250m"},
-					{Name: core.Resources_MEMORY, Value: "1Gi"},
+	taskConfigs := []*kfplugins.DistributedTensorflowTrainingTask{
+		{
+			ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Replicas: 1,
+				Resources: &core.Resources{
+					Requests: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "250m"},
+						{Name: core.Resources_MEMORY, Value: "1Gi"},
+					},
+					Limits: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "500m"},
+						{Name: core.Resources_MEMORY, Value: "2Gi"},
+					},
 				},
-				Limits: []*core.Resources_ResourceEntry{
-					{Name: core.Resources_CPU, Value: "500m"},
-					{Name: core.Resources_MEMORY, Value: "2Gi"},
+			},
+			WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Replicas: 100,
+				Resources: &core.Resources{
+					Requests: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "1024m"},
+						{Name: core.Resources_MEMORY, Value: "1Gi"},
+						{Name: core.Resources_GPU, Value: "1"},
+					},
+					Limits: []*core.Resources_ResourceEntry{
+						{Name: core.Resources_CPU, Value: "2048m"},
+						{Name: core.Resources_MEMORY, Value: "2Gi"},
+						{Name: core.Resources_GPU, Value: "1"},
+					},
+				},
 			},
 		},
-		WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
-			Replicas: 100,
-			Resources: &core.Resources{
-				Requests: []*core.Resources_ResourceEntry{
-					{Name: core.Resources_CPU, Value: "1024m"},
-					{Name: core.Resources_MEMORY, Value: "1Gi"},
-					{Name: core.Resources_GPU, Value: "1"},
+		{
+			ChiefReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Common: &kfplugins.CommonReplicaSpec{
+					Replicas: 1,
+					Resources: &core.Resources{
+						Requests: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "250m"},
+							{Name: core.Resources_MEMORY, Value: "1Gi"},
+						},
+						Limits: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "500m"},
+							{Name: core.Resources_MEMORY, Value: "2Gi"},
+						},
+					},
+				},
-			Limits: []*core.Resources_ResourceEntry{
-				{Name: core.Resources_CPU, Value: "2048m"},
-				{Name: core.Resources_MEMORY, Value: "2Gi"},
-				{Name: core.Resources_GPU, Value: "1"},
+			},
+			WorkerReplicas: &kfplugins.DistributedTensorflowTrainingReplicaSpec{
+				Common: &kfplugins.CommonReplicaSpec{
+					Replicas: 100,
+					Resources: &core.Resources{
+						Requests: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "1024m"},
+							{Name: core.Resources_MEMORY, Value: "1Gi"},
+							{Name: core.Resources_GPU, Value: "1"},
+						},
+						Limits: []*core.Resources_ResourceEntry{
+							{Name: core.Resources_CPU, Value: "2048m"},
+							{Name: core.Resources_MEMORY, Value: "2Gi"},
+							{Name: core.Resources_GPU, Value: "1"},
+						},
+					},
+				},
+			},
 		},
 	}
 
-	tensorflowResourceHandler := tensorflowOperatorResourceHandler{}
+	for _, taskConfig := range taskConfigs {
 
-	taskTemplate := dummyTensorFlowTaskTemplate("v1", taskConfig)
-	taskTemplate.TaskTypeVersion = 1
+		tensorflowResourceHandler := tensorflowOperatorResourceHandler{}
 
-	resource, err := tensorflowResourceHandler.BuildResource(context.TODO(), dummyTensorFlowTaskContext(taskTemplate, resourceRequirements, nil))
-	assert.NoError(t, err)
-	assert.NotNil(t, resource)
+		taskTemplate := dummyTensorFlowTaskTemplate("v1", taskConfig)
+		taskTemplate.TaskTypeVersion = 1
 
-	tensorflowJob, ok := resource.(*kubeflowv1.TFJob)
-	assert.True(t, ok)
+		resource, err := tensorflowResourceHandler.BuildResource(context.TODO(), dummyTensorFlowTaskContext(taskTemplate, resourceRequirements, nil))
+		assert.NoError(t, err)
+		assert.NotNil(t, resource)
 
-	assert.NotContains(t, tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeChief].Template.Spec.Tolerations, gpuToleration)
-	assert.Contains(t, tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].Template.Spec.Tolerations, gpuToleration)
+		tensorflowJob, ok := resource.(*kubeflowv1.TFJob)
+		assert.True(t, ok)
+
+		assert.NotContains(t, tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeChief].Template.Spec.Tolerations, gpuToleration)
+		assert.Contains(t, tensorflowJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].Template.Spec.Tolerations, gpuToleration)
+	}
 }