Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable 0 workers replicaSpec for pytorchjob #6201

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,17 @@ func (p pytorchOperatorResourceHandler) BuildResource(ctx context.Context, taskC
"Invalid TaskSpecification, unsupported task template version [%v] key", taskTemplate.GetTaskTypeVersion())
}

if *workerReplicaSpec.Replicas <= 0 {
return nil, fmt.Errorf("number of workers must be greater than 0")
jobSpec := kubeflowv1.PyTorchJobSpec{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a small unit test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @pingsutw, I've removed TestBuildResourcePytorchV1WithZeroWorker as it was intended to fail in the case corrected with this PR. Replaced it with the TestBuildResourcePytorchV1WithDifferentWorkersNumber test, thank you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One unit test is failing. otherwise, lgtm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be fixed now, thanks.

replicaSpecs := map[commonOp.ReplicaType]*commonOp.ReplicaSpec{
kubeflowv1.PyTorchJobReplicaTypeMaster: masterReplicaSpec,
}
Comment on lines +148 to +151
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider validating masterReplicaSpec before use

Consider validating that masterReplicaSpec is not nil before using it in the replicaSpecs map. A nil masterReplicaSpec could cause runtime issues.

Code suggestion
Check the AI-generated fix before applying
Suggested change
jobSpec := kubeflowv1.PyTorchJobSpec{}
replicaSpecs := map[commonOp.ReplicaType]*commonOp.ReplicaSpec{
kubeflowv1.PyTorchJobReplicaTypeMaster: masterReplicaSpec,
}
jobSpec := kubeflowv1.PyTorchJobSpec{}
if masterReplicaSpec == nil {
return nil, fmt.Errorf("master replica spec cannot be nil")
}
replicaSpecs := map[commonOp.ReplicaType]*commonOp.ReplicaSpec{
kubeflowv1.PyTorchJobReplicaTypeMaster: masterReplicaSpec,
}

Code Review Run #59b7b8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

if *workerReplicaSpec.Replicas > 0 {
replicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker] = workerReplicaSpec
}

jobSpec := kubeflowv1.PyTorchJobSpec{
PyTorchReplicaSpecs: map[commonOp.ReplicaType]*commonOp.ReplicaSpec{
kubeflowv1.PyTorchJobReplicaTypeMaster: masterReplicaSpec,
kubeflowv1.PyTorchJobReplicaTypeWorker: workerReplicaSpec,
},
RunPolicy: runPolicy,
jobSpec = kubeflowv1.PyTorchJobSpec{
PyTorchReplicaSpecs: replicaSpecs,
RunPolicy: runPolicy,
}

if elasticPolicy != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func TestReplicaCounts(t *testing.T) {
contains []commonOp.ReplicaType
notContains []commonOp.ReplicaType
}{
{"NoWorkers", 0, true, nil, nil},
{"NoWorkers", 0, false, []commonOp.ReplicaType{kubeflowv1.PyTorchJobReplicaTypeMaster}, []commonOp.ReplicaType{}},
{"Works", 1, false, []commonOp.ReplicaType{kubeflowv1.PyTorchJobReplicaTypeMaster, kubeflowv1.PyTorchJobReplicaTypeWorker}, []commonOp.ReplicaType{}},
} {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -1210,29 +1210,88 @@ func TestBuildResourcePytorchV1WithElastic(t *testing.T) {
}
}

func TestBuildResourcePytorchV1WithZeroWorker(t *testing.T) {
func TestBuildResourcePytorchV1WithDifferentWorkersNumber(t *testing.T) {
taskConfigs := []*kfplugins.DistributedPyTorchTrainingTask{
{
// Test case 1: Zero workers - should only have master
WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{
Replicas: 0,
},
MasterReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{
Image: testImageMaster,
Resources: &core.Resources{
Requests: []*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "250m"},
{Name: core.Resources_MEMORY, Value: "250Mi"},
},
Limits: []*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "500m"},
{Name: core.Resources_MEMORY, Value: "500Mi"},
},
},
},
},
{
// Test case 2: One worker - should have both master and worker
WorkerReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{
Common: &kfplugins.CommonReplicaSpec{
Replicas: 0,
Replicas: 1,
},
MasterReplicas: &kfplugins.DistributedPyTorchTrainingReplicaSpec{
Image: testImageMaster,
Resources: &core.Resources{
Requests: []*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "250m"},
{Name: core.Resources_MEMORY, Value: "250Mi"},
},
Limits: []*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "500m"},
{Name: core.Resources_MEMORY, Value: "500Mi"},
},
},
},
},
}

for _, taskConfig := range taskConfigs {
pytorchResourceHandler := pytorchOperatorResourceHandler{}
for i, taskConfig := range taskConfigs {
t.Run(fmt.Sprintf("Case %d", i+1), func(t *testing.T) {
pytorchResourceHandler := pytorchOperatorResourceHandler{}

taskTemplate := dummyPytorchTaskTemplate("job5", taskConfig)
taskTemplate.TaskTypeVersion = 1
_, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "", k8s.PluginState{}))
assert.Error(t, err)
taskTemplate := dummyPytorchTaskTemplate("job5", taskConfig)
taskTemplate.TaskTypeVersion = 1

res, err := pytorchResourceHandler.BuildResource(context.TODO(), dummyPytorchTaskContext(taskTemplate, resourceRequirements, nil, "", k8s.PluginState{}))
assert.NoError(t, err)
assert.NotNil(t, res)

pytorchJob, ok := res.(*kubeflowv1.PyTorchJob)
assert.True(t, ok)

if taskConfig.GetWorkerReplicas().GetReplicas() == 0 {
// Should only contain master spec
assert.Equal(t, 1, len(pytorchJob.Spec.PyTorchReplicaSpecs))
assert.Contains(t, pytorchJob.Spec.PyTorchReplicaSpecs, kubeflowv1.PyTorchJobReplicaTypeMaster)
assert.NotContains(t, pytorchJob.Spec.PyTorchReplicaSpecs, kubeflowv1.PyTorchJobReplicaTypeWorker)

// Verify master spec details
masterSpec := pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster]
assert.Equal(t, int32(1), *masterSpec.Replicas)
assert.Equal(t, testImageMaster, masterSpec.Template.Spec.Containers[0].Image)
} else {
// Should contain both master and worker specs
assert.Equal(t, 2, len(pytorchJob.Spec.PyTorchReplicaSpecs))
assert.Contains(t, pytorchJob.Spec.PyTorchReplicaSpecs, kubeflowv1.PyTorchJobReplicaTypeMaster)
assert.Contains(t, pytorchJob.Spec.PyTorchReplicaSpecs, kubeflowv1.PyTorchJobReplicaTypeWorker)

// Verify master spec details
masterSpec := pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster]
assert.Equal(t, int32(1), *masterSpec.Replicas)
assert.Equal(t, testImageMaster, masterSpec.Template.Spec.Containers[0].Image)

// Verify worker spec details
workerSpec := pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker]
assert.Equal(t, int32(1), *workerSpec.Replicas)
}
})
}
}

Expand Down
Loading