support elastic jobset #622

Closed · wants to merge 2 commits
89 changes: 89 additions & 0 deletions pkg/controllers/elastic_jobset.go
@@ -0,0 +1,89 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"fmt"
"slices"
"strconv"

batchv1 "k8s.io/api/batch/v1"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// indexFunc compares two Jobs by the integer value of their jobset job index
// label, for use with slices.SortFunc (ascending order). Jobs with an
// unparsable index compare as equal.
func indexFunc(a, b batchv1.Job) int {
jobIndexA, errA := strconv.Atoi(a.Labels[jobset.JobIndexKey])
jobIndexB, errB := strconv.Atoi(b.Labels[jobset.JobIndexKey])
if errA != nil {
return 0
}
if errB != nil {
return 0
}
if jobIndexA > jobIndexB {
return 1
} else if jobIndexA < jobIndexB {
return -1
} else {
return 0
}
}

// jobsToDeleteForDownScale gathers the excess jobs to delete during a
// downscale, choosing from the largest job index down so the remaining
// jobs keep a contiguous index range.
func jobsToDeleteForDownScale(replicatedJobs []jobset.ReplicatedJob, replicatedJobStatuses []jobset.ReplicatedJobStatus, jobItems []batchv1.Job) ([]*batchv1.Job, error) {
jobsToDelete := []*batchv1.Job{}
type payload struct {
batchJobs []batchv1.Job
rjStatus jobset.ReplicatedJobStatus
replicas int32
}
replicatedJobToBatchJobMap := map[string]payload{}
for _, replicatedJob := range replicatedJobs {
status := findReplicatedJobStatus(replicatedJobStatuses, replicatedJob.Name)
newPayload := &payload{}
newPayload.rjStatus = status
newPayload.replicas = replicatedJob.Replicas
for _, val := range jobItems {
if val.Labels[jobset.ReplicatedJobNameKey] != replicatedJob.Name {
continue
}
newPayload.batchJobs = append(newPayload.batchJobs, val)
}
slices.SortFunc(newPayload.batchJobs, indexFunc)
replicatedJobToBatchJobMap[replicatedJob.Name] = *newPayload
}
for _, jobAndStatus := range replicatedJobToBatchJobMap {
countOfJobsToDelete := jobAndStatus.rjStatus.Ready - jobAndStatus.replicas
if countOfJobsToDelete > 0 {
jobsWeDeleted := 0
for i := len(jobAndStatus.batchJobs) - 1; i >= 0; i-- {
jobIndex, err := strconv.Atoi(jobAndStatus.batchJobs[i].Labels[jobset.JobIndexKey])
if err != nil {
return nil, fmt.Errorf("unable to get integer from job index key: %w", err)
}
if jobIndex >= int(countOfJobsToDelete) {
Review thread:

danielvegamyhre (Contributor), Sep 27, 2024:

So if we have 4 jobs (indexes 0,1,2,3) and we want to delete 1 job, then countOfJobsToDelete=1 and we may end up deleting any of job indexes 1,2,3. So it is non-deterministic and may leave us with a non-contiguous range of job indexes (remaining job indexes 0,2,3).

Rather than this, I think we should delete jobs from largest job index to smallest, so the remaining jobs still occupy a contiguous index range. This follows the same pattern as how the Job controller behaves for elastic indexed jobs, deleting pods from largest to smallest completion indexes.

Author:

Good point. I went ahead and added this change.

Reviewer:

@danielvegamyhre, keeping the indices unchanged could be desirable in cases where we don't want to restart the entire jobset, right?

danielvegamyhre (Contributor), Oct 6, 2024:

> @danielvegamyhre, keeping the indices unchanged could be desirable in cases where we don't want to restart the entire jobset, right?

Can you clarify what you mean? In this thread we are discussing which jobs to delete if the JobSet is scaled down (e.g., the number of replicas in a replicatedJob reduced from 4 to 3), not restarting the JobSet.

jobsWeDeleted = jobsWeDeleted + 1
jobsToDelete = append(jobsToDelete, &jobAndStatus.batchJobs[i])
}
if jobsWeDeleted == int(countOfJobsToDelete) {
break
}
}
}
}
return jobsToDelete, nil
}
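The selection logic above can be boiled down to plain integers: sort the existing job indexes, then collect victims from the largest index down until the excess is removed, leaving a contiguous 0..replicas-1 range. The sketch below illustrates that invariant; `pickDownscaleVictims` is an illustrative name, not part of the JobSet API.

```go
package main

import (
	"fmt"
	"sort"
)

// pickDownscaleVictims returns the job indexes to delete, from largest to
// smallest, given the current indexes and the desired replica count. When
// there is no excess (upscale or no change), it returns nothing.
func pickDownscaleVictims(indexes []int, replicas int) []int {
	sorted := append([]int(nil), indexes...)
	sort.Ints(sorted)
	var victims []int
	toDelete := len(sorted) - replicas
	for i := len(sorted) - 1; i >= 0 && len(victims) < toDelete; i-- {
		victims = append(victims, sorted[i])
	}
	return victims
}

func main() {
	// 4 ready jobs scaled down to 2 replicas: delete indexes 3 and 2,
	// so the survivors (0 and 1) occupy a contiguous range.
	fmt.Println(pickDownscaleVictims([]int{0, 1, 2, 3}, 2)) // [3 2]
}
```

This is the same largest-to-smallest order the review thread converged on, matching how the Job controller removes pods for elastic indexed jobs.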
252 changes: 252 additions & 0 deletions pkg/controllers/elastic_jobset_test.go
@@ -0,0 +1,252 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"testing"

"github.com/google/go-cmp/cmp"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

func TestJobsToDeleteDownScale(t *testing.T) {

tests := []struct {
name string
replicatedJobs []jobset.ReplicatedJob
replicatedJobStatus []jobset.ReplicatedJobStatus
jobs []batchv1.Job
expectedJobsThatWereDeleted []batchv1.Job
gotError error
}{
{
name: "no elastic downscale",
replicatedJobs: []jobset.ReplicatedJob{
{
Name: "test",
Template: batchv1.JobTemplateSpec{},
Replicas: 2,
},
},
replicatedJobStatus: []jobset.ReplicatedJobStatus{
{
Name: "test",
Ready: 1,
},
},
jobs: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "0",
},
},
},
},
},
{
name: "elastic upscale; do nothing",
replicatedJobs: []jobset.ReplicatedJob{
{
Name: "test",
Template: batchv1.JobTemplateSpec{},
Replicas: 2,
},
},
replicatedJobStatus: []jobset.ReplicatedJobStatus{
{
Name: "test",
Ready: 1,
},
},
jobs: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "0",
},
},
},
},
},
{
name: "elastic downscale is needed",
replicatedJobs: []jobset.ReplicatedJob{
{
Name: "test",
Template: batchv1.JobTemplateSpec{},
Replicas: 1,
},
},
replicatedJobStatus: []jobset.ReplicatedJobStatus{
{
Name: "test",
Ready: 2,
},
},
jobs: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "0",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "1",
},
},
},
},
expectedJobsThatWereDeleted: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "1",
},
},
},
},
},
{
name: "elastic downscale is needed for second replicated job",
replicatedJobs: []jobset.ReplicatedJob{
{
Name: "test",
Template: batchv1.JobTemplateSpec{},
Replicas: 2,
},
{
Name: "test-2",
Template: batchv1.JobTemplateSpec{},
Replicas: 2,
},
},
replicatedJobStatus: []jobset.ReplicatedJobStatus{
{
Name: "test",
Ready: 2,
},
{
Name: "test-2",
Ready: 4,
},
},
jobs: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "0",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "1",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "2",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "3",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "0",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "1",
},
},
},
},
expectedJobsThatWereDeleted: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "3",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "2",
},
},
},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actual, err := jobsToDeleteForDownScale(tc.replicatedJobs, tc.replicatedJobStatus, tc.jobs)
if diff := cmp.Diff(tc.gotError, err); diff != "" {
t.Errorf("unexpected error (-want/+got): %s", diff)
}
if len(actual) != len(tc.expectedJobsThatWereDeleted) {
t.Errorf("unexpected length mismatch for deleted jobs: got: %d want: %d", len(actual), len(tc.expectedJobsThatWereDeleted))
}
if tc.expectedJobsThatWereDeleted != nil {
for i := range actual {
actualReplicatedJobName := actual[i].ObjectMeta.Labels[jobset.ReplicatedJobNameKey]
actualJobIndexKey := actual[i].ObjectMeta.Labels[jobset.JobIndexKey]
expectedReplicatedJobName := tc.expectedJobsThatWereDeleted[i].ObjectMeta.Labels[jobset.ReplicatedJobNameKey]
expectedJobIndexKey := tc.expectedJobsThatWereDeleted[i].ObjectMeta.Labels[jobset.JobIndexKey]
if diff := cmp.Diff(actualReplicatedJobName, expectedReplicatedJobName); diff != "" {
t.Errorf("unexpected replicated job name (+got/-want): %s", diff)
}
if diff := cmp.Diff(actualJobIndexKey, expectedJobIndexKey); diff != "" {
t.Errorf("unexpected job index (+got/-want): %s", diff)
}
}
}
})
}
}
23 changes: 23 additions & 0 deletions pkg/controllers/jobset_controller.go
@@ -203,6 +203,13 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd
return ctrl.Result{}, err
}

// An elastic JobSet can scale the Jobs of a given replicated job up or down.
// On downscale, we need to gather and delete the excess jobs.
if err := r.downscaleElasticJobs(ctx, js, rjobStatuses); err != nil {
log.Error(err, "unable to downscale elastic jobs")
return ctrl.Result{}, err
}

// Handle suspending a jobset or resuming a suspended jobset.
jobsetSuspended := jobSetSuspended(js)
if jobsetSuspended {
@@ -520,6 +527,22 @@ func (r *JobSetReconciler) reconcileReplicatedJobs(ctx context.Context, js *jobs
return nil
}

// downscaleElasticJobs checks whether the desired replicas of each replicated
// job have dropped below the number of existing jobs; if so, it deletes the
// excess jobs.
func (r *JobSetReconciler) downscaleElasticJobs(ctx context.Context, js *jobset.JobSet, replicatedJobStatus []jobset.ReplicatedJobStatus) error {
Review thread:

Contributor:

This can also be moved to the elastic_jobset.go file.

Author:

I don't think it can. We keep anything related to the JobSetReconciler in jobset_controller. ElasticJob and other utilities seem to be pure functions that can easily be unit tested. This one relies on the reconciler to talk to the client.

Contributor:

That's what we've done so far, but I think it makes more sense for all functions and struct methods that are feature-specific to be colocated in the same feature file, rather than grouping them based on whether or not the function has a pointer receiver. I think this is a useful pattern for organizing the methods of large structs with a growing number of methods.

For now we can leave this as is, since the refactor I'm describing isn't really in the scope of this PR. I'll work on it in a separate PR later and we can discuss the pros and cons there.

Author:

Yeah, in theory that sounds like a good idea. I tend to like separating code from controller logic, as unit testing controllers is quite painful.

I will eagerly await your PR.

// Get all active jobs owned by JobSet.
var childJobList batchv1.JobList
if err := r.List(ctx, &childJobList, client.InNamespace(js.Namespace), client.MatchingFields{constants.JobOwnerKey: js.Name}); err != nil {
return err
}

jobsToDelete, err := jobsToDeleteForDownScale(js.Spec.ReplicatedJobs, replicatedJobStatus, childJobList.Items)
if err != nil {
return err
}
return r.deleteJobs(ctx, jobsToDelete)
}

func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, jobs []*batchv1.Job) error {
log := ctrl.LoggerFrom(ctx)
