Infinite preemption loop in hierarchical cohorts
vladikkuzn committed Jan 28, 2025
1 parent dc93d1f commit a383df6
Showing 3 changed files with 144 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/deployment/wrappers.go
@@ -141,3 +141,9 @@ func (d *DeploymentWrapper) PodTemplateAnnotation(k, v string) *DeploymentWrapper
func (d *DeploymentWrapper) PodTemplateSpecQueue(q string) *DeploymentWrapper {
	return d.PodTemplateSpecLabel(constants.QueueLabel, q)
}

// PodTemplateSpecPriorityClass sets the priority class of the pod template spec of the Deployment
func (d *DeploymentWrapper) PodTemplateSpecPriorityClass(priorityClassName string) *DeploymentWrapper {
	d.Spec.Template.Spec.PriorityClassName = priorityClassName
	return d
}
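
A minimal usage sketch of the new wrapper, chained with the other deployment helpers used later in this commit (the deployment, namespace, queue, and priority class names below are hypothetical):

deploy := testingdeployment.MakeDeployment("sample-deployment", "sample-namespace").
	Queue("sample-local-queue").
	Replicas(1).
	Request(corev1.ResourceCPU, "100m").
	PodTemplateSpecPriorityClass("high-priority").
	Obj()
// deploy.Spec.Template.Spec.PriorityClassName is now "high-priority".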
@@ -17,6 +17,7 @@ limitations under the License.
package fairsharing

import (
	"context"
	"fmt"
	"time"

@@ -28,10 +29,12 @@ import (
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/utils/clock"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	"sigs.k8s.io/kueue/pkg/util/testing"
	testingdeployment "sigs.k8s.io/kueue/pkg/util/testingjobs/deployment"
	"sigs.k8s.io/kueue/pkg/workload"
	"sigs.k8s.io/kueue/test/integration/framework"
	"sigs.k8s.io/kueue/test/util"
@@ -333,3 +336,135 @@ func finishEvictionOfWorkloadsInCQ(cq *kueue.ClusterQueue, n int) {
		g.Expect(finished.Len()).Should(gomega.Equal(n), "Not enough workloads evicted")
	}, util.Timeout, util.Interval).Should(gomega.Succeed())
}

var _ = ginkgo.FDescribe("Infinite Preemption Loop", func() {
	var (
		ctx context.Context
		ns  *corev1.Namespace
	)

	ginkgo.BeforeEach(func() {
		ctx = context.Background()

		// Create a namespace for the test
		ns = &corev1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "preemption-loop-test-",
			},
		}
		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
	})

	ginkgo.AfterEach(func() {
		// Cleanup namespace
		gomega.Expect(k8sClient.Delete(ctx, ns)).To(gomega.Succeed())
		fwk.StopManager(ctx)
	})
	ginkgo.It("should reproduce the infinite preemption loop", func() {
		// Step 1: Create ClusterQueues and Cohort
		cohortName := "testing"

		clusterQueueA := testing.MakeClusterQueue("guaranteed-tenant-a").
			Cohort(cohortName).
			ResourceGroup(*testing.MakeFlavorQuotas("default").
				Resource("cpu", "150m").Obj()).
			Preemption(kueue.ClusterQueuePreemption{
				ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority,
				BorrowWithinCohort: &kueue.BorrowWithinCohort{
					Policy:               kueue.BorrowWithinCohortPolicyLowerPriority,
					MaxPriorityThreshold: ptr.To[int32](100),
				},
				WithinClusterQueue: kueue.PreemptionPolicyNever,
			}).Obj()
		gomega.Expect(k8sClient.Create(ctx, clusterQueueA)).To(gomega.Succeed())

		clusterQueueB := testing.MakeClusterQueue("guaranteed-tenant-b").
			Cohort(cohortName).
			ResourceGroup(*testing.MakeFlavorQuotas("default").
				Resource("cpu", "150m").Obj()).
			Preemption(kueue.ClusterQueuePreemption{
				ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority,
				BorrowWithinCohort: &kueue.BorrowWithinCohort{
					Policy:               kueue.BorrowWithinCohortPolicyLowerPriority,
					MaxPriorityThreshold: ptr.To[int32](100),
				},
				WithinClusterQueue: kueue.PreemptionPolicyNever,
			}).Obj()
		gomega.Expect(k8sClient.Create(ctx, clusterQueueB)).To(gomega.Succeed())

		clusterQueueBestEffort := testing.MakeClusterQueue("best-effort").
			Cohort(cohortName).
			ResourceGroup(*testing.MakeFlavorQuotas("default").
				Resource("cpu", "0m").Obj()).
			Preemption(kueue.ClusterQueuePreemption{
				ReclaimWithinCohort: kueue.PreemptionPolicyNever,
				BorrowWithinCohort: &kueue.BorrowWithinCohort{
					Policy: kueue.BorrowWithinCohortPolicyNever,
				},
				WithinClusterQueue: kueue.PreemptionPolicyNever,
			}).Obj()
		gomega.Expect(k8sClient.Create(ctx, clusterQueueBestEffort)).To(gomega.Succeed())
		// Step 2: Create LocalQueues
		localQueueA := testing.MakeLocalQueue("guaranteed-tenant-a", ns.Name).
			ClusterQueue("guaranteed-tenant-a").Obj()
		gomega.Expect(k8sClient.Create(ctx, localQueueA)).To(gomega.Succeed())

		localQueueB := testing.MakeLocalQueue("guaranteed-tenant-b", ns.Name).
			ClusterQueue("guaranteed-tenant-b").Obj()
		gomega.Expect(k8sClient.Create(ctx, localQueueB)).To(gomega.Succeed())

		localQueueBestEffort := testing.MakeLocalQueue("best-effort", ns.Name).
			ClusterQueue("best-effort").Obj()
		gomega.Expect(k8sClient.Create(ctx, localQueueBestEffort)).To(gomega.Succeed())

		// Step 3: Create Deployments
		// Guaranteed Tenant A Workload 1
		deploymentA1 := testingdeployment.MakeDeployment("guaranteed-tenant-a-1", ns.Name).
			Queue("guaranteed-tenant-a").
			Replicas(1).
			Request(corev1.ResourceCPU, "250m").
			PodTemplateSpecPriorityClass("medium").
			Obj()
		gomega.Expect(k8sClient.Create(ctx, deploymentA1)).To(gomega.Succeed())

		// Best Effort Workload
		deploymentBestEffort := testingdeployment.MakeDeployment("best-effort-1", ns.Name).
			Queue("best-effort").
			Replicas(1).
			Request(corev1.ResourceCPU, "50m").
			PodTemplateSpecPriorityClass("low").
			Obj()
		gomega.Expect(k8sClient.Create(ctx, deploymentBestEffort)).To(gomega.Succeed())

		// Guaranteed Tenant A Workload 2
		deploymentA2 := testingdeployment.MakeDeployment("guaranteed-tenant-a-2", ns.Name).
			Queue("guaranteed-tenant-a").
			Replicas(1).
			Request(corev1.ResourceCPU, "50m").
			PodTemplateSpecPriorityClass("medium").
			Obj()
		gomega.Expect(k8sClient.Create(ctx, deploymentA2)).To(gomega.Succeed())
		// Step 4: Verify Infinite Preemption Loop
		ginkgo.By("Verifying the infinite preemption loop", func() {
			// Wait for a few seconds to observe the loop
			time.Sleep(10 * time.Second)

			// Check the status of the pods
			pods := &corev1.PodList{}
			gomega.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name))).To(gomega.Succeed())

			// Verify that the best-effort pod is continuously preempted and recreated
			var bestEffortPodCount int
			for _, pod := range pods.Items {
				if pod.Labels["app"] == "best-effort-1" {
					bestEffortPodCount++
					gomega.Expect(pod.Status.Phase).To(gomega.Or(gomega.Equal(corev1.PodPending), gomega.Equal(corev1.PodRunning)))
				}
			}
			gomega.Expect(bestEffortPodCount).To(gomega.BeNumerically(">", 1), "Best-effort pod should be recreated multiple times")
		})
	})
})
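
The deployments in Step 3 reference priority classes named "medium" and "low". A minimal sketch of provisioning them, assuming the surrounding suite does not already create them and assuming the standard schedulingv1 import ("k8s.io/api/scheduling/v1"); the values 10 and 100 are illustrative only, chosen so that "medium" outranks "low":

// Sketch only: create one PriorityClass per name; the real suite may already do this.
for name, value := range map[string]int32{"low": 10, "medium": 100} {
	pc := &schedulingv1.PriorityClass{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Value:      value,
	}
	gomega.Expect(k8sClient.Create(ctx, pc)).To(gomega.Succeed())
}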
@@ -31,6 +31,7 @@ import (
	"sigs.k8s.io/kueue/pkg/constants"
	"sigs.k8s.io/kueue/pkg/controller/core"
	"sigs.k8s.io/kueue/pkg/controller/core/indexer"
	deploymentcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/deployment"
	workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
	"sigs.k8s.io/kueue/pkg/queue"
	"sigs.k8s.io/kueue/pkg/scheduler"
@@ -93,4 +94,6 @@ func managerAndSchedulerSetup(ctx context.Context, mgr manager.Manager) {
		scheduler.WithFairSharing(fairSharing))
	err = sched.Start(ctx)
	gomega.Expect(err).NotTo(gomega.HaveOccurred())

	gomega.Expect(deploymentcontroller.SetupWebhook(mgr)).ToNot(gomega.HaveOccurred())
}
