diff --git a/test/e2e/preemption/preemption_test.go b/test/e2e/preemption/preemption_test.go
index dc8585aa2..80c744b6b 100644
--- a/test/e2e/preemption/preemption_test.go
+++ b/test/e2e/preemption/preemption_test.go
@@ -41,6 +41,7 @@ var kClient k8s.KubeCtl
 var restClient yunikorn.RClient
 var ns *v1.Namespace
 var dev = "dev" + common.RandSeq(5)
+var devNew = "dev" + common.RandSeq(5)
 var oldConfigMap = new(v1.ConfigMap)
 var annotation = "ann-" + common.RandSeq(10)
 
@@ -49,6 +50,8 @@ var Worker = ""
 var WorkerMemRes int64
 var sleepPodMemLimit int64
 var sleepPodMemLimit2 int64
+var sleepPodMemOverLimit int64
+var nodeName string
 var taintKey = "e2e_test_preemption"
 var nodesToTaint []string
 
@@ -102,12 +105,14 @@ var _ = ginkgo.BeforeSuite(func() {
 	for _, node := range *nodesDAOInfo {
 		if node.NodeID == Worker {
 			WorkerMemRes = node.Available["memory"]
+			nodeName = node.HostName // remember the worker's host name so pods can be pinned to this node
 		}
 	}
 	WorkerMemRes /= (1000 * 1000) // change to M
 	fmt.Fprintf(ginkgo.GinkgoWriter, "Worker node %s available memory %dM\n", Worker, WorkerMemRes)
 
 	sleepPodMemLimit = int64(float64(WorkerMemRes) / 3)
+	sleepPodMemOverLimit = int64(float64(WorkerMemRes) * 1.5) // more than the node has available; used as root.sandbox2's guaranteed memory
 	Ω(sleepPodMemLimit).NotTo(gomega.BeZero(), "Sleep pod memory limit cannot be zero")
 	fmt.Fprintf(ginkgo.GinkgoWriter, "Sleep pod limit memory %dM\n", sleepPodMemLimit)
 
@@ -175,7 +180,7 @@ var _ = ginkgo.Describe("Preemption", func() {
 			// Wait for pod to move to running state
 			podErr = kClient.WaitForPodBySelectorRunning(dev,
 				fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
-				60)
+				120)
 			gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
 		}
 
@@ -546,6 +551,86 @@ var _ = ginkgo.Describe("Preemption", func() {
 		gomega.Ω(err).ShouldNot(HaveOccurred())
 	})
 
+	ginkgo.It("Verify_preemption_on_specific_node", func() {
+		/*
+			1. Create two queues (one with a high and one with a low guaranteed limit)
+			2. Select a schedulable node from the cluster
+			3. Schedule a number of small, low-priority pause tasks in the low-guaranteed queue (enough to fill the node)
+			4. Schedule a large task in the high-priority queue on the same node
+			5. Wait a few seconds for the task to be scheduled
+			6. This should trigger preemption on the low-priority queue and preempt a task from it
+			7. Clean up once the test is done, whether it passed or failed
+		*/
+		time.Sleep(20 * time.Second)
+		ginkgo.By("A queue uses more resources than its guaranteed value even after removing one of the pods. The cluster doesn't have enough resources to deploy a pod in another queue which uses fewer resources than its guaranteed value.")
+		// update config
+		ginkgo.By(fmt.Sprintf("Update root.sandbox1 with guaranteed memory %dM and root.sandbox2 with guaranteed memory %dM", sleepPodMemLimit, sleepPodMemOverLimit))
+		annotation = "ann-" + common.RandSeq(10)
+		yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
+			// remove placement rules so we can control the queue
+			sc.Partitions[0].PlacementRules = nil
+			var err error
+			if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+				Name:       "sandbox1",
+				Resources:  configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
+				Properties: map[string]string{"preemption.delay": "1s"},
+			}); err != nil {
+				return err
+			}
+
+			if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+				Name:       "sandbox2",
+				Resources:  configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemOverLimit)}},
+				Properties: map[string]string{"preemption.delay": "1s"},
+			}); err != nil {
+				return err
+			}
+			return nil
+		})
+
+		newNamespace, err := kClient.CreateNamespace(devNew, nil)
+		gomega.Ω(err).NotTo(gomega.HaveOccurred())
+		gomega.Ω(newNamespace.Status.Phase).To(gomega.Equal(v1.NamespaceActive))
+		// Define the sleep pods
+		sleepPodConfigs := createSandbox1SleepPodCofigsWithStaticNode(3, 600)
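+		// sleepjob4 runs in root.sandbox2 and is pinned to the same node, so it should only fit once a root.sandbox1 pod is preempted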
+		sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: devNew, Mem: sleepPodMemLimit, Time: 600, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.sandbox2"}, RequiredNode: nodeName}
+		sleepPodConfigs = append(sleepPodConfigs, sleepPod4Config)
+
+		for _, config := range sleepPodConfigs {
+			ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
+			sleepObj, podErr := k8s.InitSleepPod(config)
+			Ω(podErr).NotTo(gomega.HaveOccurred())
+			sleepRespPod, podErr := kClient.CreatePod(sleepObj, devNew)
+			gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+
+			// Wait for pod to move to running state
+			podErr = kClient.WaitForPodBySelectorRunning(devNew,
+				fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
+				120)
+			gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+		}
+
+		// assert that one of the pods in root.sandbox1 is preempted
+		ginkgo.By("One of the pods in root.sandbox1 is preempted")
+		sandbox1RunningPodsCnt := 0
+		pods, err := kClient.ListPodsByLabelSelector(devNew, "queue=root.sandbox1")
+		gomega.Ω(err).NotTo(gomega.HaveOccurred())
+		for _, pod := range pods.Items {
+			if pod.DeletionTimestamp != nil { // skip pods that are already being terminated (i.e. preempted)
+				continue
+			}
+			if pod.Status.Phase == v1.PodRunning {
+				sandbox1RunningPodsCnt++
+			}
+		}
+		Ω(sandbox1RunningPodsCnt).To(gomega.Equal(2), "One of the pods in root.sandbox1 should be preempted")
+		errNew := kClient.DeletePods(newNamespace.Name)
+		if errNew != nil {
+			fmt.Fprintf(ginkgo.GinkgoWriter, "Failed to delete pods in namespace %s - reason is %s\n", newNamespace.Name, errNew.Error())
+		}
+	})
+
 	ginkgo.AfterEach(func() {
 		testDescription := ginkgo.CurrentSpecReport()
 		if testDescription.Failed() {
@@ -572,3 +657,12 @@ func createSandbox1SleepPodCofigs(cnt, time int) []k8s.SleepPodConfig {
 	}
 	return sandbox1Configs
 }
+
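+// createSandbox1SleepPodCofigsWithStaticNode builds cnt sleep pod configs in root.sandbox1, all pinned to the selected worker node via RequiredNode.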
+func createSandbox1SleepPodCofigsWithStaticNode(cnt, time int) []k8s.SleepPodConfig {
+	sandbox1Configs := make([]k8s.SleepPodConfig, 0, cnt)
+	for i := 0; i < cnt; i++ {
+		sandbox1Configs = append(sandbox1Configs, k8s.SleepPodConfig{Name: fmt.Sprintf("sleepjob%d", i+1), NS: devNew, Mem: sleepPodMemLimit2, Time: time, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.sandbox1"}, RequiredNode: nodeName})
+	}
+	return sandbox1Configs
+}