From 8c6f4c533a5d18f10baea495b6b668f8a7040e4f Mon Sep 17 00:00:00 2001
From: Peter Bacsko
Date: Thu, 25 Jan 2024 16:59:04 +1100
Subject: [PATCH] [YUNIKORN-2212] Outstanding request collection optimisation (#745)

Auto scaling in K8s relies on a pod being marked unschedulable. That is
done via a regular callback from the core to the shim. The logic for
building the pod list in this callback is simplistic, which could cause
over-scaling of nodes or excessive K8s API server calls.

Improvements added:
* Don't collect requests that the scheduler has not attempted to schedule yet.
* If a request has already triggered scale up, do not send it again.
* Check for gang placeholders that can replace a real ask.
* Don't scale up if the pod has a required node set (daemon set pod).

Track the state of scheduling and auto-scale triggering on each ask. A
simplified sketch of the resulting per-ask selection rule is included
after the diff.

Closes: #745

Signed-off-by: Wilfred Spiegelenburg
---
 pkg/scheduler/objects/allocation_ask.go   |  26 ++++++
 pkg/scheduler/objects/application.go      |  11 ++-
 pkg/scheduler/objects/application_test.go | 103 ++++++++++++++++++++++
 pkg/scheduler/objects/queue_test.go       |  25 +++---
 pkg/scheduler/partition_test.go           |  67 ++++++++++++++
 pkg/scheduler/scheduler.go                |  17 +++-
 pkg/scheduler/scheduler_test.go           |  92 +++++++++++++++++++
 pkg/webservice/dao/allocation_ask_info.go |   2 +
 pkg/webservice/handlers.go                |   2 +
 9 files changed, 329 insertions(+), 16 deletions(-)
 create mode 100644 pkg/scheduler/scheduler_test.go

diff --git a/pkg/scheduler/objects/allocation_ask.go b/pkg/scheduler/objects/allocation_ask.go
index 38370aec3..7a42d04c0 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -55,6 +55,8 @@ type AllocationAsk struct {
 	allocLog            map[string]*AllocationLogEntry
 	preemptionTriggered bool
 	preemptCheckTime    time.Time
+	schedulingAttempted bool // whether scheduler core has tried to schedule this ask
+	scaleUpTriggered    bool // whether this ask has triggered autoscaling or not
 	resKeyPerNode       map[string]string // reservation key for a given node

 	sync.RWMutex
@@ -299,6 +301,30 @@ func (aa *AllocationAsk) LessThan(other *AllocationAsk) bool {
 	return aa.priority < other.priority
 }

+func (aa *AllocationAsk) SetSchedulingAttempted(attempted bool) {
+	aa.Lock()
+	defer aa.Unlock()
+	aa.schedulingAttempted = attempted
+}
+
+func (aa *AllocationAsk) IsSchedulingAttempted() bool {
+	aa.RLock()
+	defer aa.RUnlock()
+	return aa.schedulingAttempted
+}
+
+func (aa *AllocationAsk) SetScaleUpTriggered(triggered bool) {
+	aa.Lock()
+	defer aa.Unlock()
+	aa.scaleUpTriggered = triggered
+}
+
+func (aa *AllocationAsk) HasTriggeredScaleUp() bool {
+	aa.RLock()
+	defer aa.RUnlock()
+	return aa.scaleUpTriggered
+}
+
 // completedPendingAsk How many pending asks has been completed or processed so far?
func (aa *AllocationAsk) completedPendingAsk() int { aa.RLock() diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index bc7793d98..1899ce620 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -899,13 +899,16 @@ func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, user return } for _, request := range sa.sortedRequests { - if request.GetPendingAskRepeat() == 0 { + if request.GetPendingAskRepeat() == 0 || !request.IsSchedulingAttempted() { continue } + // ignore nil checks resource function calls are nil safe if headRoom.FitInMaxUndef(request.GetAllocatedResource()) && userHeadRoom.FitInMaxUndef(request.GetAllocatedResource()) { - // if headroom is still enough for the resources - *total = append(*total, request) + if !request.HasTriggeredScaleUp() && request.requiredNode == common.Empty && !sa.canReplace(request) { + // if headroom is still enough for the resources + *total = append(*total, request) + } headRoom.SubOnlyExisting(request.GetAllocatedResource()) userHeadRoom.SubOnlyExisting(request.GetAllocatedResource()) } @@ -951,6 +954,8 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption continue } + request.SetSchedulingAttempted(true) + // resource must fit in headroom otherwise skip the request (unless preemption could help) if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) { // attempt preemption diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 3ad703c95..684f50c4f 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -2302,6 +2302,8 @@ func TestGetOutstandingRequests(t *testing.T) { allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res) allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res) + allocationAsk1.SetSchedulingAttempted(true) + allocationAsk2.SetSchedulingAttempted(true) // Create an Application instance app := &Application{ @@ -2357,6 +2359,107 @@ func TestGetOutstandingRequests(t *testing.T) { assert.Equal(t, 0, len(total4), "expected no outstanding requests for TestCase 4") } +func TestGetOutstandingRequests_NoSchedulingAttempt(t *testing.T) { + res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1}) + + allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res) + allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res) + allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res) + allocationAsk4 := newAllocationAsk("alloc-4", "app-1", res) + allocationAsk2.SetSchedulingAttempted(true) + allocationAsk4.SetSchedulingAttempted(true) + app := &Application{ + ApplicationID: "app-1", + queuePath: "default", + } + sr := sortedRequests{} + sr.insert(allocationAsk1) + sr.insert(allocationAsk2) + sr.insert(allocationAsk3) + sr.insert(allocationAsk4) + app.sortedRequests = sr + + var total []*AllocationAsk + headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}) + userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8}) + app.getOutstandingRequests(headroom, userHeadroom, &total) + + assert.Equal(t, 2, len(total)) + assert.Equal(t, "alloc-2", total[0].allocationKey) + assert.Equal(t, "alloc-4", total[1].allocationKey) +} + +func TestGetOutstandingRequests_RequestTriggeredPreemptionHasRequiredNode(t *testing.T) { + // Test that we decrease headrooms even if the requests have triggered upscaling or + // the ask is a DaemonSet pod 
(requiredNode != "") + res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1}) + + allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res) + allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res) + allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res) + allocationAsk4 := newAllocationAsk("alloc-4", "app-1", res) + allocationAsk1.SetSchedulingAttempted(true) + allocationAsk2.SetSchedulingAttempted(true) + allocationAsk3.SetSchedulingAttempted(true) + allocationAsk4.SetSchedulingAttempted(true) // hasn't triggered scaling, no required node --> picked + allocationAsk1.SetScaleUpTriggered(true) // triggered scaling, no required node --> not selected + allocationAsk2.SetScaleUpTriggered(true) // triggered scaling, has required node --> not selected + allocationAsk2.SetRequiredNode("node-1") + allocationAsk3.SetRequiredNode("node-1") // hasn't triggered scaling, has required node --> not selected + + app := &Application{ + ApplicationID: "app-1", + queuePath: "default", + } + sr := sortedRequests{} + sr.insert(allocationAsk1) + sr.insert(allocationAsk2) + sr.insert(allocationAsk3) + sr.insert(allocationAsk4) + app.sortedRequests = sr + + var total []*AllocationAsk + headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}) + userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8}) + app.getOutstandingRequests(headroom, userHeadroom, &total) + + assert.Equal(t, 1, len(total)) + assert.Equal(t, "alloc-4", total[0].allocationKey) +} + +func TestGetOutstandingRequests_AskReplaceable(t *testing.T) { + res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1}) + + allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res) // replaceable + allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res) // replaceable + allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res) // non-replaceable + allocationAsk1.SetSchedulingAttempted(true) + allocationAsk2.SetSchedulingAttempted(true) + allocationAsk3.SetSchedulingAttempted(true) + allocationAsk1.taskGroupName = "testgroup" + allocationAsk2.taskGroupName = "testgroup" + + app := &Application{ + ApplicationID: "app-1", + queuePath: "default", + } + sr := sortedRequests{} + sr.insert(allocationAsk1) + sr.insert(allocationAsk2) + sr.insert(allocationAsk3) + app.sortedRequests = sr + app.addPlaceholderDataWithLocking(allocationAsk1) + app.addPlaceholderDataWithLocking(allocationAsk2) + + var total []*AllocationAsk + headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}) + userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8}) + app.getOutstandingRequests(headroom, userHeadroom, &total) + + assert.Equal(t, 1, len(total)) + assert.Equal(t, "alloc-3", total[0].allocationKey) +} + func TestGetRateLimitedAppLog(t *testing.T) { l := getRateLimitedAppLog() assert.Check(t, l != nil) diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index f1cfa172b..4c9e33031 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -1295,8 +1295,9 @@ func testOutstanding(t *testing.T, alloc, used *resources.Resource) { app1.queue = queue1 queue1.AddApplication(app1) for i := 0; i < 20; i++ { - err = app1.AddAllocationAsk( - newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc)) + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc) + ask.SetSchedulingAttempted(true) + err = app1.AddAllocationAsk(ask) 
assert.NilError(t, err, "failed to add allocation ask") } @@ -1304,8 +1305,9 @@ func testOutstanding(t *testing.T, alloc, used *resources.Resource) { app2.queue = queue2 queue2.AddApplication(app2) for i := 0; i < 20; i++ { - err = app2.AddAllocationAsk( - newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, alloc)) + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, alloc) + ask.SetSchedulingAttempted(true) + err = app2.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add allocation ask") } @@ -1376,8 +1378,9 @@ func TestGetOutstandingOnlyUntracked(t *testing.T) { app1.queue = queue1 queue1.AddApplication(app1) for i := 0; i < 20; i++ { - err = app1.AddAllocationAsk( - newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc)) + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc) + ask.SetSchedulingAttempted(true) + err = app1.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add allocation ask") } @@ -1424,8 +1427,9 @@ func TestGetOutstandingRequestNoMax(t *testing.T) { res, err = resources.NewResourceFromConf(map[string]string{"cpu": "1"}) assert.NilError(t, err, "failed to create basic resource") for i := 0; i < 10; i++ { - err = app1.AddAllocationAsk( - newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, res)) + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, res) + ask.SetSchedulingAttempted(true) + err = app1.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add allocation ask") } @@ -1433,8 +1437,9 @@ func TestGetOutstandingRequestNoMax(t *testing.T) { app2.queue = queue2 queue2.AddApplication(app2) for i := 0; i < 20; i++ { - err = app2.AddAllocationAsk( - newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, res)) + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, res) + ask.SetSchedulingAttempted(true) + err = app2.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add allocation ask") } diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index 3e59a3f27..9237d885d 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -4324,3 +4324,70 @@ func TestLimitMaxApplicationsForReservedAllocation(t *testing.T) { }) } } + +func TestCalculateOutstandingRequests(t *testing.T) { + partition, err := newBasePartition() + assert.NilError(t, err, "unable to create partition: %v", err) + + // no application&asks + requests := partition.calculateOutstandingRequests() + assert.Equal(t, 0, len(requests)) + + // two applications with no asks + app1 := newApplication(appID1, "test", "root.default") + app2 := newApplication(appID2, "test", "root.default") + err = partition.AddApplication(app1) + assert.NilError(t, err) + err = partition.AddApplication(app2) + assert.NilError(t, err) + requests = partition.calculateOutstandingRequests() + assert.Equal(t, 0, len(requests)) + + // new asks for the two apps, but the scheduler hasn't processed them + askResource := resources.NewResourceFromMap(map[string]resources.Quantity{ + "vcores": 1, + "memory": 1, + }) + siAsk1 := &si.AllocationAsk{ + AllocationKey: "ask-uuid-1", + ApplicationID: appID1, + ResourceAsk: askResource.ToProto(), + MaxAllocations: 1, + } + siAsk2 := &si.AllocationAsk{ + AllocationKey: "ask-uuid-2", + ApplicationID: appID1, + ResourceAsk: askResource.ToProto(), + MaxAllocations: 1, + } + siAsk3 := &si.AllocationAsk{ + AllocationKey: "ask-uuid-3", + ApplicationID: appID2, + ResourceAsk: askResource.ToProto(), + MaxAllocations: 1, + } + err = partition.addAllocationAsk(siAsk1) + assert.NilError(t, err) + err 
= partition.addAllocationAsk(siAsk2) + assert.NilError(t, err) + err = partition.addAllocationAsk(siAsk3) + assert.NilError(t, err) + requests = partition.calculateOutstandingRequests() + assert.Equal(t, 0, len(requests)) + + // mark asks as attempted + app1.GetAllocationAsk("ask-uuid-1").SetSchedulingAttempted(true) + app1.GetAllocationAsk("ask-uuid-2").SetSchedulingAttempted(true) + app2.GetAllocationAsk("ask-uuid-3").SetSchedulingAttempted(true) + requests = partition.calculateOutstandingRequests() + total := resources.NewResource() + expectedTotal := resources.NewResourceFromMap(map[string]resources.Quantity{ + "memory": 3, + "vcores": 3, + }) + for _, req := range requests { + total.AddTo(req.GetAllocatedResource()) + } + assert.Equal(t, 3, len(requests)) + assert.Assert(t, resources.Equals(expectedTotal, total), "total resource expected: %v, got: %v", expectedTotal, total) +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 31d3b360d..a47a5be4e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" + "github.com/apache/yunikorn-core/pkg/common/resources" "github.com/apache/yunikorn-core/pkg/handler" "github.com/apache/yunikorn-core/pkg/log" "github.com/apache/yunikorn-core/pkg/plugins" @@ -96,7 +97,11 @@ func (s *Scheduler) internalInspectOutstandingRequests() { case <-s.stop: return case <-time.After(time.Second): - s.inspectOutstandingRequests() + if noRequests, totalResources := s.inspectOutstandingRequests(); noRequests > 0 { + log.Log(log.Scheduler).Info("Found outstanding requests that will trigger autoscaling", + zap.Int("number of requests", noRequests), + zap.Stringer("total resources", totalResources)) + } } } } @@ -163,12 +168,15 @@ func (s *Scheduler) registerActivity() { // skipped due to insufficient cluster resources and update the // state through the ContainerSchedulingStateUpdaterPlugin in order // to trigger the auto-scaling. -func (s *Scheduler) inspectOutstandingRequests() { +func (s *Scheduler) inspectOutstandingRequests() (int, *resources.Resource) { log.Log(log.Scheduler).Debug("inspect outstanding requests") // schedule each partition defined in the cluster + total := resources.NewResource() + noRequests := 0 for _, psc := range s.clusterContext.GetPartitionMapClone() { requests := psc.calculateOutstandingRequests() - if len(requests) > 0 { + noRequests = len(requests) + if noRequests > 0 { for _, ask := range requests { log.Log(log.Scheduler).Debug("outstanding request", zap.String("appID", ask.GetApplicationID()), @@ -183,9 +191,12 @@ func (s *Scheduler) inspectOutstandingRequests() { Reason: "request is waiting for cluster resources become available", }) } + total.AddTo(ask.GetAllocatedResource()) + ask.SetScaleUpTriggered(true) } } } + return noRequests, total } // Visible by tests diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go new file mode 100644 index 000000000..5dc7cba5b --- /dev/null +++ b/pkg/scheduler/scheduler_test.go @@ -0,0 +1,92 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package scheduler + +import ( + "testing" + + "gotest.tools/v3/assert" + + "github.com/apache/yunikorn-core/pkg/common/resources" + "github.com/apache/yunikorn-scheduler-interface/lib/go/si" +) + +func TestInspectOutstandingRequests(t *testing.T) { + scheduler := NewScheduler() + partition, err := newBasePartition() + assert.NilError(t, err, "unable to create partition: %v", err) + scheduler.clusterContext.partitions["test"] = partition + + // two applications with no asks + app1 := newApplication(appID1, "test", "root.default") + app2 := newApplication(appID2, "test", "root.default") + err = partition.AddApplication(app1) + assert.NilError(t, err) + err = partition.AddApplication(app2) + assert.NilError(t, err) + + // add asks + askResource := resources.NewResourceFromMap(map[string]resources.Quantity{ + "vcores": 1, + "memory": 1, + }) + siAsk1 := &si.AllocationAsk{ + AllocationKey: "ask-uuid-1", + ApplicationID: appID1, + ResourceAsk: askResource.ToProto(), + MaxAllocations: 1, + } + siAsk2 := &si.AllocationAsk{ + AllocationKey: "ask-uuid-2", + ApplicationID: appID1, + ResourceAsk: askResource.ToProto(), + MaxAllocations: 1, + } + siAsk3 := &si.AllocationAsk{ + AllocationKey: "ask-uuid-3", + ApplicationID: appID2, + ResourceAsk: askResource.ToProto(), + MaxAllocations: 1, + } + err = partition.addAllocationAsk(siAsk1) + assert.NilError(t, err) + err = partition.addAllocationAsk(siAsk2) + assert.NilError(t, err) + err = partition.addAllocationAsk(siAsk3) + assert.NilError(t, err) + + // mark asks as attempted + expectedTotal := resources.NewResourceFromMap(map[string]resources.Quantity{ + "memory": 3, + "vcores": 3, + }) + app1.GetAllocationAsk("ask-uuid-1").SetSchedulingAttempted(true) + app1.GetAllocationAsk("ask-uuid-2").SetSchedulingAttempted(true) + app2.GetAllocationAsk("ask-uuid-3").SetSchedulingAttempted(true) + + // Check #1: collected 3 requests + noRequests, totalResources := scheduler.inspectOutstandingRequests() + assert.Equal(t, 3, noRequests) + assert.Assert(t, resources.Equals(totalResources, expectedTotal), + "total resource expected: %v, got: %v", expectedTotal, totalResources) + + // Check #2: try again, pending asks are not collected + noRequests, totalResources = scheduler.inspectOutstandingRequests() + assert.Equal(t, 0, noRequests) + assert.Assert(t, resources.IsZero(totalResources), "total resource is not zero: %v", totalResources) +} diff --git a/pkg/webservice/dao/allocation_ask_info.go b/pkg/webservice/dao/allocation_ask_info.go index 2d56bf461..07fbd0737 100644 --- a/pkg/webservice/dao/allocation_ask_info.go +++ b/pkg/webservice/dao/allocation_ask_info.go @@ -40,4 +40,6 @@ type AllocationAskDAOInfo struct { AllocationLog []*AllocationAskLogDAOInfo `json:"allocationLog,omitempty"` TriggeredPreemption bool `json:"triggeredPreemption,omitempty"` Originator bool `json:"originator,omitempty"` + SchedulingAttempted bool `json:"schedulingAttempted,omitempty"` + TriggeredScaleUp bool `json:"triggeredScaleUp,omitempty"` } diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go index d01f12cf2..31d1dff7b 100644 --- a/pkg/webservice/handlers.go +++ 
b/pkg/webservice/handlers.go @@ -334,6 +334,8 @@ func getAllocationAskDAO(ask *objects.AllocationAsk) *dao.AllocationAskDAOInfo { AllocationLog: getAllocationLogsDAO(ask.GetAllocationLog()), TriggeredPreemption: ask.HasTriggeredPreemption(), Originator: ask.IsOriginator(), + SchedulingAttempted: ask.IsSchedulingAttempted(), + TriggeredScaleUp: ask.HasTriggeredScaleUp(), } }
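
Below is a minimal, self-contained Go sketch of the per-ask selection rule this patch adds, included for readers of the change rather than the patch itself. It is illustrative only, not the yunikorn-core implementation: the ask struct, its replaceable field and the outstanding helper are simplified stand-ins for AllocationAsk, canReplace() and Application.getOutstandingRequests(), and the headroom bookkeeping is omitted.

package main

import "fmt"

// ask models only the fields the new filter inspects; it is a stand-in for
// the core's AllocationAsk, not the real type.
type ask struct {
	allocationKey       string
	pendingRepeat       int    // GetPendingAskRepeat()
	schedulingAttempted bool   // IsSchedulingAttempted()
	scaleUpTriggered    bool   // HasTriggeredScaleUp()
	requiredNode        string // non-empty for daemon set pods
	replaceable         bool   // true when a gang placeholder can satisfy the ask
}

// outstanding returns the asks that should be reported to the shim to trigger
// autoscaling: already attempted but unplaced, never reported before, not
// pinned to a specific node and not replaceable by a placeholder.
func outstanding(asks []*ask) []*ask {
	var total []*ask
	for _, a := range asks {
		if a.pendingRepeat == 0 || !a.schedulingAttempted {
			continue // nothing pending, or the scheduler core has not tried it yet
		}
		if !a.scaleUpTriggered && a.requiredNode == "" && !a.replaceable {
			total = append(total, a)
			a.scaleUpTriggered = true // never report the same ask twice
		}
	}
	return total
}

func main() {
	asks := []*ask{
		{allocationKey: "alloc-1", pendingRepeat: 1, schedulingAttempted: true},
		{allocationKey: "alloc-2", pendingRepeat: 1, schedulingAttempted: true, scaleUpTriggered: true},
		{allocationKey: "alloc-3", pendingRepeat: 1, schedulingAttempted: true, requiredNode: "node-1"},
		{allocationKey: "alloc-4", pendingRepeat: 1}, // not attempted yet: skipped
	}
	for _, a := range outstanding(asks) {
		fmt.Println(a.allocationKey) // prints alloc-1 only
	}
}

In the patch itself the scaleUpTriggered flag is only set in Scheduler.inspectOutstandingRequests(), after the shim plugin has been notified, which is why the second inspection pass in TestInspectOutstandingRequests collects nothing.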