From 9dc4c1113da193b12e6cb36da3a4ca1fc6f97bca Mon Sep 17 00:00:00 2001 From: lowang-bh Date: Thu, 13 Jun 2024 22:44:25 +0800 Subject: [PATCH] enquable and allocatable compare resource with the required dimensions and add testcaes Signed-off-by: lowang-bh --- pkg/scheduler/api/resource_info.go | 3 +++ pkg/scheduler/plugins/capacity/capacity.go | 20 ++++++---------- .../plugins/capacity/capacity_test.go | 24 +++++++++++++++++++ .../plugins/proportion/proportion.go | 15 ++++-------- pkg/scheduler/uthelper/helper.go | 7 ++++++ 5 files changed, 45 insertions(+), 24 deletions(-) diff --git a/pkg/scheduler/api/resource_info.go b/pkg/scheduler/api/resource_info.go index 3f6c9f4681..f9362ac42d 100644 --- a/pkg/scheduler/api/resource_info.go +++ b/pkg/scheduler/api/resource_info.go @@ -456,6 +456,9 @@ func (r *Resource) LessEqualWithDimension(rr *Resource, req *Resource) bool { } for name, quant := range req.ScalarResources { + if IsIgnoredScalarResource(name) { + continue + } rQuant := r.ScalarResources[name] rrQuant := rr.ScalarResources[name] if quant > 0 && rQuant > rrQuant { diff --git a/pkg/scheduler/plugins/capacity/capacity.go b/pkg/scheduler/plugins/capacity/capacity.go index 6c4abd3d16..3906b2ab5f 100644 --- a/pkg/scheduler/plugins/capacity/capacity.go +++ b/pkg/scheduler/plugins/capacity/capacity.go @@ -257,7 +257,8 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) { task := candidate.(*api.TaskInfo) attr := cp.queueOpts[queue.UID] - overused := attr.deserved.LessEqualWithDimension(attr.allocated, task.InitResreq) + futureUsed := attr.allocated.Clone().Add(task.Resreq) + overused := !futureUsed.LessEqualWithDimension(attr.deserved, task.Resreq) metrics.UpdateQueueOverused(attr.name, overused) if overused { klog.V(3).Infof("Queue <%v> can not reclaim, deserved <%v>, allocated <%v>, share <%v>", @@ -272,8 +273,8 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) { ssn.AddAllocatableFn(cp.Name(), func(queue *api.QueueInfo, candidate *api.TaskInfo) bool { attr := cp.queueOpts[queue.UID] - free, _ := attr.realCapability.Diff(attr.allocated, api.Zero) - allocatable := candidate.Resreq.LessEqual(free, api.Zero) + futureUsed := attr.allocated.Clone().Add(candidate.Resreq) + allocatable := futureUsed.LessEqualWithDimension(attr.realCapability, candidate.Resreq) if !allocatable { klog.V(3).Infof("Queue <%v>: realCapability <%v>, allocated <%v>; Candidate <%v>: resource request <%v>", queue.Name, attr.realCapability, attr.allocated, candidate.Name, candidate.Resreq) @@ -303,19 +304,12 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) { klog.V(5).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>", job.Name, minReq.String(), queue.Name, attr.realCapability.String(), attr.allocated.String(), attr.inqueue.String(), attr.elastic.String()) // The queue resource quota limit has not reached - r := minReq.Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic) - rr := attr.realCapability.Clone() + r := minReq.Clone().Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic) - for name := range rr.ScalarResources { - if _, ok := r.ScalarResources[name]; !ok { - delete(rr.ScalarResources, name) - } - } - - inqueue := r.LessEqual(rr, api.Infinity) + inqueue := r.LessEqualWithDimension(attr.realCapability, minReq) klog.V(5).Infof("job %s inqueue %v", job.Name, inqueue) if inqueue { - attr.inqueue.Add(job.DeductSchGatedResources(job.GetMinResources())) + attr.inqueue.Add(job.DeductSchGatedResources(minReq)) return util.Permit } ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "queue resource quota insufficient") diff --git a/pkg/scheduler/plugins/capacity/capacity_test.go b/pkg/scheduler/plugins/capacity/capacity_test.go index df469650dd..71907f3160 100644 --- a/pkg/scheduler/plugins/capacity/capacity_test.go +++ b/pkg/scheduler/plugins/capacity/capacity_test.go @@ -124,6 +124,18 @@ func Test_capacityPlugin_OnSessionOpen(t *testing.T) { queue8 := util.BuildQueueWithPriorityAndResourcesQuantity("q8", 1, nil, api.BuildResourceList("2", "4Gi")) queue9 := util.BuildQueueWithPriorityAndResourcesQuantity("q9", 10, nil, api.BuildResourceList("2", "4Gi")) + // case5: p16 + p17 in queue10 will exceed queue's deserved, is not preemptive + p16 := util.BuildPod("ns1", "p16", "n1", corev1.PodRunning, api.BuildResourceList("1", "3Gi"), "pg16", make(map[string]string), nil) + p17 := util.BuildPod("ns1", "p17", "", corev1.PodPending, api.BuildResourceList("1", "1Gi"), "pg17", make(map[string]string), nil) + p18 := util.BuildPod("ns1", "p18", "n1", corev1.PodRunning, api.BuildResourceList("1", "1Gi"), "pg18", make(map[string]string), nil) + // podgroup + pg16 := util.BuildPodGroup("pg16", "ns1", "q10", 1, nil, schedulingv1beta1.PodGroupRunning) + pg17 := util.BuildPodGroup("pg17", "ns1", "q10", 1, nil, schedulingv1beta1.PodGroupInqueue) + pg18 := util.BuildPodGroup("pg18", "ns1", "q11", 1, nil, schedulingv1beta1.PodGroupRunning) + // queue + queue10 := util.BuildQueueWithResourcesQuantity("q10", api.BuildResourceList("2", "2Gi"), api.BuildResourceList("4", "4Gi")) + queue11 := util.BuildQueueWithResourcesQuantity("q11", api.BuildResourceList("0", "0Gi"), api.BuildResourceList("2", "2Gi")) + tests := []uthelper.TestCommonStruct{ { Name: "case0: Pod allocatable when queue has not exceed capability", @@ -182,8 +194,20 @@ func Test_capacityPlugin_OnSessionOpen(t *testing.T) { ExpectBindMap: map[string]string{ "ns1/p15": "n6", }, + ExpectBindsNum: 1, }, + { + Name: "case5: Can not reclaim from other queues when allocated + req > deserved", + Plugins: plugins, + Pods: []*corev1.Pod{p16, p17, p18}, + Nodes: []*corev1.Node{n1}, + PodGroups: []*schedulingv1beta1.PodGroup{pg16, pg17, pg18}, + Queues: []*schedulingv1beta1.Queue{queue10, queue11}, + ExpectPipeLined: map[string][]string{}, + ExpectEvicted: []string{}, + ExpectEvictNum: 0, + }, } tiers := []conf.Tier{ diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index 50e56b65d9..07203c6298 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -315,8 +315,8 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { ssn.AddAllocatableFn(pp.Name(), func(queue *api.QueueInfo, candidate *api.TaskInfo) bool { attr := pp.queueOpts[queue.UID] - free, _ := attr.deserved.Diff(attr.allocated, api.Zero) - allocatable := candidate.Resreq.LessEqual(free, api.Zero) + futureUsed := attr.allocated.Clone().Add(candidate.Resreq) + allocatable := futureUsed.LessEqualWithDimension(attr.deserved, candidate.Resreq) if !allocatable { klog.V(3).Infof("Queue <%v>: deserved <%v>, allocated <%v>; Candidate <%v>: resource request <%v>", queue.Name, attr.deserved, attr.allocated, candidate.Name, candidate.Resreq) @@ -346,16 +346,9 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { klog.V(5).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>", job.Name, minReq.String(), queue.Name, attr.realCapability.String(), attr.allocated.String(), attr.inqueue.String(), attr.elastic.String()) // The queue resource quota limit has not reached - r := minReq.Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic) - rr := attr.realCapability.Clone() + r := minReq.Clone().Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic) - for name := range rr.ScalarResources { - if _, ok := r.ScalarResources[name]; !ok { - delete(rr.ScalarResources, name) - } - } - - inqueue := r.LessEqual(rr, api.Infinity) + inqueue := r.LessEqualWithDimension(attr.realCapability, minReq) klog.V(5).Infof("job %s inqueue %v", job.Name, inqueue) if inqueue { // deduct the resources of scheduling gated tasks in a job when calculating inqueued resources diff --git a/pkg/scheduler/uthelper/helper.go b/pkg/scheduler/uthelper/helper.go index 912386f728..56de82b444 100644 --- a/pkg/scheduler/uthelper/helper.go +++ b/pkg/scheduler/uthelper/helper.go @@ -131,6 +131,13 @@ func (test *TestCommonStruct) Run(actions []framework.Action) { if len(actions) == 0 { panic("no actions provided, please specify a list of actions to execute") } + + // registry actions in conf variables + conf.EnabledActionMap = make(map[string]bool, len(actions)) + for _, action := range actions { + conf.EnabledActionMap[action.Name()] = true + } + for _, action := range actions { action.Initialize() action.Execute(test.ssn)