[YUNIKORN-2212] Outstanding request collection optimisation (apache#745)
Auto scaling in K8s relies on the pod being marked unschedulable. That is
done via a regular callback from the core to the shim. The logic for
building the pod list in this callback is simplistic, which could cause
over-scaling of nodes or excessive K8s API server calls.

Improvements added:
* Don't collect requests that the scheduler has not attempted to schedule yet.
* If a request has already triggered scale up, do not send it again.
* Check whether a real ask can be satisfied by an existing gang placeholder.
* Don't scale if the pod has a required node set (DaemonSet pod).

Track the scheduling and auto-scale triggering state on each ask; a minimal sketch of the resulting collection filter is shown below.
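
For illustration, a minimal, self-contained sketch of the per-ask filter described above. The type and helper names are hypothetical and simplified, not the actual yunikorn-core API, and the real code additionally requires the ask to fit in the queue and user headroom (see Application.getOutstandingRequests in the diff below):

package main

import "fmt"

// ask is a hypothetical stand-in for the core's AllocationAsk.
type ask struct {
	pendingRepeat       int
	schedulingAttempted bool   // the core has already tried (and failed) to place this ask
	scaleUpTriggered    bool   // autoscaling has already been requested for this ask
	requiredNode        string // non-empty for DaemonSet pods pinned to a node
	replaceable         bool   // an existing gang placeholder can satisfy this ask
}

// reportForScaleUp mirrors the new collection rules: only pending asks that the
// scheduler has attempted, that have not triggered scaling before, that are not
// pinned to a node, and that cannot be replaced by a placeholder are reported.
func reportForScaleUp(a *ask) bool {
	if a.pendingRepeat == 0 || !a.schedulingAttempted {
		return false
	}
	return !a.scaleUpTriggered && a.requiredNode == "" && !a.replaceable
}

func main() {
	fresh := &ask{pendingRepeat: 1, schedulingAttempted: true}
	pinned := &ask{pendingRepeat: 1, schedulingAttempted: true, requiredNode: "node-1"}
	fmt.Println(reportForScaleUp(fresh), reportForScaleUp(pinned)) // true false
}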

Closes: apache#745

Signed-off-by: Wilfred Spiegelenburg <[email protected]>
pbacsko authored and wilfred-s committed Jan 25, 2024
1 parent 681a4cb commit 8c6f4c5
Showing 9 changed files with 329 additions and 16 deletions.
26 changes: 26 additions & 0 deletions pkg/scheduler/objects/allocation_ask.go
@@ -55,6 +55,8 @@ type AllocationAsk struct {
allocLog map[string]*AllocationLogEntry
preemptionTriggered bool
preemptCheckTime time.Time
schedulingAttempted bool // whether scheduler core has tried to schedule this ask
scaleUpTriggered bool // whether this ask has triggered autoscaling or not
resKeyPerNode map[string]string // reservation key for a given node

sync.RWMutex
@@ -299,6 +301,30 @@ func (aa *AllocationAsk) LessThan(other *AllocationAsk) bool {
return aa.priority < other.priority
}

func (aa *AllocationAsk) SetSchedulingAttempted(attempted bool) {
aa.Lock()
defer aa.Unlock()
aa.schedulingAttempted = attempted
}

func (aa *AllocationAsk) IsSchedulingAttempted() bool {
aa.RLock()
defer aa.RUnlock()
return aa.schedulingAttempted
}

func (aa *AllocationAsk) SetScaleUpTriggered(triggered bool) {
aa.Lock()
defer aa.Unlock()
aa.scaleUpTriggered = triggered
}

func (aa *AllocationAsk) HasTriggeredScaleUp() bool {
aa.RLock()
defer aa.RUnlock()
return aa.scaleUpTriggered
}

// completedPendingAsk How many pending asks have been completed or processed so far?
func (aa *AllocationAsk) completedPendingAsk() int {
aa.RLock()
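
For readers skimming the diff, a condensed illustration of how the two new flags are meant to be used over an ask's lifetime, with a hypothetical askFlags type standing in for the real AllocationAsk (which guards the fields with its embedded RWMutex, as shown above): tryAllocate marks the ask as attempted before evaluating it, and the outstanding-request inspection flips the scale-up flag so the same ask is not reported to the shim twice.

package askflags

import "sync"

// askFlags is an illustrative stand-in for the two new AllocationAsk fields.
type askFlags struct {
	sync.RWMutex
	schedulingAttempted bool
	scaleUpTriggered    bool
}

// markAttempted is what tryAllocate effectively does before evaluating the ask.
func (f *askFlags) markAttempted() {
	f.Lock()
	defer f.Unlock()
	f.schedulingAttempted = true
}

// markScaleUp reports whether this call was the first trigger; later
// inspections see true and skip the ask, so autoscaling is requested only once.
func (f *askFlags) markScaleUp() bool {
	f.Lock()
	defer f.Unlock()
	if f.scaleUpTriggered {
		return false
	}
	f.scaleUpTriggered = true
	return true
}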
11 changes: 8 additions & 3 deletions pkg/scheduler/objects/application.go
@@ -899,13 +899,16 @@ func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, user
return
}
for _, request := range sa.sortedRequests {
if request.GetPendingAskRepeat() == 0 {
if request.GetPendingAskRepeat() == 0 || !request.IsSchedulingAttempted() {
continue
}

// ignore nil checks: resource function calls are nil safe
if headRoom.FitInMaxUndef(request.GetAllocatedResource()) && userHeadRoom.FitInMaxUndef(request.GetAllocatedResource()) {
// if headroom is still enough for the resources
*total = append(*total, request)
if !request.HasTriggeredScaleUp() && request.requiredNode == common.Empty && !sa.canReplace(request) {
// if headroom is still enough for the resources
*total = append(*total, request)
}
headRoom.SubOnlyExisting(request.GetAllocatedResource())
userHeadRoom.SubOnlyExisting(request.GetAllocatedResource())
}
@@ -951,6 +954,8 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption
continue
}

request.SetSchedulingAttempted(true)

// resource must fit in headroom otherwise skip the request (unless preemption could help)
if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
// attempt preemption
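
One subtlety in getOutstandingRequests above: an ask that fits the headroom consumes it even when it is not reported for autoscaling (already triggered, pinned to a required node, or replaceable by a placeholder), so later asks in the sorted list are evaluated against the remaining space. Below is a small, self-contained sketch of that bookkeeping; the plain maps and helper names are hypothetical stand-ins for resources.Resource, FitInMaxUndef and SubOnlyExisting.

package main

import "fmt"

type askInfo struct {
	name   string
	res    map[string]int64
	report bool // passed the scale-up filter (attempted, not triggered, not pinned, not replaceable)
}

// fits mirrors FitInMaxUndef: resource types missing from the headroom are ignored.
func fits(headroom, req map[string]int64) bool {
	for k, v := range req {
		if limit, ok := headroom[k]; ok && v > limit {
			return false
		}
	}
	return true
}

// subtract mirrors SubOnlyExisting: only resource types present in the headroom are reduced.
func subtract(headroom, req map[string]int64) {
	for k, v := range req {
		if _, ok := headroom[k]; ok {
			headroom[k] -= v
		}
	}
}

func collect(asks []askInfo, headroom map[string]int64) []string {
	var out []string
	for _, a := range asks {
		if !fits(headroom, a.res) {
			continue // does not fit: skipped, headroom untouched
		}
		if a.report {
			out = append(out, a.name) // reported for autoscaling
		}
		// consumed for every ask that fits, whether reported or not
		subtract(headroom, a.res)
	}
	return out
}

func main() {
	headroom := map[string]int64{"memory": 2}
	asks := []askInfo{
		{name: "alloc-1", res: map[string]int64{"memory": 1}, report: false}, // e.g. already triggered
		{name: "alloc-2", res: map[string]int64{"memory": 1}, report: true},
		{name: "alloc-3", res: map[string]int64{"memory": 1}, report: true}, // no headroom left by now
	}
	fmt.Println(collect(asks, headroom)) // [alloc-2]
}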
103 changes: 103 additions & 0 deletions pkg/scheduler/objects/application_test.go
@@ -2302,6 +2302,8 @@ func TestGetOutstandingRequests(t *testing.T) {

allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res)
allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res)
allocationAsk1.SetSchedulingAttempted(true)
allocationAsk2.SetSchedulingAttempted(true)

// Create an Application instance
app := &Application{
@@ -2357,6 +2359,107 @@
assert.Equal(t, 0, len(total4), "expected no outstanding requests for TestCase 4")
}

func TestGetOutstandingRequests_NoSchedulingAttempt(t *testing.T) {
res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1})

allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res)
allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res)
allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res)
allocationAsk4 := newAllocationAsk("alloc-4", "app-1", res)
allocationAsk2.SetSchedulingAttempted(true)
allocationAsk4.SetSchedulingAttempted(true)
app := &Application{
ApplicationID: "app-1",
queuePath: "default",
}
sr := sortedRequests{}
sr.insert(allocationAsk1)
sr.insert(allocationAsk2)
sr.insert(allocationAsk3)
sr.insert(allocationAsk4)
app.sortedRequests = sr

var total []*AllocationAsk
headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10})
userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8})
app.getOutstandingRequests(headroom, userHeadroom, &total)

assert.Equal(t, 2, len(total))
assert.Equal(t, "alloc-2", total[0].allocationKey)
assert.Equal(t, "alloc-4", total[1].allocationKey)
}

func TestGetOutstandingRequests_RequestTriggeredPreemptionHasRequiredNode(t *testing.T) {
// Test that we decrease headrooms even if the requests have triggered upscaling or
// the ask is a DaemonSet pod (requiredNode != "")
res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1})

allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res)
allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res)
allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res)
allocationAsk4 := newAllocationAsk("alloc-4", "app-1", res)
allocationAsk1.SetSchedulingAttempted(true)
allocationAsk2.SetSchedulingAttempted(true)
allocationAsk3.SetSchedulingAttempted(true)
allocationAsk4.SetSchedulingAttempted(true) // hasn't triggered scaling, no required node --> picked
allocationAsk1.SetScaleUpTriggered(true) // triggered scaling, no required node --> not selected
allocationAsk2.SetScaleUpTriggered(true) // triggered scaling, has required node --> not selected
allocationAsk2.SetRequiredNode("node-1")
allocationAsk3.SetRequiredNode("node-1") // hasn't triggered scaling, has required node --> not selected

app := &Application{
ApplicationID: "app-1",
queuePath: "default",
}
sr := sortedRequests{}
sr.insert(allocationAsk1)
sr.insert(allocationAsk2)
sr.insert(allocationAsk3)
sr.insert(allocationAsk4)
app.sortedRequests = sr

var total []*AllocationAsk
headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10})
userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8})
app.getOutstandingRequests(headroom, userHeadroom, &total)

assert.Equal(t, 1, len(total))
assert.Equal(t, "alloc-4", total[0].allocationKey)
}

func TestGetOutstandingRequests_AskReplaceable(t *testing.T) {
res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1})

allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res) // replaceable
allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res) // replaceable
allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res) // non-replaceable
allocationAsk1.SetSchedulingAttempted(true)
allocationAsk2.SetSchedulingAttempted(true)
allocationAsk3.SetSchedulingAttempted(true)
allocationAsk1.taskGroupName = "testgroup"
allocationAsk2.taskGroupName = "testgroup"

app := &Application{
ApplicationID: "app-1",
queuePath: "default",
}
sr := sortedRequests{}
sr.insert(allocationAsk1)
sr.insert(allocationAsk2)
sr.insert(allocationAsk3)
app.sortedRequests = sr
app.addPlaceholderDataWithLocking(allocationAsk1)
app.addPlaceholderDataWithLocking(allocationAsk2)

var total []*AllocationAsk
headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10})
userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8})
app.getOutstandingRequests(headroom, userHeadroom, &total)

assert.Equal(t, 1, len(total))
assert.Equal(t, "alloc-3", total[0].allocationKey)
}

func TestGetRateLimitedAppLog(t *testing.T) {
l := getRateLimitedAppLog()
assert.Check(t, l != nil)
25 changes: 15 additions & 10 deletions pkg/scheduler/objects/queue_test.go
@@ -1295,17 +1295,19 @@ func testOutstanding(t *testing.T, alloc, used *resources.Resource) {
app1.queue = queue1
queue1.AddApplication(app1)
for i := 0; i < 20; i++ {
err = app1.AddAllocationAsk(
newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc))
ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc)
ask.SetSchedulingAttempted(true)
err = app1.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add allocation ask")
}

app2 := newApplication(appID2, "default", "root.queue2")
app2.queue = queue2
queue2.AddApplication(app2)
for i := 0; i < 20; i++ {
err = app2.AddAllocationAsk(
newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, alloc))
ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, alloc)
ask.SetSchedulingAttempted(true)
err = app2.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add allocation ask")
}

@@ -1376,8 +1378,9 @@ func TestGetOutstandingOnlyUntracked(t *testing.T) {
app1.queue = queue1
queue1.AddApplication(app1)
for i := 0; i < 20; i++ {
err = app1.AddAllocationAsk(
newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc))
ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc)
ask.SetSchedulingAttempted(true)
err = app1.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add allocation ask")
}

@@ -1424,17 +1427,19 @@ func TestGetOutstandingRequestNoMax(t *testing.T) {
res, err = resources.NewResourceFromConf(map[string]string{"cpu": "1"})
assert.NilError(t, err, "failed to create basic resource")
for i := 0; i < 10; i++ {
err = app1.AddAllocationAsk(
newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, res))
ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, res)
ask.SetSchedulingAttempted(true)
err = app1.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add allocation ask")
}

app2 := newApplication(appID2, "default", "root.queue2")
app2.queue = queue2
queue2.AddApplication(app2)
for i := 0; i < 20; i++ {
err = app2.AddAllocationAsk(
newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, res))
ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, res)
ask.SetSchedulingAttempted(true)
err = app2.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add allocation ask")
}

67 changes: 67 additions & 0 deletions pkg/scheduler/partition_test.go
@@ -4324,3 +4324,70 @@ func TestLimitMaxApplicationsForReservedAllocation(t *testing.T) {
})
}
}

func TestCalculateOutstandingRequests(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "unable to create partition: %v", err)

// no application&asks
requests := partition.calculateOutstandingRequests()
assert.Equal(t, 0, len(requests))

// two applications with no asks
app1 := newApplication(appID1, "test", "root.default")
app2 := newApplication(appID2, "test", "root.default")
err = partition.AddApplication(app1)
assert.NilError(t, err)
err = partition.AddApplication(app2)
assert.NilError(t, err)
requests = partition.calculateOutstandingRequests()
assert.Equal(t, 0, len(requests))

// new asks for the two apps, but the scheduler hasn't processed them
askResource := resources.NewResourceFromMap(map[string]resources.Quantity{
"vcores": 1,
"memory": 1,
})
siAsk1 := &si.AllocationAsk{
AllocationKey: "ask-uuid-1",
ApplicationID: appID1,
ResourceAsk: askResource.ToProto(),
MaxAllocations: 1,
}
siAsk2 := &si.AllocationAsk{
AllocationKey: "ask-uuid-2",
ApplicationID: appID1,
ResourceAsk: askResource.ToProto(),
MaxAllocations: 1,
}
siAsk3 := &si.AllocationAsk{
AllocationKey: "ask-uuid-3",
ApplicationID: appID2,
ResourceAsk: askResource.ToProto(),
MaxAllocations: 1,
}
err = partition.addAllocationAsk(siAsk1)
assert.NilError(t, err)
err = partition.addAllocationAsk(siAsk2)
assert.NilError(t, err)
err = partition.addAllocationAsk(siAsk3)
assert.NilError(t, err)
requests = partition.calculateOutstandingRequests()
assert.Equal(t, 0, len(requests))

// mark asks as attempted
app1.GetAllocationAsk("ask-uuid-1").SetSchedulingAttempted(true)
app1.GetAllocationAsk("ask-uuid-2").SetSchedulingAttempted(true)
app2.GetAllocationAsk("ask-uuid-3").SetSchedulingAttempted(true)
requests = partition.calculateOutstandingRequests()
total := resources.NewResource()
expectedTotal := resources.NewResourceFromMap(map[string]resources.Quantity{
"memory": 3,
"vcores": 3,
})
for _, req := range requests {
total.AddTo(req.GetAllocatedResource())
}
assert.Equal(t, 3, len(requests))
assert.Assert(t, resources.Equals(expectedTotal, total), "total resource expected: %v, got: %v", expectedTotal, total)
}
17 changes: 14 additions & 3 deletions pkg/scheduler/scheduler.go
@@ -24,6 +24,7 @@ import (

"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/handler"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/plugins"
@@ -96,7 +97,11 @@ func (s *Scheduler) internalInspectOutstandingRequests() {
case <-s.stop:
return
case <-time.After(time.Second):
s.inspectOutstandingRequests()
if noRequests, totalResources := s.inspectOutstandingRequests(); noRequests > 0 {
log.Log(log.Scheduler).Info("Found outstanding requests that will trigger autoscaling",
zap.Int("number of requests", noRequests),
zap.Stringer("total resources", totalResources))
}
}
}
}
@@ -163,12 +168,15 @@ func (s *Scheduler) registerActivity() {
// skipped due to insufficient cluster resources and update the
// state through the ContainerSchedulingStateUpdaterPlugin in order
// to trigger the auto-scaling.
func (s *Scheduler) inspectOutstandingRequests() {
func (s *Scheduler) inspectOutstandingRequests() (int, *resources.Resource) {
log.Log(log.Scheduler).Debug("inspect outstanding requests")
// schedule each partition defined in the cluster
total := resources.NewResource()
noRequests := 0
for _, psc := range s.clusterContext.GetPartitionMapClone() {
requests := psc.calculateOutstandingRequests()
if len(requests) > 0 {
noRequests = len(requests)
if noRequests > 0 {
for _, ask := range requests {
log.Log(log.Scheduler).Debug("outstanding request",
zap.String("appID", ask.GetApplicationID()),
@@ -183,9 +191,12 @@ func (s *Scheduler) inspectOutstandingRequests() {
Reason: "request is waiting for cluster resources become available",
})
}
total.AddTo(ask.GetAllocatedResource())
ask.SetScaleUpTriggered(true)
}
}
}
return noRequests, total
}

// Visible by tests
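
As the commit message notes, the core never talks to the Kubernetes API itself: inspectOutstandingRequests reports each newly found request to the shim through the scheduling-state updater plugin, and the shim marks the corresponding pod unschedulable so the cluster autoscaler can react. A rough, purely illustrative sketch of a shim-side receiver under assumed names (SchedulingStateUpdate and StateUpdater below are invented for this example and are not the real scheduler-interface types):

package shimsketch

// SchedulingStateUpdate is a hypothetical payload carrying what the core
// reports for an ask that could not be scheduled for lack of resources.
type SchedulingStateUpdate struct {
	ApplicationID string
	AllocationKey string
	Reason        string
}

// StateUpdater is a hypothetical interface for the shim-side receiver.
type StateUpdater interface {
	Update(u SchedulingStateUpdate)
}

// unschedulableMarker sketches what the shim does with the update: resolve the
// pod behind the allocation key and record an Unschedulable condition, which is
// the signal the Kubernetes cluster autoscaler watches for.
type unschedulableMarker struct {
	markPod func(allocationKey, reason string) // injected pod-update hook, details omitted
}

func (m unschedulableMarker) Update(u SchedulingStateUpdate) {
	m.markPod(u.AllocationKey, u.Reason)
}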