From 92f56a00148faa82e0373b6b9d2cc0b7fa523c8a Mon Sep 17 00:00:00 2001
From: wusamzong
Date: Thu, 14 Sep 2023 12:27:57 +0800
Subject: [PATCH] [YUNIKORN-1978] Add e2e test for node sorting with the
 fairness policy

---
 .../resource_fairness_suite_test.go |  16 +-
 .../resource_fairness_test.go       | 317 +++++++++++++++---
 2 files changed, 285 insertions(+), 48 deletions(-)

diff --git a/test/e2e/resource_fairness/resource_fairness_suite_test.go b/test/e2e/resource_fairness/resource_fairness_suite_test.go
index 9e8b5dde9..60d87bb2f 100644
--- a/test/e2e/resource_fairness/resource_fairness_suite_test.go
+++ b/test/e2e/resource_fairness/resource_fairness_suite_test.go
@@ -20,6 +20,7 @@ package resourcefairness_test
 
 import (
     "path/filepath"
+    "strings"
     "testing"
 
     "github.com/onsi/ginkgo/v2"
@@ -29,7 +30,6 @@ import (
 
     "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
     "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
-    "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
     "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
 )
 
@@ -39,14 +39,16 @@ func init() {
 
 var oldConfigMap = new(v1.ConfigMap)
 var annotation = "ann-" + common.RandSeq(10)
-var kClient = k8s.KubeCtl{} //nolint
 
-var _ = BeforeSuite(func() {
-    Ω(kClient.SetClient()).To(BeNil())
-    annotation = "ann-" + common.RandSeq(10)
-    yunikorn.EnsureYuniKornConfigsPresent()
-})
 
 var _ = AfterSuite(func() {
+    By("Tear down namespace: " + ns)
+    err := kClient.TearDownNamespace(ns)
+    Ω(err).NotTo(HaveOccurred())
+
+    ginkgo.By("Untainting some nodes")
+    err = kClient.UntaintNodes(nodesToTaint, taintKey)
+    Ω(err).NotTo(HaveOccurred(), "Could not remove taint from nodes "+strings.Join(nodesToTaint, ","))
+
     yunikorn.RestoreConfigMapWrapper(oldConfigMap, annotation)
 })
diff --git a/test/e2e/resource_fairness/resource_fairness_test.go b/test/e2e/resource_fairness/resource_fairness_test.go
index fff4104e0..3a89db318 100644
--- a/test/e2e/resource_fairness/resource_fairness_test.go
+++ b/test/e2e/resource_fairness/resource_fairness_test.go
@@ -24,10 +24,12 @@ import (
     "time"
 
     "github.com/onsi/ginkgo/v2"
+    "github.com/onsi/gomega"
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/util/wait"
 
     "github.com/apache/yunikorn-core/pkg/common/configs"
+    "github.com/apache/yunikorn-core/pkg/webservice/dao"
     "github.com/apache/yunikorn-k8shim/pkg/common/constants"
     tests "github.com/apache/yunikorn-k8shim/test/e2e"
     "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
@@ -35,25 +37,96 @@ import (
     "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
 )
 
+var kClient k8s.KubeCtl
+var restClient yunikorn.RClient
+var nodes *v1.NodeList
+var workerID1 string
+var workerID2 string
+var worker1 dao.NodeDAOInfo
+var worker2 dao.NodeDAOInfo
+var namespace *v1.Namespace
+var err error
+var queuePath string
+
+var maxCPU int64 = 500
+var maxMem int64 = 500
+var nodesToTaint = []string{}
+var taintKey = "e2e_test_fairness"
+var ns = "test-" + common.RandSeq(10)
+
+var _ = BeforeSuite(func() {
+    kClient = k8s.KubeCtl{}
+    Ω(kClient.SetClient()).To(BeNil())
+
+    restClient = yunikorn.RClient{}
+    Ω(restClient).NotTo(gomega.BeNil())
+
+    yunikorn.EnsureYuniKornConfigsPresent()
+
+    ginkgo.By("Port-forward the scheduler pod")
+    var err = kClient.PortForwardYkSchedulerPod()
+    Ω(err).NotTo(gomega.HaveOccurred())
+
+    ginkgo.By("Create test namespace")
+    namespace, err = kClient.CreateNamespace(ns, nil)
+    gomega.Ω(err).NotTo(gomega.HaveOccurred())
+    gomega.Ω(namespace.Status.Phase).To(gomega.Equal(v1.NamespaceActive))
+
+    // get node available resource
+    nodes, err = kClient.GetNodes()
+    Ω(err).NotTo(HaveOccurred())
+    Ω(len(nodes.Items)).NotTo(gomega.BeZero(), "Nodes can't be zero")
+
+    for _, node := range nodes.Items {
+        if k8s.IsMasterNode(&node) || !k8s.IsComputeNode(&node) {
+            continue
+        }
+        if workerID1 == "" {
+            workerID1 = node.Name
+        } else if workerID2 == "" {
+            workerID2 = node.Name
+        } else {
+            nodesToTaint = append(nodesToTaint, node.Name)
+        }
+    }
+    Ω(workerID1).NotTo(gomega.BeEmpty(), "worker node not found")
+    Ω(workerID2).NotTo(gomega.BeEmpty(), "only found one worker node")
+
+    ginkgo.By("Tainting some nodes")
+    err = kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule)
+    Ω(err).NotTo(HaveOccurred())
+
+    nodesDAOInfo, err := restClient.GetNodes(constants.DefaultPartition)
+    Ω(err).NotTo(HaveOccurred())
+    Ω(nodesDAOInfo).NotTo(gomega.BeNil())
+
+    for _, node := range *nodesDAOInfo {
+        if node.NodeID == workerID1 {
+            worker1 = node
+        }
+        if node.NodeID == workerID2 {
+            worker2 = node
+        }
+    }
+})
+
 var _ = Describe("FairScheduling:", func() {
-    var kClient k8s.KubeCtl
-    var restClient yunikorn.RClient
-    var err error
-    var ns string
-    var queuePath string
-
-    var maxCPU int64 = 500
-    var maxMem int64 = 500
-
-    BeforeEach(func() {
-        var namespace *v1.Namespace
-        ns = "test-" + common.RandSeq(10)
-        queuePath = "root." + ns
-        kClient = k8s.KubeCtl{}
-        Ω(kClient.SetClient()).To(BeNil())
+    // Validates waitQueue order of requested app resources, according to fairAppScheduling.
+    // Step 1: Deploy 4 apps, which sum to 95% queueCPU / 35% queueMem
+    //         -> Resource priority order: 1)CPU. 2)Mem
+    // Step 2: Deploy 1 more blocked pod each for apps0-2
+    // Step 3: Kill an App3 pod.
+    // Step 4: App with least cpu use is allocated pod next. Break tie using mem.
+    It("Test_Wait_Queue_Order", func() {
+
+        By(fmt.Sprintf("Creating test namespace %s", ns))
+        namespace, err := kClient.UpdateNamespace(ns, map[string]string{"vcore": "500m", "memory": "500M"})
+        Ω(err).ShouldNot(HaveOccurred())
+        Ω(namespace.Status.Phase).Should(Equal(v1.NamespaceActive))
 
         By("Setting custom YuniKorn configuration")
         annotation = "ann-" + common.RandSeq(10)
+        queuePath = "root." + ns
         yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "fair", annotation, func(sc *configs.SchedulerConfig) error {
             // remove placement rules so we can control queue
             sc.Partitions[0].PlacementRules = nil
@@ -68,30 +141,7 @@ var _ = Describe("FairScheduling:", func() {
             }
             return nil
         })
-        kClient = k8s.KubeCtl{}
-        Ω(kClient.SetClient()).To(BeNil())
-
-        // Restart yunikorn and port-forward
-        // Required to change node sort policy.
-        ginkgo.By("Restart the scheduler pod")
-        yunikorn.RestartYunikorn(&kClient)
-
-        ginkgo.By("Port-forward scheduler pod after restart")
-        yunikorn.RestorePortForwarding(&kClient)
-
-        By(fmt.Sprintf("Creating test namespace %s", ns))
-        namespace, err = kClient.CreateNamespace(ns, map[string]string{"vcore": "500m", "memory": "500M"})
-        Ω(err).ShouldNot(HaveOccurred())
-        Ω(namespace.Status.Phase).Should(Equal(v1.NamespaceActive))
-    })
-    // Validates waitQueue order of requested app resources, according to fairAppScheduling.
-    // Step 1: Deploy 4 apps, which sum to 95% queueCPU / 35% queueMem
-    //         -> Resource priority order: 1)CPU. 2)Mem
-    // Step 2: Deploy 1 more blocked pod each for apps0-2
-    // Step 3: Kill an App3 pod.
-    // Step 4: App with least cpu use is allocated pod next. Break tie using mem.
-    It("Test_Wait_Queue_Order", func() {
 
         // Create appIDs
         var apps []string
         for j := 0; j < 4; j++ {
@@ -191,14 +241,199 @@ var _ = Describe("FairScheduling:", func() {
         }
     })
 
+    // Validates the order of node allocation for requested pod resources, following the fairNodeScheduling approach.
+    // Step 1: Deploy 2 apps that utilize 5% of the resources on worker1 and 10% on worker2.
+    // Step 2: Deploy 1 app which, according to fair scheduling principles, will be allocated to worker1.
+    //         This allocation increases the resource usage on worker1 to 15%.
+    // Step 3: Deploy 1 app which, according to fair scheduling principles, will be allocated to worker2.
+    It("Verify_basic_node_sorting_with_fairness_policy", func() {
+        By(fmt.Sprintf("Update test namespace %s", ns))
+        namespace, err := kClient.UpdateNamespace(ns, nil)
+        Ω(err).ShouldNot(HaveOccurred())
+        Ω(namespace.Status.Phase).Should(Equal(v1.NamespaceActive))
+
+        By("Setting custom YuniKorn configuration")
+        annotation = "ann-" + common.RandSeq(10)
+        queuePath = "root.sandbox"
+        yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "fair", annotation, func(sc *configs.SchedulerConfig) error {
+            // remove placement rules so we can control queue
+            sc.Partitions[0].PlacementRules = nil
+            if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+                Name:   "sandbox",
+                Parent: false,
+            }); err != nil {
+                return err
+            }
+            return nil
+        })
+
+        sleepPodConfs := []k8s.SleepPodConfig{}
+
+        // Pin to worker1, utilizing 5% of its resources
+        sleepPod1Conf := k8s.SleepPodConfig{
+            Name: "pod1", NS: ns, AppID: "app1",
+            CPU:          fillNodeUtil(&worker1, "vcore", float64(0.05)),
+            Mem:          fillNodeUtil(&worker1, "memory", float64(0.05)) / 1000 / 1000,
+            RequiredNode: workerID1,
+            Labels:       map[string]string{constants.LabelQueueName: queuePath}}
+
+        // Pin to worker2, utilizing 10% of its resources
+        sleepPod2Conf := k8s.SleepPodConfig{
+            Name: "pod2", NS: ns, AppID: "app2",
+            CPU:          fillNodeUtil(&worker2, "vcore", float64(0.10)),
+            Mem:          fillNodeUtil(&worker2, "memory", float64(0.10)) / 1000 / 1000,
+            RequiredNode: workerID2,
+            Labels:       map[string]string{constants.LabelQueueName: queuePath}}
+
+        // No node is pinned; the pod is sized to use 10% of worker1's resources
+        sleepPod3Conf := k8s.SleepPodConfig{
+            Name: "pod3", NS: ns, AppID: "app3",
+            CPU:    fillNodeUtil(&worker1, "vcore", float64(0.10)),
+            Mem:    fillNodeUtil(&worker1, "memory", float64(0.10)) / 1000 / 1000,
+            Labels: map[string]string{constants.LabelQueueName: queuePath}}
+
+        // No node is pinned; the pod is sized to use 10% of worker2's resources
+        sleepPod4Conf := k8s.SleepPodConfig{
+            Name: "pod4", NS: ns, AppID: "app4",
+            CPU:    fillNodeUtil(&worker2, "vcore", float64(0.10)),
+            Mem:    fillNodeUtil(&worker2, "memory", float64(0.10)) / 1000 / 1000,
+            Labels: map[string]string{constants.LabelQueueName: queuePath}}
+
+        sleepPodConfs = append(sleepPodConfs, sleepPod1Conf)
+        sleepPodConfs = append(sleepPodConfs, sleepPod2Conf)
+        sleepPodConfs = append(sleepPodConfs, sleepPod3Conf)
+        sleepPodConfs = append(sleepPodConfs, sleepPod4Conf)
+
+        // Deploy the pods
+        for _, config := range sleepPodConfs {
+            ginkgo.By("Deploy the sleep pod " + config.Name)
+            initPod, podErr := k8s.InitSleepPod(config)
+            Ω(podErr).NotTo(HaveOccurred())
+            _, err = kClient.CreatePod(initPod, ns)
+            Ω(err).NotTo(HaveOccurred())
+            err = kClient.WaitForPodRunning(ns, config.Name, 3600*time.Second)
+            Ω(err).NotTo(HaveOccurred())
+        }
+
+        // Verify pod3 and pod4 have been deployed on the correct nodes.
+        ginkgo.By(sleepPod3Conf.Name + " should deploy on " + workerID1)
+        RespPod, err := kClient.GetPod(sleepPod3Conf.Name, ns)
+        Ω(err).NotTo(HaveOccurred())
+        Ω(RespPod.Spec.NodeName).To(gomega.Equal(workerID1), "Pod should place on "+workerID1)
+
+        ginkgo.By(sleepPod4Conf.Name + " should deploy on " + workerID2)
+        RespPod, err = kClient.GetPod(sleepPod4Conf.Name, ns)
+        Ω(err).NotTo(HaveOccurred())
+        Ω(RespPod.Spec.NodeName).To(gomega.Equal(workerID2), "Pod should place on "+workerID2)
+    })
+
+    // Validates the order of node allocation for requested pod resources, considering fairNodeScheduling with resource weights.
+    // Step 1: Set the resource weights to {"vcore": 2.0, "memory": 1.0}.
+    // Step 2: Deploy 2 apps, bringing the weighted utilization to 13% on worker1 and 17% on worker2.
+    // Step 3: Deploy 1 app which, according to fair scheduling principles, will be allocated to worker1.
+    //         This allocation increases the weighted utilization on worker1 to 20%.
+    // Step 4: Deploy 1 app which, according to fair scheduling principles, will be allocated to worker2.
+    It("Verify_node_sorting_fairness_policy_with_resource_weight", func() {
+        By("Setting custom YuniKorn configuration")
+        annotation = "ann-" + common.RandSeq(10)
+        queuePath = "root.sandbox"
+        yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "fair", annotation, func(sc *configs.SchedulerConfig) error {
+            // remove placement rules so we can control queue
+            sc.Partitions[0].PlacementRules = nil
+            sc.Partitions[0].NodeSortPolicy = configs.NodeSortingPolicy{
+                Type: "fair",
+                ResourceWeights: map[string]float64{
+                    "vcore":  2.0,
+                    "memory": 1.0,
+                },
+            }
+            if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+                Name:   "sandbox",
+                Parent: false,
+            }); err != nil {
+                return err
+            }
+            return nil
+        })
+
+        sleepPodConfs := []k8s.SleepPodConfig{}
+
+        // Pin to worker1; 10% vcore + 20% memory brings its weighted utilization to 13%
+        sleepPod1Conf := k8s.SleepPodConfig{
+            Name: "pod1", NS: ns, AppID: "app1",
+            CPU:          fillNodeUtil(&worker1, "vcore", float64(0.10)),
+            Mem:          fillNodeUtil(&worker1, "memory", float64(0.20)) / 1000 / 1000,
+            RequiredNode: workerID1,
+            Labels:       map[string]string{constants.LabelQueueName: queuePath}}
+
+        // Pin to worker2; 20% vcore + 10% memory brings its weighted utilization to 17%
+        sleepPod2Conf := k8s.SleepPodConfig{
+            Name: "pod2", NS: ns, AppID: "app2",
+            CPU:          fillNodeUtil(&worker2, "vcore", float64(0.20)),
+            Mem:          fillNodeUtil(&worker2, "memory", float64(0.10)) / 1000 / 1000,
+            RequiredNode: workerID2,
+            Labels:       map[string]string{constants.LabelQueueName: queuePath}}
+
+        // No node is pinned; the pod requests 10% of worker1's vcore capacity and no memory
+        sleepPod3Conf := k8s.SleepPodConfig{
+            Name: "pod3", NS: ns, AppID: "app3",
+            CPU:    fillNodeUtil(&worker1, "vcore", float64(0.10)),
+            Mem:    0,
+            Labels: map[string]string{constants.LabelQueueName: queuePath}}
+
+        // No node is pinned; the pod requests 10% of worker2's memory capacity and no vcore
+        sleepPod4Conf := k8s.SleepPodConfig{
+            Name: "pod4", NS: ns, AppID: "app4",
+            CPU:    0,
+            Mem:    fillNodeUtil(&worker2, "memory", float64(0.10)) / 1000 / 1000,
+            Labels: map[string]string{constants.LabelQueueName: queuePath}}
+
+        sleepPodConfs = append(sleepPodConfs, sleepPod1Conf)
+        sleepPodConfs = append(sleepPodConfs, sleepPod2Conf)
+        sleepPodConfs = append(sleepPodConfs, sleepPod3Conf)
+        sleepPodConfs = append(sleepPodConfs, sleepPod4Conf)
+
+        // Deploy the pods
+        for _, config := range sleepPodConfs {
+            ginkgo.By("Deploy the sleep pod " + config.Name)
+            initPod, podErr := k8s.InitSleepPod(config)
+            Ω(podErr).NotTo(HaveOccurred())
+            _, err = kClient.CreatePod(initPod, ns)
+            Ω(err).NotTo(HaveOccurred())
+            err = kClient.WaitForPodRunning(ns, config.Name, 30*time.Second)
+            Ω(err).NotTo(HaveOccurred())
+        }
+
+        // Verify pod3 and pod4 have been deployed on the correct nodes.
+        ginkgo.By(sleepPod3Conf.Name + " should deploy on " + workerID1)
+        RespPod, err := kClient.GetPod(sleepPod3Conf.Name, ns)
+        Ω(err).NotTo(HaveOccurred())
+        Ω(RespPod.Spec.NodeName).To(gomega.Equal(workerID1), "Pod should place on "+workerID1)
+
+        ginkgo.By(sleepPod4Conf.Name + " should deploy on " + workerID2)
+        RespPod, err = kClient.GetPod(sleepPod4Conf.Name, ns)
+        Ω(err).NotTo(HaveOccurred())
+        Ω(RespPod.Spec.NodeName).To(gomega.Equal(workerID2), "Pod should place on "+workerID2)
+    })
+
     AfterEach(func() {
         testDescription := ginkgo.CurrentSpecReport()
         if testDescription.Failed() {
             tests.LogTestClusterInfoWrapper(testDescription.FailureMessage(), []string{ns})
             tests.LogYunikornContainer(testDescription.FailureMessage())
         }
-        By("Tear down namespace: " + ns)
-        err := kClient.TearDownNamespace(ns)
-        Ω(err).NotTo(HaveOccurred())
+
+        // Delete all sleep pods
+        ginkgo.By("Delete all sleep pods")
+        err := kClient.DeletePods(ns)
+        if err != nil {
+            fmt.Fprintf(ginkgo.GinkgoWriter, "Failed to delete pods in namespace %s - reason is %s\n", ns, err.Error())
+        }
     })
 })
+
+// fillNodeUtil returns the request size for resourceType that brings the node's
+// allocation up to the given fraction of its capacity (current allocation is subtracted).
+func fillNodeUtil(node *dao.NodeDAOInfo, resourceType string, percent float64) int64 {
+    fillingResource := percent*float64(node.Capacity[resourceType]) - float64(node.Allocated[resourceType])
+    return int64(fillingResource)
+}
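For reference, the node placements asserted in Verify_node_sorting_fairness_policy_with_resource_weight follow from the utilization arithmetic in the step comments. The standalone Go sketch below (not part of the patch) reproduces that arithmetic, assuming the "fair" node sort policy ranks nodes by the weighted average of per-resource utilization and prefers the lower score; the weights and percentages mirror the test, and weightedUtilization is an illustrative helper, not a scheduler API.

package main

import "fmt"

// weightedUtilization returns the weighted average of the per-resource
// utilization ratios, e.g. {"vcore": 0.10, "memory": 0.20}.
func weightedUtilization(util, weights map[string]float64) float64 {
    var sum, totalWeight float64
    for res, w := range weights {
        sum += w * util[res]
        totalWeight += w
    }
    return sum / totalWeight
}

func main() {
    weights := map[string]float64{"vcore": 2.0, "memory": 1.0}

    // After pod1 (10% vcore, 20% memory on worker1) and pod2 (20% vcore, 10% memory on worker2):
    worker1 := weightedUtilization(map[string]float64{"vcore": 0.10, "memory": 0.20}, weights)
    worker2 := weightedUtilization(map[string]float64{"vcore": 0.20, "memory": 0.10}, weights)
    fmt.Printf("worker1 ~ %.0f%%, worker2 ~ %.0f%%\n", worker1*100, worker2*100) // ~13% and ~17%

    // pod3 (10% vcore, no memory) is expected on the less-utilized worker1,
    // which raises worker1 to ~20%, so pod4 is then expected on worker2.
    worker1 = weightedUtilization(map[string]float64{"vcore": 0.20, "memory": 0.20}, weights)
    fmt.Printf("worker1 after pod3 ~ %.0f%%\n", worker1*100)
}

Even if the scheduler's actual scoring differs in detail, the assertions only rely on the relative ordering: worker1 stays below worker2 until pod3 lands, after which pod4 goes to worker2.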