Skip to content

Commit

Permalink
Fix wrong calculation for queue deserved in proportion plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
zionwu committed May 23, 2019
1 parent be78f91 commit d3d586b
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 4 deletions.
35 changes: 35 additions & 0 deletions pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,41 @@ func (r *Resource) LessEqual(rr *Resource) bool {
return true
}

// Diff calculate the difference between two resource
func (r *Resource) Diff(rr *Resource) (*Resource, *Resource) {
increasedVal := EmptyResource()
decreasedVal := EmptyResource()
if r.MilliCPU > rr.MilliCPU {
increasedVal.MilliCPU += r.MilliCPU - rr.MilliCPU
} else {
decreasedVal.MilliCPU += rr.MilliCPU - r.MilliCPU
}

if r.Memory > rr.Memory {
increasedVal.Memory += r.Memory - rr.Memory
} else {
decreasedVal.Memory += rr.Memory - r.Memory
}

for rName, rQuant := range r.ScalarResources {
rrQuant := rr.ScalarResources[rName]

if rQuant > rrQuant {
if increasedVal.ScalarResources == nil {
increasedVal.ScalarResources = map[v1.ResourceName]float64{}
}
increasedVal.ScalarResources[rName] += rQuant - rrQuant
} else {
if decreasedVal.ScalarResources == nil {
decreasedVal.ScalarResources = map[v1.ResourceName]float64{}
}
decreasedVal.ScalarResources[rName] += rrQuant - rQuant
}
}

return increasedVal, decreasedVal
}

// String returns resource details in string format
func (r *Resource) String() string {
str := fmt.Sprintf("cpu %0.2f, memory %0.2f", r.MilliCPU, r.Memory)
Expand Down
18 changes: 14 additions & 4 deletions pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,15 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {

// If no queues, break
if totalWeight == 0 {
glog.V(4).Infof("Exiting when total weight is 0")
break
}

// Calculates the deserved of each Queue.
deserved := api.EmptyResource()
// increasedDeserved is the increased value for attr.deserved of processed queues
// decreasedDeserved is the decreased value for attr.deserved of processed queues
increasedDeserved := api.EmptyResource()
decreasedDeserved := api.EmptyResource()
for _, attr := range pp.queueOpts {
glog.V(4).Infof("Considering Queue <%s>: weight <%d>, total weight <%d>.",
attr.name, attr.weight, totalWeight)
Expand All @@ -125,20 +129,26 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {

oldDeserved := attr.deserved.Clone()
attr.deserved.Add(remaining.Clone().Multi(float64(attr.weight) / float64(totalWeight)))
if !attr.deserved.LessEqual(attr.request) {

if attr.request.Less(attr.deserved) {
attr.deserved = helpers.Min(attr.deserved, attr.request)
meet[attr.queueID] = struct{}{}
glog.V(4).Infof("queue <%s> is meet", attr.name)

}
pp.updateShare(attr)

glog.V(4).Infof("The attributes of queue <%s> in proportion: deserved <%v>, allocate <%v>, request <%v>, share <%0.2f>",
attr.name, attr.deserved, attr.allocated, attr.request, attr.share)

deserved.Add(attr.deserved.Clone().Sub(oldDeserved))
increased, decreased := attr.deserved.Diff(oldDeserved)
increasedDeserved.Add(increased)
decreasedDeserved.Add(decreased)
}

remaining.Sub(deserved)
remaining.Sub(increasedDeserved).Add(decreasedDeserved)
if remaining.IsEmpty() {
glog.V(4).Infof("Exiting when remaining is empty: <%v>", remaining)
break
}
}
Expand Down
64 changes: 64 additions & 0 deletions test/e2e/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,4 +414,68 @@ var _ = Describe("Job E2E Test", func() {
err = waitPodGroupReady(context, pg2)
checkError(context, err)
})

It("Proportion", func() {
context := initTestContext()
defer cleanupTestContext(context)

createQueues(context)
defer deleteQueues(context)

cpuSlot := halfCPU
cpuRep := clusterSize(context, cpuSlot)

memSlot := oneGigaByteMem
memRep := clusterSize(context, memSlot)

spec2 := &jobSpec{
namespace: "test",
tasks: []taskSpec{
{
img: "nginx",
req: cpuSlot,
min: 1,
rep: 1,
},
},
}

spec2.name = "q2-job-1"
spec2.queue = "q2"
_, pg2 := createJob(context, spec2)
err := waitPodGroupReady(context, pg2)
checkError(context, err)

spec := &jobSpec{
namespace: "test",
tasks: []taskSpec{
{
img: "nginx",
req: cpuSlot,
min: cpuRep - 2,
rep: cpuRep - 2,
},
{
img: "nginx",
req: memSlot,
min: memRep/2 - 1,
rep: memRep/2 - 1,
},
},
}

spec.name = "q1-job-1"
spec.queue = "q1"
_, pg1 := createJob(context, spec)
err = waitPodGroupReady(context, pg1)
checkError(context, err)

spec2.name = "q1-job-2"
spec2.queue = "q1"
_, pg3 := createJob(context, spec2)
err = waitPodGroupReady(context, pg3)
checkError(context, err)

})

})
1 change: 1 addition & 0 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var oneCPU = v1.ResourceList{"cpu": resource.MustParse("1000m")}
var twoCPU = v1.ResourceList{"cpu": resource.MustParse("2000m")}
var oneAndHalfCPU = v1.ResourceList{"cpu": resource.MustParse("1500m")}
var smallCPU = v1.ResourceList{"cpu": resource.MustParse("2m")}
var oneGigaByteMem = v1.ResourceList{"memory": resource.MustParse("1Gi")}

const (
workerPriority = "worker-pri"
Expand Down

0 comments on commit d3d586b

Please sign in to comment.