diff --git a/test/e2e/framework/configmanager/constants.go b/test/e2e/framework/configmanager/constants.go
index e00ae7764..179f7900a 100644
--- a/test/e2e/framework/configmanager/constants.go
+++ b/test/e2e/framework/configmanager/constants.go
@@ -45,6 +45,7 @@ const (
 	AppPath         = "ws/v1/partition/%s/queue/%s/application/%s"
 	ClustersPath    = "ws/v1/clusters"
 	NodesPath       = "ws/v1/partition/%s/nodes"
+	NodePath        = "ws/v1/partition/%s/node/%s"
 	UserUsagePath   = "ws/v1/partition/%s/usage/user/%s"
 	GroupUsagePath  = "ws/v1/partition/%s/usage/group/%s"
 	HealthCheckPath = "ws/v1/scheduler/healthcheck"
diff --git a/test/e2e/framework/helpers/yunikorn/rest_api_utils.go b/test/e2e/framework/helpers/yunikorn/rest_api_utils.go
index 26289dff1..49ad76105 100644
--- a/test/e2e/framework/helpers/yunikorn/rest_api_utils.go
+++ b/test/e2e/framework/helpers/yunikorn/rest_api_utils.go
@@ -233,6 +233,16 @@ func (c *RClient) GetNodes(partition string) (*[]dao.NodeDAOInfo, error) {
 	return &nodes, err
 }
 
+func (c *RClient) GetNode(partition string, nodeId string) (*dao.NodeDAOInfo, error) {
+	req, err := c.newRequest("GET", fmt.Sprintf(configmanager.NodePath, partition, nodeId), nil)
+	if err != nil {
+		return nil, err
+	}
+	var node dao.NodeDAOInfo
+	_, err = c.do(req, &node)
+	return &node, err
+}
+
 func (c *RClient) WaitForAppStateTransition(partition string, queue string, appID string, state string, timeout int) error {
 	return wait.PollImmediate(time.Millisecond*300, time.Duration(timeout)*time.Second, c.isAppInDesiredState(partition, queue, appID, state))
 }
diff --git a/test/e2e/resource_fairness/resource_fairness_suite_test.go b/test/e2e/resource_fairness/resource_fairness_suite_test.go
index 9e8b5dde9..60d87bb2f 100644
--- a/test/e2e/resource_fairness/resource_fairness_suite_test.go
+++ b/test/e2e/resource_fairness/resource_fairness_suite_test.go
@@ -20,6 +20,7 @@ package resourcefairness_test
 
 import (
 	"path/filepath"
+	"strings"
 	"testing"
 
 	"github.com/onsi/ginkgo/v2"
@@ -29,7 +30,6 @@ import (
 
 	"github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
 	"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
-	"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
 	"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
 )
@@ -39,14 +39,16 @@ func init() {
 
 var oldConfigMap = new(v1.ConfigMap)
 var annotation = "ann-" + common.RandSeq(10)
-var kClient = k8s.KubeCtl{} //nolint
-var _ = BeforeSuite(func() {
-	Ω(kClient.SetClient()).To(BeNil())
-	annotation = "ann-" + common.RandSeq(10)
-	yunikorn.EnsureYuniKornConfigsPresent()
-})
 
 var _ = AfterSuite(func() {
+	By("Tear down namespace: " + ns)
+	err := kClient.TearDownNamespace(ns)
+	Ω(err).NotTo(HaveOccurred())
+
+	ginkgo.By("Untainting some nodes")
+	err = kClient.UntaintNodes(nodesToTaint, taintKey)
+	Ω(err).NotTo(HaveOccurred(), "Could not remove taint from nodes "+strings.Join(nodesToTaint, ","))
+
 	yunikorn.RestoreConfigMapWrapper(oldConfigMap, annotation)
 })
diff --git a/test/e2e/resource_fairness/resource_fairness_test.go b/test/e2e/resource_fairness/resource_fairness_test.go
index fff4104e0..6dd225d00 100644
--- a/test/e2e/resource_fairness/resource_fairness_test.go
+++ b/test/e2e/resource_fairness/resource_fairness_test.go
@@ -24,10 +24,12 @@ import (
 	"time"
 
 	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
 
 	"github.com/apache/yunikorn-core/pkg/common/configs"
+	"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-k8shim/pkg/common/constants" tests "github.com/apache/yunikorn-k8shim/test/e2e" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common" @@ -35,25 +37,103 @@ import ( "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn" ) +var kClient k8s.KubeCtl +var restClient yunikorn.RClient +var nodes *v1.NodeList +var workerID1 string +var workerID2 string +var namespace *v1.Namespace +var err error +var queuePath string +var basicUtiliz float64 + +var maxCPU int64 = 500 +var maxMem int64 = 500 +var nodesToTaint = []string{} +var taintKey = "e2e_test_fairness" +var ns = "test-" + common.RandSeq(10) + +var _ = BeforeSuite(func() { + kClient = k8s.KubeCtl{} + Ω(kClient.SetClient()).To(BeNil()) + + restClient = yunikorn.RClient{} + Ω(restClient).NotTo(gomega.BeNil()) + + yunikorn.EnsureYuniKornConfigsPresent() + + ginkgo.By("Port-forward the scheduler pod") + err = kClient.PortForwardYkSchedulerPod() + Ω(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("create development namespace") + namespace, err = kClient.CreateNamespace(ns, nil) + gomega.Ω(err).NotTo(gomega.HaveOccurred()) + gomega.Ω(namespace.Status.Phase).To(gomega.Equal(v1.NamespaceActive)) + + // get node available resource + nodes, err = kClient.GetNodes() + Ω(err).NotTo(HaveOccurred()) + Ω(len(nodes.Items)).NotTo(gomega.BeZero(), "Nodes can't be zero") + + // get node id + for _, node := range nodes.Items { + node := node + if k8s.IsMasterNode(&node) || !k8s.IsComputeNode(&node) { + continue + } + if workerID1 == "" { + workerID1 = node.Name + } else if workerID2 == "" { + workerID2 = node.Name + } else { + nodesToTaint = append(nodesToTaint, node.Name) + } + } + Ω(workerID1).NotTo(gomega.BeEmpty(), "worker node not found") + Ω(workerID2).NotTo(gomega.BeEmpty(), "only found one worker node") + + ginkgo.By("Tainting some nodes") + err = kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule) + Ω(err).NotTo(HaveOccurred()) + + // Get the highest utilization of a specific resource type on worker nodes. + var nodesDAOInfo *[]dao.NodeDAOInfo + nodesDAOInfo, err = restClient.GetNodes(constants.DefaultPartition) + Ω(err).NotTo(HaveOccurred()) + Ω(nodesDAOInfo).NotTo(gomega.BeNil()) + + for _, node := range *nodesDAOInfo { + if node.NodeID == workerID1 || node.NodeID == workerID2 { + memUtiliz := 1.0 - float64(node.Available["memory"])/float64(node.Capacity["memory"]) + cpuUtiliz := 1.0 - float64(node.Available["vcore"])/float64(node.Capacity["vcore"]) + if memUtiliz > basicUtiliz { + basicUtiliz = memUtiliz + } + if cpuUtiliz > basicUtiliz { + basicUtiliz = cpuUtiliz + } + } + } +}) + var _ = Describe("FairScheduling:", func() { - var kClient k8s.KubeCtl - var restClient yunikorn.RClient - var err error - var ns string - var queuePath string - - var maxCPU int64 = 500 - var maxMem int64 = 500 - - BeforeEach(func() { - var namespace *v1.Namespace - ns = "test-" + common.RandSeq(10) - queuePath = "root." + ns - kClient = k8s.KubeCtl{} - Ω(kClient.SetClient()).To(BeNil()) + // Validates waitQueue order of requested app resources, according to fairAppScheduling. + // Step 1: Deploy 4 apps, which sum to 95% queueCPU / 35% queueMem + // -> Resource priority order: 1)CPU. 2)Mem + // Step 2: Deploy 1 more blocked pod each for apps0-2 + // Step 3: Kill an App3 pod. + // Step 4: App with least cpu use is allocated pod next. Break tie using mem. 
+ It("Test_Wait_Queue_Order", func() { + + By(fmt.Sprintf("Creating test namespace %s", ns)) + namespace, err = kClient.UpdateNamespace(ns, map[string]string{"vcore": "500m", "memory": "500M"}) + Ω(err).ShouldNot(HaveOccurred()) + Ω(namespace.Status.Phase).Should(Equal(v1.NamespaceActive)) By("Setting custom YuniKorn configuration") annotation = "ann-" + common.RandSeq(10) + queuePath = "root." + ns yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "fair", annotation, func(sc *configs.SchedulerConfig) error { // remove placement rules so we can control queue sc.Partitions[0].PlacementRules = nil @@ -68,30 +148,7 @@ var _ = Describe("FairScheduling:", func() { } return nil }) - kClient = k8s.KubeCtl{} - Ω(kClient.SetClient()).To(BeNil()) - - // Restart yunikorn and port-forward - // Required to change node sort policy. - ginkgo.By("Restart the scheduler pod") - yunikorn.RestartYunikorn(&kClient) - - ginkgo.By("Port-forward scheduler pod after restart") - yunikorn.RestorePortForwarding(&kClient) - - By(fmt.Sprintf("Creating test namespace %s", ns)) - namespace, err = kClient.CreateNamespace(ns, map[string]string{"vcore": "500m", "memory": "500M"}) - Ω(err).ShouldNot(HaveOccurred()) - Ω(namespace.Status.Phase).Should(Equal(v1.NamespaceActive)) - }) - // Validates waitQueue order of requested app resources, according to fairAppScheduling. - // Step 1: Deploy 4 apps, which sum to 95% queueCPU / 35% queueMem - // -> Resource priority order: 1)CPU. 2)Mem - // Step 2: Deploy 1 more blocked pod each for apps0-2 - // Step 3: Kill an App3 pod. - // Step 4: App with least cpu use is allocated pod next. Break tie using mem. - It("Test_Wait_Queue_Order", func() { // Create appIDs var apps []string for j := 0; j < 4; j++ { @@ -191,14 +248,203 @@ var _ = Describe("FairScheduling:", func() { } }) + // Validates the order of node allocation for requested pod resources, following the fairNodeScheduling approach. + // Step 1: Deploy 2 apps, 5% increase in resource utilization on worker1 and a 10% increase on worker2. + // Step 2: Deploy 1 apps is expected to allocate it to worker1, 15% increase in the utilization of resources on worker1. + // Step 3: Deploy 1 apps is expected to allocate it to worker2. + It("Verify_basic_node_sorting_with_fairness_policy", func() { + By(fmt.Sprintf("update test namespace %s", ns)) + namespace, err = kClient.UpdateNamespace(ns, nil) + Ω(err).ShouldNot(HaveOccurred()) + Ω(namespace.Status.Phase).Should(Equal(v1.NamespaceActive)) + + By("Setting custom YuniKorn configuration") + annotation = "ann-" + common.RandSeq(10) + queuePath = "root.sandbox" + yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "fair", annotation, func(sc *configs.SchedulerConfig) error { + // remove placement rules so we can control queue + sc.Partitions[0].PlacementRules = nil + if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{ + Name: "sandbox", + Parent: false, + }); err != nil { + return err + } + return nil + }) + + // Assign pod1 to worker1 and pod2 to worker2. + sleepPod1Conf := k8s.SleepPodConfig{ + Name: "pod1", NS: ns, AppID: "app1", + RequiredNode: workerID1, + Labels: map[string]string{constants.LabelQueueName: queuePath}} + sleepPod2Conf := k8s.SleepPodConfig{ + Name: "pod2", NS: ns, AppID: "app2", + RequiredNode: workerID2, + Labels: map[string]string{constants.LabelQueueName: queuePath}} + + // Verify if pod3 and pod4 are assigned based on node sorting fairness. 
+		sleepPod3Conf := k8s.SleepPodConfig{
+			Name: "pod3", NS: ns, AppID: "app3",
+			Labels: map[string]string{constants.LabelQueueName: queuePath}}
+		sleepPod4Conf := k8s.SleepPodConfig{
+			Name: "pod4", NS: ns, AppID: "app4",
+			Labels: map[string]string{constants.LabelQueueName: queuePath}}
+
+		sleepPodConfs := []k8s.SleepPodConfig{}
+		sleepPodConfs = append(sleepPodConfs, sleepPod1Conf)
+		sleepPodConfs = append(sleepPodConfs, sleepPod2Conf)
+		sleepPodConfs = append(sleepPodConfs, sleepPod3Conf)
+		sleepPodConfs = append(sleepPodConfs, sleepPod4Conf)
+
+		// The node each pod is expected to land on, and how much it raises that node's utilization.
+		expectedNode := []string{
+			workerID1,
+			workerID2,
+			workerID1,
+			workerID2,
+		}
+		increaseUtiliz := []map[string]float64{
+			{"vcore": 0.05, "memory": 0.05},
+			{"vcore": 0.10, "memory": 0.10},
+			{"vcore": 0.15, "memory": 0.15},
+			{"vcore": 0.15, "memory": 0.15},
+		}
+		// Deploy the pods
+		for idx, config := range sleepPodConfs {
+			ginkgo.By("Deploy the sleep pod " + config.Name)
+			config.CPU = fillNodeUtil(expectedNode[idx], "vcore", increaseUtiliz[idx]["vcore"]+basicUtiliz)
+			config.Mem = fillNodeUtil(expectedNode[idx], "memory", increaseUtiliz[idx]["memory"]+basicUtiliz) / 1000 / 1000
+			initPod, podErr := k8s.InitSleepPod(config)
+			Ω(podErr).NotTo(HaveOccurred())
+			_, err = kClient.CreatePod(initPod, ns)
+			Ω(err).NotTo(HaveOccurred())
+			err = kClient.WaitForPodRunning(ns, config.Name, 60*time.Second)
+			Ω(err).NotTo(HaveOccurred())
+		}
+
+		// Verify pod3 and pod4 have been deployed on the correct nodes.
+		var RespPod *v1.Pod
+		ginkgo.By(sleepPod3Conf.Name + " should deploy on " + workerID1)
+		RespPod, err = kClient.GetPod(sleepPod3Conf.Name, ns)
+		Ω(err).NotTo(HaveOccurred())
+		Ω(RespPod.Spec.NodeName).To(gomega.Equal(workerID1), "Pod should place on "+workerID1)
+
+		ginkgo.By(sleepPod4Conf.Name + " should deploy on " + workerID2)
+		RespPod, err = kClient.GetPod(sleepPod4Conf.Name, ns)
+		Ω(err).NotTo(HaveOccurred())
+		Ω(RespPod.Spec.NodeName).To(gomega.Equal(workerID2), "Pod should place on "+workerID2)
+	})
+
+	// Validates the order of node allocation for requested pod resources under fair node scheduling with resource weights.
+	// Step 1: Set the resource weights to {"vcore": 5.0, "memory": 1.0}.
+	// Step 2: Deploy 2 apps, raising resource utilization on both worker1 and worker2; worker2 ends up with the higher weighted utilization.
+	// Step 3: Deploy 1 app; it is expected to land on worker1, leaving worker1 with the higher weighted utilization.
+	// Step 4: Deploy 1 app; it is expected to land on worker2.
+	It("Verify_node_sorting_fairness_policy_with_resource_weight", func() {
+		By("Setting custom YuniKorn configuration")
+		annotation = "ann-" + common.RandSeq(10)
+		queuePath = "root.sandbox"
+		yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "fair", annotation, func(sc *configs.SchedulerConfig) error {
+			// remove placement rules so we can control queue
+			sc.Partitions[0].PlacementRules = nil
+			sc.Partitions[0].NodeSortPolicy = configs.NodeSortingPolicy{
+				Type: "fair",
+				ResourceWeights: map[string]float64{
+					"vcore":  5.0,
+					"memory": 1.0,
+				},
+			}
+			if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+				Name:   "sandbox",
+				Parent: false,
+			}); err != nil {
+				return err
+			}
+			return nil
+		})
+
+		// Assign pod1 to worker1 and pod2 to worker2.
+		sleepPod1Conf := k8s.SleepPodConfig{
+			Name: "pod1", NS: ns, AppID: "app1",
+			RequiredNode: workerID1,
+			Labels:       map[string]string{constants.LabelQueueName: queuePath}}
+		sleepPod2Conf := k8s.SleepPodConfig{
+			Name: "pod2", NS: ns, AppID: "app2",
+			RequiredNode: workerID2,
+			Labels:       map[string]string{constants.LabelQueueName: queuePath}}
+
+		// Verify that pod3 and pod4 are assigned based on node sorting fairness.
+		sleepPod3Conf := k8s.SleepPodConfig{
+			Name: "pod3", NS: ns, AppID: "app3",
+			Labels: map[string]string{constants.LabelQueueName: queuePath}}
+		sleepPod4Conf := k8s.SleepPodConfig{
+			Name: "pod4", NS: ns, AppID: "app4",
+			Labels: map[string]string{constants.LabelQueueName: queuePath}}
+
+		sleepPodConfs := []k8s.SleepPodConfig{}
+		sleepPodConfs = append(sleepPodConfs, sleepPod1Conf)
+		sleepPodConfs = append(sleepPodConfs, sleepPod2Conf)
+		sleepPodConfs = append(sleepPodConfs, sleepPod3Conf)
+		sleepPodConfs = append(sleepPodConfs, sleepPod4Conf)
+
+		// The node each pod is expected to land on, and how much it raises that node's utilization.
+		expectedNode := []string{
+			workerID1,
+			workerID2,
+			workerID1,
+			workerID2,
+		}
+		increaseUtiliz := []map[string]float64{
+			{"vcore": 0.05, "memory": 0.10},
+			{"vcore": 0.10, "memory": 0.05},
+			{"vcore": 0.15, "memory": 0.15},
+			{"vcore": 0.15, "memory": 0.15},
+		}
+		// Deploy the pods
+		for idx, config := range sleepPodConfs {
+			ginkgo.By("Deploy the sleep pod " + config.Name)
+			config.CPU = fillNodeUtil(expectedNode[idx], "vcore", increaseUtiliz[idx]["vcore"]+basicUtiliz)
+			config.Mem = fillNodeUtil(expectedNode[idx], "memory", increaseUtiliz[idx]["memory"]+basicUtiliz) / 1000 / 1000
+			initPod, podErr := k8s.InitSleepPod(config)
+			Ω(podErr).NotTo(HaveOccurred())
+			_, err = kClient.CreatePod(initPod, ns)
+			Ω(err).NotTo(HaveOccurred())
+			err = kClient.WaitForPodRunning(ns, config.Name, 30*time.Second)
+			Ω(err).NotTo(HaveOccurred())
+		}
+
+		// Verify pod3 and pod4 have been deployed on the correct nodes.
+		ginkgo.By(sleepPod3Conf.Name + " should deploy on " + workerID1)
+		RespPod, err := kClient.GetPod(sleepPod3Conf.Name, ns)
+		Ω(err).NotTo(HaveOccurred())
+		Ω(RespPod.Spec.NodeName).To(gomega.Equal(workerID1), "Pod should place on "+workerID1)
+
+		ginkgo.By(sleepPod4Conf.Name + " should deploy on " + workerID2)
+		RespPod, err = kClient.GetPod(sleepPod4Conf.Name, ns)
+		Ω(err).NotTo(HaveOccurred())
+		Ω(RespPod.Spec.NodeName).To(gomega.Equal(workerID2), "Pod should place on "+workerID2)
+	})
+
 	AfterEach(func() {
 		testDescription := ginkgo.CurrentSpecReport()
 		if testDescription.Failed() {
 			tests.LogTestClusterInfoWrapper(testDescription.FailureMessage(), []string{ns})
 			tests.LogYunikornContainer(testDescription.FailureMessage())
 		}
-		By("Tear down namespace: " + ns)
-		err := kClient.TearDownNamespace(ns)
-		Ω(err).NotTo(HaveOccurred())
+
+		// Delete all sleep pods
+		ginkgo.By("Delete all sleep pods")
+		err := kClient.DeletePods(ns)
+		if err != nil {
+			fmt.Fprintf(ginkgo.GinkgoWriter, "Failed to delete pods in namespace %s - reason is %s\n", ns, err.Error())
+		}
 	})
 })
+
+func fillNodeUtil(nodeId string, resourceType string, percent float64) int64 {
+	node, err := restClient.GetNode(constants.DefaultPartition, nodeId)
+	Ω(err).NotTo(HaveOccurred())
+	fillingResource := percent*float64(node.Capacity[resourceType]) - float64(node.Allocated[resourceType]+node.Occupied[resourceType])
+	return int64(fillingResource)
+}
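
Note (reviewer sketch, not part of the patch): Test_Wait_Queue_Order checks that, under fair application scheduling, the next allocation goes to the app currently using the least CPU, with memory as the tie-breaker, per the test's step comments. A minimal standalone Go sketch of that ordering; the comparator below is an illustration of the expected order, not YuniKorn's actual sorting code.

package main

import (
	"fmt"
	"sort"
)

// appUsage is a stand-in for the per-app usage the test reasons about.
type appUsage struct {
	id  string
	cpu int64 // milli-vcores currently allocated to the app
	mem int64 // MB currently allocated to the app
}

func main() {
	apps := []appUsage{
		{id: "app0", cpu: 300, mem: 100},
		{id: "app1", cpu: 200, mem: 150},
		{id: "app2", cpu: 200, mem: 100},
	}

	// Least CPU first; break ties on memory (the order the test expects).
	sort.Slice(apps, func(i, j int) bool {
		if apps[i].cpu != apps[j].cpu {
			return apps[i].cpu < apps[j].cpu
		}
		return apps[i].mem < apps[j].mem
	})

	fmt.Println("next app to receive a pod:", apps[0].id) // app2
}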
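Note (reviewer sketch, not part of the patch): the node-sorting tests size each sleep pod from live node figures. BeforeSuite records the highest current utilization of the two workers as basicUtiliz (1 - available/capacity, from the REST NodeDAOInfo), and fillNodeUtil turns a target utilization fraction into a request via percent*capacity - (allocated+occupied); the memory result is then divided by 1000*1000, presumably because SleepPodConfig.Mem is expressed in MB while the REST API reports bytes. A standalone sketch of that arithmetic, with made-up numbers:

package main

import "fmt"

func main() {
	// Hypothetical worker-node vcore figures, in milli-vcores.
	capacity := int64(4000)
	available := int64(3600)
	allocated := int64(300)
	occupied := int64(100)

	// basicUtiliz: current utilization of the node, as computed in BeforeSuite.
	basicUtiliz := 1.0 - float64(available)/float64(capacity) // 0.10

	// fillNodeUtil: how much to request so the node reaches the target utilization.
	target := basicUtiliz + 0.05 // e.g. raise this worker's utilization by 5%
	fill := int64(target*float64(capacity) - float64(allocated+occupied))

	fmt.Printf("baseline=%.2f target=%.2f request=%dm vcore\n", basicUtiliz, target, fill) // request=200m
}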
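Note (reviewer sketch, not part of the patch): Verify_node_sorting_fairness_policy_with_resource_weight relies on the fair node-sorting policy preferring the node with the lower weighted utilization. With weights vcore=5 and memory=1, worker1 (5% vcore / 10% memory added by pod1) scores lower than worker2 (10% vcore / 5% memory added by pod2) even though their unweighted averages are identical, which is why pod3 is expected on worker1. A simplified calculation, assuming the policy reduces per-resource utilization to a weighted average; this is not the scheduler's actual code.

package main

import "fmt"

// weightedUtilization reduces per-resource utilization to a single score using
// the configured resource weights (higher score = busier node).
func weightedUtilization(util, weights map[string]float64) float64 {
	var weighted, total float64
	for res, w := range weights {
		weighted += w * util[res]
		total += w
	}
	return weighted / total
}

func main() {
	weights := map[string]float64{"vcore": 5.0, "memory": 1.0}

	// Utilization added by pod1/pod2 in the test, on top of the common baseline.
	worker1 := map[string]float64{"vcore": 0.05, "memory": 0.10}
	worker2 := map[string]float64{"vcore": 0.10, "memory": 0.05}

	w1 := weightedUtilization(worker1, weights) // (5*0.05 + 1*0.10) / 6 ≈ 0.058
	w2 := weightedUtilization(worker2, weights) // (5*0.10 + 1*0.05) / 6 ≈ 0.092

	// The fair policy prefers the less utilized node, so pod3 should land on worker1.
	fmt.Printf("worker1=%.3f worker2=%.3f -> prefer worker1: %v\n", w1, w2, w1 < w2)
}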