From 85077d422c717712edb95caaa21a59748c485904 Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Fri, 26 Jul 2024 09:04:35 +0200 Subject: [PATCH] add e2e test and fixes # Conflicts: # pkg/webhooks/jobset_webhook.go --- pkg/controllers/jobset_controller.go | 14 ++--- pkg/webhooks/jobset_webhook.go | 28 +++++++--- test/e2e/e2e_test.go | 84 ++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 15 deletions(-) diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 86ce029a..89eb480a 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -379,13 +379,8 @@ func (r *JobSetReconciler) calculateReplicatedJobStatuses(ctx context.Context, j } func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, updateStatusOpts *statusUpdateOpts) error { - for _, job := range activeJobs { - if !jobSuspended(job) { - job.Spec.Suspend = ptr.To(true) - if err := r.Update(ctx, job); err != nil { - return err - } - } + if err := r.deleteJobs(ctx, activeJobs); err != nil { + return err } setJobSetSuspendedCondition(js, updateStatusOpts) return nil @@ -493,6 +488,11 @@ func (r *JobSetReconciler) reconcileReplicatedJobs(ctx context.Context, js *jobs return err } + // Don't create child Jobs if the JobSet is suspended + if jobSetSuspended(js) { + continue + } + status := findReplicatedJobStatus(replicatedJobStatus, replicatedJob.Name) // For startup policy, if the replicatedJob is started we can skip this loop. diff --git a/pkg/webhooks/jobset_webhook.go b/pkg/webhooks/jobset_webhook.go index c85ae713..ad6cf292 100644 --- a/pkg/webhooks/jobset_webhook.go +++ b/pkg/webhooks/jobset_webhook.go @@ -313,14 +313,26 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime. // This is needed for integration with Kueue/DWS. if ptr.Deref(oldJS.Spec.Suspend, false) || ptr.Deref(js.Spec.Suspend, false) { for index := range js.Spec.ReplicatedJobs { - // Pod values which must be mutable for Kueue are defined here: https://github.com/kubernetes-sigs/kueue/blob/a50d395c36a2cb3965be5232162cf1fded1bdb08/apis/kueue/v1beta1/workload_types.go#L256-L260 - mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Annotations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Annotations - mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Labels = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Labels - mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.NodeSelector = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.NodeSelector - mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.Tolerations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.Tolerations - - // Pod Scheduling Gates can be updated for batch/v1 Job: https://github.com/kubernetes/kubernetes/blob/ceb58a4dbc671b9d0a2de6d73a1616bc0c299863/pkg/apis/batch/validation/validation.go#L662 - mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates + munge := true + // Don't allow to unsuspend if there are still active Jobs with + // different PodTemplate. We do this to avoid a race condition when + // Jobs with an old PodTemplate (before suspension) are still + // running and would not be deleted if we unsuspend the JobSet. + if !ptr.Deref(js.Spec.Suspend, false) { + if len(js.Status.ReplicatedJobsStatus) > index && js.Status.ReplicatedJobsStatus[index].Active > 0 { + munge = false + } + } + if munge { + // Pod values which must be mutable for Kueue are defined here: https://github.com/kubernetes-sigs/kueue/blob/a50d395c36a2cb3965be5232162cf1fded1bdb08/apis/kueue/v1beta1/workload_types.go#L256-L260 + mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Annotations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Annotations + mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Labels = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Labels + mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.NodeSelector = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.NodeSelector + mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.Tolerations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.Tolerations + + // Pod Scheduling Gates can be updated for batch/v1 Job: https://github.com/kubernetes/kubernetes/blob/ceb58a4dbc671b9d0a2de6d73a1616bc0c299863/pkg/apis/batch/validation/validation.go#L662 + mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates + } } } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 8c713a72..114254cb 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" "sigs.k8s.io/jobset/pkg/util/testing" @@ -131,6 +132,67 @@ var _ = ginkgo.Describe("JobSet", func() { }) }) + ginkgo.When("job is unsuspended and suspend", func() { + for rep := 0; rep < 100; rep++ { + ginkgo.It(fmt.Sprintf("should allow to update PodTemplate on unsuspend and restore the PodTemplate on suspend-%v", rep), func() { + ctx := context.Background() + js := shortSleepTestJobSet(ns).Obj() + jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace} + + ginkgo.By("Create a suspended JobSet", func() { + js.Spec.Suspend = ptr.To(true) + js.Spec.TTLSecondsAfterFinished = ptr.To[int32](5) + gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed()) + }) + + ginkgo.By("Unsuspend the JobSet setting nodeSelectors that prevent pods from being scheduled", func() { + gomega.Eventually(func() error { + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + js.Spec.Suspend = ptr.To(false) + podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template + if podTemplate.Spec.NodeSelector == nil { + podTemplate.Spec.NodeSelector = make(map[string]string) + } + podTemplate.Spec.NodeSelector["kubernetes.io/hostname"] = "non-existing-node" + if podTemplate.Labels == nil { + podTemplate.Labels = make(map[string]string) + } + podTemplate.Labels["custom-label-key"] = "custom-label-value" + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + podTemplate.Annotations["custom-annotation-key"] = "custom-annotation-value" + return k8sClient.Update(ctx, js) + }, timeout, interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Suspend the JobSet restoring the PodTemplate properties", func() { + gomega.Eventually(func() error { + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + js.Spec.Suspend = ptr.To(true) + podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template + delete(podTemplate.Spec.NodeSelector, "kubernetes.io/hostname") + delete(podTemplate.Labels, "custom-label-key") + delete(podTemplate.Annotations, "custom-annotation-key") + return k8sClient.Update(ctx, js) + }, timeout, interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Unsuspending the JobSet again with PodTemplate allowing completion", func() { + gomega.Eventually(func() error { + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + js.Spec.Suspend = ptr.To(false) + return k8sClient.Update(ctx, js) + }, timeout, interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Await for the JobSet to complete successfully", func() { + util.JobSetCompleted(ctx, k8sClient, js, timeout) + }) + }) + } + }) + }) // end of Describe // getPingCommand returns ping command for 4 hostnames @@ -246,3 +308,25 @@ func sleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper { Replicas(int32(replicas)). Obj()) } + +func shortSleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper { + jsName := "js" + rjobName := "rjob" + replicas := 3 + return testing.MakeJobSet(jsName, ns.Name). + ReplicatedJob(testing.MakeReplicatedJob(rjobName). + Job(testing.MakeJobTemplate("job", ns.Name). + PodSpec(corev1.PodSpec{ + RestartPolicy: "Never", + Containers: []corev1.Container{ + { + Name: "short-sleep-test-container", + Image: "bash:latest", + Command: []string{"bash", "-c"}, + Args: []string{"sleep 1"}, + }, + }, + }).Obj()). + Replicas(int32(replicas)). + Obj()) +}