From d3d586bd7eb0ccecd7030769c102917791c7f475 Mon Sep 17 00:00:00 2001
From: Ziyang Wu
Date: Thu, 23 May 2019 10:02:18 +0800
Subject: [PATCH] Fix wrong calculation for queue deserved in proportion plugin

---
 pkg/scheduler/api/resource_info.go            | 51 +++++++++++++++
 .../plugins/proportion/proportion.go          | 18 ++++--
 test/e2e/job.go                               | 64 +++++++++++++++++++
 test/e2e/util.go                              |  1 +
 4 files changed, 130 insertions(+), 4 deletions(-)

diff --git a/pkg/scheduler/api/resource_info.go b/pkg/scheduler/api/resource_info.go
index 6c3caab74..7f38517f3 100644
--- a/pkg/scheduler/api/resource_info.go
+++ b/pkg/scheduler/api/resource_info.go
@@ -276,6 +276,57 @@ func (r *Resource) LessEqual(rr *Resource) bool {
 	return true
 }
 
+// Diff reports how r differs from rr, dimension by dimension.
+//
+// The first result holds, for every dimension where r is strictly greater
+// than rr, the amount r exceeds rr by. The second result holds, for every
+// other dimension, the amount rr exceeds r by (zero when they are equal).
+// A scalar resource missing from one side is compared as zero, so scalars
+// that only rr carries are reported in the second result instead of being
+// dropped. Neither r nor rr is modified.
+func (r *Resource) Diff(rr *Resource) (*Resource, *Resource) {
+	increasedVal := EmptyResource()
+	decreasedVal := EmptyResource()
+	if r.MilliCPU > rr.MilliCPU {
+		increasedVal.MilliCPU += r.MilliCPU - rr.MilliCPU
+	} else {
+		decreasedVal.MilliCPU += rr.MilliCPU - r.MilliCPU
+	}
+
+	if r.Memory > rr.Memory {
+		increasedVal.Memory += r.Memory - rr.Memory
+	} else {
+		decreasedVal.Memory += rr.Memory - r.Memory
+	}
+
+	// record files the change of one scalar dimension under the matching
+	// result, allocating that result's map on first use.
+	record := func(name v1.ResourceName, own, other float64) {
+		target, delta := decreasedVal, other-own
+		if own > other {
+			target, delta = increasedVal, own-other
+		}
+		if target.ScalarResources == nil {
+			target.ScalarResources = map[v1.ResourceName]float64{}
+		}
+		target.ScalarResources[name] += delta
+	}
+
+	for rName, rQuant := range r.ScalarResources {
+		record(rName, rQuant, rr.ScalarResources[rName])
+	}
+
+	// Scalars present only in rr are never visited by the loop above;
+	// compare them against zero so the decrease is not lost.
+	for rName, rrQuant := range rr.ScalarResources {
+		if _, seen := r.ScalarResources[rName]; !seen {
+			record(rName, 0, rrQuant)
+		}
+	}
+
+	return increasedVal, decreasedVal
+}
+
 // String returns resource details in string format
 func (r *Resource) String() string {
 	str := fmt.Sprintf("cpu %0.2f, memory %0.2f", r.MilliCPU, r.Memory)
diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go
index 2059f78de..25f5332da 100644
--- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -111,11 +111,15 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { // If no queues, break if totalWeight == 0 { + glog.V(4).Infof("Exiting when total weight is 0") break } // Calculates the deserved of each Queue. - deserved := api.EmptyResource() + // increasedDeserved is the increased value for attr.deserved of processed queues + // decreasedDeserved is the decreased value for attr.deserved of processed queues + increasedDeserved := api.EmptyResource() + decreasedDeserved := api.EmptyResource() for _, attr := range pp.queueOpts { glog.V(4).Infof("Considering Queue <%s>: weight <%d>, total weight <%d>.", attr.name, attr.weight, totalWeight) @@ -125,20 +129,26 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { oldDeserved := attr.deserved.Clone() attr.deserved.Add(remaining.Clone().Multi(float64(attr.weight) / float64(totalWeight))) - if !attr.deserved.LessEqual(attr.request) { + + if attr.request.Less(attr.deserved) { attr.deserved = helpers.Min(attr.deserved, attr.request) meet[attr.queueID] = struct{}{} + glog.V(4).Infof("queue <%s> is meet", attr.name) + } pp.updateShare(attr) glog.V(4).Infof("The attributes of queue <%s> in proportion: deserved <%v>, allocate <%v>, request <%v>, share <%0.2f>", attr.name, attr.deserved, attr.allocated, attr.request, attr.share) - deserved.Add(attr.deserved.Clone().Sub(oldDeserved)) + increased, decreased := attr.deserved.Diff(oldDeserved) + increasedDeserved.Add(increased) + decreasedDeserved.Add(decreased) } - remaining.Sub(deserved) + remaining.Sub(increasedDeserved).Add(decreasedDeserved) if remaining.IsEmpty() { + glog.V(4).Infof("Exiting when remaining is empty: <%v>", remaining) break } } diff --git a/test/e2e/job.go b/test/e2e/job.go index 13a342033..e5af14765 100644 --- a/test/e2e/job.go +++ b/test/e2e/job.go @@ -414,4 +414,68 @@ var _ = Describe("Job E2E Test", func() { 
err = waitPodGroupReady(context, pg2) checkError(context, err) }) + + It("Proportion", func() { + context := initTestContext() + defer cleanupTestContext(context) + + createQueues(context) + defer deleteQueues(context) + + cpuSlot := halfCPU + cpuRep := clusterSize(context, cpuSlot) + + memSlot := oneGigaByteMem + memRep := clusterSize(context, memSlot) + + spec2 := &jobSpec{ + namespace: "test", + tasks: []taskSpec{ + { + img: "nginx", + req: cpuSlot, + min: 1, + rep: 1, + }, + }, + } + + spec2.name = "q2-job-1" + spec2.queue = "q2" + _, pg2 := createJob(context, spec2) + err := waitPodGroupReady(context, pg2) + checkError(context, err) + + spec := &jobSpec{ + namespace: "test", + tasks: []taskSpec{ + { + img: "nginx", + req: cpuSlot, + min: cpuRep - 2, + rep: cpuRep - 2, + }, + { + img: "nginx", + req: memSlot, + min: memRep/2 - 1, + rep: memRep/2 - 1, + }, + }, + } + + spec.name = "q1-job-1" + spec.queue = "q1" + _, pg1 := createJob(context, spec) + err = waitPodGroupReady(context, pg1) + checkError(context, err) + + spec2.name = "q1-job-2" + spec2.queue = "q1" + _, pg3 := createJob(context, spec2) + err = waitPodGroupReady(context, pg3) + checkError(context, err) + + }) + }) diff --git a/test/e2e/util.go b/test/e2e/util.go index fea86c58d..0a73ea9ad 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -63,6 +63,7 @@ var oneCPU = v1.ResourceList{"cpu": resource.MustParse("1000m")} var twoCPU = v1.ResourceList{"cpu": resource.MustParse("2000m")} var oneAndHalfCPU = v1.ResourceList{"cpu": resource.MustParse("1500m")} var smallCPU = v1.ResourceList{"cpu": resource.MustParse("2m")} +var oneGigaByteMem = v1.ResourceList{"memory": resource.MustParse("1Gi")} const ( workerPriority = "worker-pri"