Skip to content

Commit

Permalink
support elastic jobset
Browse files Browse the repository at this point in the history
  • Loading branch information
kannon92 committed Aug 17, 2024
1 parent ec39730 commit 58ca914
Show file tree
Hide file tree
Showing 9 changed files with 661 additions and 16 deletions.
53 changes: 53 additions & 0 deletions pkg/controllers/elastic_jobset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"fmt"
"strconv"

batchv1 "k8s.io/api/batch/v1"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// jobsToDeleteDownScale gathers the excess jobs during a downscale
// and deletes the jobs
func jobsToDeleteDownScale(replicatedJobs []jobset.ReplicatedJob, replicatedJobStatus []jobset.ReplicatedJobStatus, jobItems []batchv1.Job) ([]*batchv1.Job, error) {
jobsToDelete := []*batchv1.Job{}
for _, replicatedJob := range replicatedJobs {
status := findReplicatedJobStatus(replicatedJobStatus, replicatedJob.Name)
countOfJobsToDelete := status.Ready - replicatedJob.Replicas
if countOfJobsToDelete > 0 {
jobsWeDeleted := 0
for _, val := range jobItems {
if val.Labels[jobset.ReplicatedJobNameKey] != replicatedJob.Name {
continue
}
jobIndex, err := strconv.Atoi(val.Labels[jobset.JobIndexKey])
if err != nil {
return nil, fmt.Errorf("unable get integer from job index key")
}
if jobIndex >= int(countOfJobsToDelete) {
jobsWeDeleted = jobsWeDeleted + 1
jobsToDelete = append(jobsToDelete, &val)
}
if jobsWeDeleted == int(countOfJobsToDelete) {
continue
}
}
}
}
return jobsToDelete, nil
}
212 changes: 212 additions & 0 deletions pkg/controllers/elastic_jobset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"testing"

"github.com/google/go-cmp/cmp"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

func TestJobsToDeleteDownScale(t *testing.T) {

tests := []struct {
name string
replicatedJobs []jobset.ReplicatedJob
replicatedJobStatus []jobset.ReplicatedJobStatus
jobs []batchv1.Job
expectedJobsToDelete int32
gotError error
}{
{
name: "no elastic downscale",
replicatedJobs: []jobset.ReplicatedJob{
{
Name: "test",
Template: batchv1.JobTemplateSpec{},
Replicas: 2,
},
},
replicatedJobStatus: []jobset.ReplicatedJobStatus{
{
Name: "test",
Ready: 1,
},
},
jobs: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "0",
},
},
},
},
},
{
name: "elastic upscale; do nothing",
replicatedJobs: []jobset.ReplicatedJob{
{
Name: "test",
Template: batchv1.JobTemplateSpec{},
Replicas: 2,
},
},
replicatedJobStatus: []jobset.ReplicatedJobStatus{
{
Name: "test",
Ready: 1,
},
},
jobs: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "0",
},
},
},
},
},
{
name: "elastic downscale is needed",
replicatedJobs: []jobset.ReplicatedJob{
{
Name: "test",
Template: batchv1.JobTemplateSpec{},
Replicas: 1,
},
},
replicatedJobStatus: []jobset.ReplicatedJobStatus{
{
Name: "test",
Ready: 2,
},
},
jobs: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "0",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "1",
},
},
},
},
expectedJobsToDelete: 1,
},
{
name: "elastic downscale is needed for second replicated job",
replicatedJobs: []jobset.ReplicatedJob{
{
Name: "test",
Template: batchv1.JobTemplateSpec{},
Replicas: 2,
},
{
Name: "test-2",
Template: batchv1.JobTemplateSpec{},
Replicas: 2,
},
},
replicatedJobStatus: []jobset.ReplicatedJobStatus{
{
Name: "test",
Ready: 2,
},
{
Name: "test-2",
Ready: 4,
},
},
jobs: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "0",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "1",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "0",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "1",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "2",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "3",
},
},
},
},
expectedJobsToDelete: 2,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actual, err := jobsToDeleteDownScale(tc.replicatedJobs, tc.replicatedJobStatus, tc.jobs)
if diff := cmp.Diff(tc.gotError, err); diff != "" {
t.Errorf("unexpected finished value (+got/-want): %s", diff)
}
if diff := cmp.Diff(tc.expectedJobsToDelete, int32(len(actual))); diff != "" {
t.Errorf("unexpected finished value (+got/-want): %s", diff)
}
})
}
}
23 changes: 23 additions & 0 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd
return ctrl.Result{}, err
}

// ElasticJobSet can downscale and upscale Jobs of a given replicated job
// On downscale, we need to gather the jobs to delete.
if err := r.downscaleElasticJobs(ctx, js, rjobStatuses); err != nil {
log.Error(err, "unable to downscale elastic jobs")
return ctrl.Result{}, err
}

// Handle suspending a jobset or resuming a suspended jobset.
jobsetSuspended := jobSetSuspended(js)
if jobsetSuspended {
Expand Down Expand Up @@ -520,6 +527,22 @@ func (r *JobSetReconciler) reconcileReplicatedJobs(ctx context.Context, js *jobs
return nil
}

// We need to check if the replicas of a replicated job are mismatching
// If they are, we should delete the extra jobs
func (r *JobSetReconciler) downscaleElasticJobs(ctx context.Context, js *jobset.JobSet, replicatedJobStatus []jobset.ReplicatedJobStatus) error {
// Get all active jobs owned by JobSet.
var childJobList batchv1.JobList
if err := r.List(ctx, &childJobList, client.InNamespace(js.Namespace), client.MatchingFields{constants.JobOwnerKey: js.Name}); err != nil {
return err
}

jobsToDelete, err := jobsToDeleteDownScale(js.Spec.ReplicatedJobs, replicatedJobStatus, childJobList.Items)
if err != nil {
return err
}
return r.deleteJobs(ctx, jobsToDelete)
}

func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, jobs []*batchv1.Job) error {
log := ctrl.LoggerFrom(ctx)

Expand Down
34 changes: 29 additions & 5 deletions pkg/webhooks/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apivalidation "k8s.io/apimachinery/pkg/api/validation"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -272,11 +273,16 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime.
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates
}
}

// Note that SucccessPolicy and failurePolicy are made immutable via CEL.
errs := apivalidation.ValidateImmutableField(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs, field.NewPath("spec").Child("replicatedJobs"))
errs = append(errs, apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy"))...)
return nil, errs.ToAggregate()
// Note that SuccessPolicy and failurePolicy are made immutable via CEL.
// Comparing job templates can be slow
// Only do it if we detect a difference.
if !equality.Semantic.DeepEqual(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs) {
if err := validateReplicatedJobsUpdate(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs); err != nil {
return nil, err
}
}
errList := apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy"))
return nil, errList.ToAggregate()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
Expand Down Expand Up @@ -396,3 +402,21 @@ func replicatedJobNamesFromSpec(js *jobset.JobSet) []string {
func completionModePtr(mode batchv1.CompletionMode) *batchv1.CompletionMode {
return &mode
}

// validateReplicatedJobsUpdate validates the updates for elastic jobs
// Changing length of jobs, name of jobs and the templates are forbidden
func validateReplicatedJobsUpdate(currentRepJobs, oldRepJobs []jobset.ReplicatedJob) error {
// Changing length of replicated jobs on updates is forbidden
if len(currentRepJobs) != len(oldRepJobs) {
return fmt.Errorf("updates can not change the length of replicated jobs")
}
for i := range currentRepJobs {
if currentRepJobs[i].Name != oldRepJobs[i].Name {
return fmt.Errorf("updates can not change job names or reorder the jobs")
}
if !equality.Semantic.DeepEqual(currentRepJobs[i].Template, oldRepJobs[i].Template) {
return fmt.Errorf("updates can not change job templates")
}
}
return nil
}
Loading

0 comments on commit 58ca914

Please sign in to comment.