diff --git a/pkg/scheduler/objects/allocation_ask.go b/pkg/scheduler/objects/allocation_ask.go
index 38370aec3..7a42d04c0 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -55,6 +55,8 @@ type AllocationAsk struct {
 	allocLog            map[string]*AllocationLogEntry
 	preemptionTriggered bool
 	preemptCheckTime    time.Time
+	schedulingAttempted bool // whether the scheduler core has tried to schedule this ask
+	scaleUpTriggered    bool // whether this ask has triggered autoscaling
 	resKeyPerNode       map[string]string // reservation key for a given node

 	sync.RWMutex
@@ -299,6 +301,34 @@ func (aa *AllocationAsk) LessThan(other *AllocationAsk) bool {
 	return aa.priority < other.priority
 }

+// SetSchedulingAttempted marks whether the scheduler core has tried to schedule this ask.
+func (aa *AllocationAsk) SetSchedulingAttempted(attempted bool) {
+	aa.Lock()
+	defer aa.Unlock()
+	aa.schedulingAttempted = attempted
+}
+
+// IsSchedulingAttempted returns whether the scheduler core has tried to schedule this ask.
+func (aa *AllocationAsk) IsSchedulingAttempted() bool {
+	aa.RLock()
+	defer aa.RUnlock()
+	return aa.schedulingAttempted
+}
+
+// SetScaleUpTriggered marks whether this ask has triggered autoscaling.
+func (aa *AllocationAsk) SetScaleUpTriggered(triggered bool) {
+	aa.Lock()
+	defer aa.Unlock()
+	aa.scaleUpTriggered = triggered
+}
+
+// HasTriggeredScaleUp returns whether this ask has triggered autoscaling.
+func (aa *AllocationAsk) HasTriggeredScaleUp() bool {
+	aa.RLock()
+	defer aa.RUnlock()
+	return aa.scaleUpTriggered
+}
+
 // completedPendingAsk How many pending asks has been completed or processed so far?
 func (aa *AllocationAsk) completedPendingAsk() int {
 	aa.RLock()
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index bc7793d98..1899ce620 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -899,13 +899,16 @@ func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, userHeadRoom *resources.Resource, total *[]*AllocationAsk) {
 		return
 	}
 	for _, request := range sa.sortedRequests {
-		if request.GetPendingAskRepeat() == 0 {
+		if request.GetPendingAskRepeat() == 0 || !request.IsSchedulingAttempted() {
 			continue
 		}
+		// ignore nil checks: the resource function calls are nil safe
 		if headRoom.FitInMaxUndef(request.GetAllocatedResource()) && userHeadRoom.FitInMaxUndef(request.GetAllocatedResource()) {
-			// if headroom is still enough for the resources
-			*total = append(*total, request)
+			if !request.HasTriggeredScaleUp() && request.requiredNode == common.Empty && !sa.canReplace(request) {
+				// headroom is still enough for the resources
+				*total = append(*total, request)
+			}
 			headRoom.SubOnlyExisting(request.GetAllocatedResource())
 			userHeadRoom.SubOnlyExisting(request.GetAllocatedResource())
 		}
 	}
@@ -951,6 +954,8 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption
 			continue
 		}

+		request.SetSchedulingAttempted(true)
+
 		// resource must fit in headroom otherwise skip the request (unless preemption could help)
 		if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
 			// attempt preemption
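The two application.go hunks above are the heart of the change: tryAllocate now stamps every ask it touches, and getOutstandingRequests only reports an ask to the autoscaler once the core has actually tried (and failed) to place it, and only if the ask has not already triggered a scale-up, is not pinned to a node, and cannot replace a placeholder. A minimal, self-contained sketch of that selection rule; the `ask` type and the scalar headroom here are simplified stand-ins for illustration, not the YuniKorn types:

```go
package main

import "fmt"

// ask is a hypothetical stand-in for AllocationAsk, reduced to the fields the
// new filter looks at.
type ask struct {
	key           string
	pendingRepeat int
	attempted     bool   // schedulingAttempted
	scaleUpDone   bool   // scaleUpTriggered
	requiredNode  string // non-empty for DaemonSet-style pods
	replaceable   bool   // ask can replace a gang placeholder
	memory        int64
}

// outstanding mirrors the rule in getOutstandingRequests: report an ask only
// if the core has tried it, it fits into the remaining headroom, and nothing
// else (prior scale-up, node pinning, placeholder replacement) will satisfy it.
func outstanding(asks []ask, headroom int64) []ask {
	var total []ask
	for _, a := range asks {
		if a.pendingRepeat == 0 || !a.attempted {
			continue
		}
		if a.memory <= headroom {
			if !a.scaleUpDone && a.requiredNode == "" && !a.replaceable {
				total = append(total, a)
			}
			// the headroom shrinks whether or not the ask was reported
			headroom -= a.memory
		}
	}
	return total
}

func main() {
	asks := []ask{
		{key: "a1", pendingRepeat: 1, attempted: true, memory: 1},
		{key: "a2", pendingRepeat: 1, memory: 1},                                     // never attempted: skipped
		{key: "a3", pendingRepeat: 1, attempted: true, scaleUpDone: true, memory: 1}, // already triggered scale-up
	}
	fmt.Println(outstanding(asks, 10)) // only a1 is reported
}
```

Note the design choice reflected in the sketch: headroom is decremented even for asks that are filtered out, so later asks are never reported against capacity that pending work will consume anyway.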
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 3ad703c95..684f50c4f 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2302,6 +2302,8 @@ func TestGetOutstandingRequests(t *testing.T) {
 	allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res)
 	allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res)
+	allocationAsk1.SetSchedulingAttempted(true)
+	allocationAsk2.SetSchedulingAttempted(true)

 	// Create an Application instance
 	app := &Application{
@@ -2357,6 +2359,107 @@ func TestGetOutstandingRequests(t *testing.T) {
 	assert.Equal(t, 0, len(total4), "expected no outstanding requests for TestCase 4")
 }

+func TestGetOutstandingRequests_NoSchedulingAttempt(t *testing.T) {
+	res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1})
+
+	allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res)
+	allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res)
+	allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res)
+	allocationAsk4 := newAllocationAsk("alloc-4", "app-1", res)
+	allocationAsk2.SetSchedulingAttempted(true)
+	allocationAsk4.SetSchedulingAttempted(true)
+	app := &Application{
+		ApplicationID: "app-1",
+		queuePath:     "default",
+	}
+	sr := sortedRequests{}
+	sr.insert(allocationAsk1)
+	sr.insert(allocationAsk2)
+	sr.insert(allocationAsk3)
+	sr.insert(allocationAsk4)
+	app.sortedRequests = sr
+
+	var total []*AllocationAsk
+	headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10})
+	userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8})
+	app.getOutstandingRequests(headroom, userHeadroom, &total)
+
+	assert.Equal(t, 2, len(total))
+	assert.Equal(t, "alloc-2", total[0].allocationKey)
+	assert.Equal(t, "alloc-4", total[1].allocationKey)
+}
+
+func TestGetOutstandingRequests_RequestTriggeredUpscalingHasRequiredNode(t *testing.T) {
+	// Test that we decrease the headroom even if a request has triggered upscaling or
+	// the ask is a DaemonSet pod (requiredNode != "")
+	res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1})
+
+	allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res)
+	allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res)
+	allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res)
+	allocationAsk4 := newAllocationAsk("alloc-4", "app-1", res)
+	allocationAsk1.SetSchedulingAttempted(true)
+	allocationAsk2.SetSchedulingAttempted(true)
+	allocationAsk3.SetSchedulingAttempted(true)
+	allocationAsk4.SetSchedulingAttempted(true) // hasn't triggered scaling, no required node --> picked
+	allocationAsk1.SetScaleUpTriggered(true)    // triggered scaling, no required node --> not selected
+	allocationAsk2.SetScaleUpTriggered(true)    // triggered scaling, has required node --> not selected
+	allocationAsk2.SetRequiredNode("node-1")
+	allocationAsk3.SetRequiredNode("node-1") // hasn't triggered scaling, has required node --> not selected
+
+	app := &Application{
+		ApplicationID: "app-1",
+		queuePath:     "default",
+	}
+	sr := sortedRequests{}
+	sr.insert(allocationAsk1)
+	sr.insert(allocationAsk2)
+	sr.insert(allocationAsk3)
+	sr.insert(allocationAsk4)
+	app.sortedRequests = sr
+
+	var total []*AllocationAsk
+	headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10})
+	userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8})
+	app.getOutstandingRequests(headroom, userHeadroom, &total)
+
+	assert.Equal(t, 1, len(total))
+	assert.Equal(t, "alloc-4", total[0].allocationKey)
+}
+
+func TestGetOutstandingRequests_AskReplaceable(t *testing.T) {
+	res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1})
+
+	allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res) // replaceable
+	allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res) // replaceable
+	allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res) // non-replaceable
+	allocationAsk1.SetSchedulingAttempted(true)
+	allocationAsk2.SetSchedulingAttempted(true)
+	allocationAsk3.SetSchedulingAttempted(true)
+	allocationAsk1.taskGroupName = "testgroup"
+	allocationAsk2.taskGroupName = "testgroup"
+
+	app := &Application{
+		ApplicationID: "app-1",
+		queuePath:     "default",
+	}
+	sr := sortedRequests{}
+	sr.insert(allocationAsk1)
+	sr.insert(allocationAsk2)
+	sr.insert(allocationAsk3)
+	app.sortedRequests = sr
+	app.addPlaceholderDataWithLocking(allocationAsk1)
+	app.addPlaceholderDataWithLocking(allocationAsk2)
+
+	var total []*AllocationAsk
+	headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10})
+	userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8})
+	app.getOutstandingRequests(headroom, userHeadroom, &total)
+
+	assert.Equal(t, 1, len(total))
+	assert.Equal(t, "alloc-3", total[0].allocationKey)
+}
+
 func TestGetRateLimitedAppLog(t *testing.T) {
 	l := getRateLimitedAppLog()
 	assert.Check(t, l != nil)
 }
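TestGetOutstandingRequests_AskReplaceable above leans on the gang-scheduling path: an ask whose task group still has unreplaced placeholders will take over a placeholder's resources rather than need new capacity. A sketch of that idea with simplified stand-in types; the real Application.canReplace consults per-task-group placeholder data, this is only the shape of the decision:

```go
package main

import "fmt"

// placeholders is a hypothetical stand-in for the application's placeholder
// bookkeeping: count of unreplaced placeholders per task group.
type placeholders map[string]int

// canReplace mirrors the intent of Application.canReplace: an ask in a task
// group with unreplaced placeholders does not need the autoscaler, because it
// will be swapped in for a placeholder that already holds resources.
func (p placeholders) canReplace(taskGroup string) bool {
	return taskGroup != "" && p[taskGroup] > 0
}

func main() {
	p := placeholders{"testgroup": 2}
	fmt.Println(p.canReplace("testgroup")) // true: alloc-1/alloc-2 reuse placeholder capacity
	fmt.Println(p.canReplace(""))          // false: alloc-3 needs real capacity
}
```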
"app-1", + queuePath: "default", + } + sr := sortedRequests{} + sr.insert(allocationAsk1) + sr.insert(allocationAsk2) + sr.insert(allocationAsk3) + app.sortedRequests = sr + app.addPlaceholderDataWithLocking(allocationAsk1) + app.addPlaceholderDataWithLocking(allocationAsk2) + + var total []*AllocationAsk + headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}) + userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8}) + app.getOutstandingRequests(headroom, userHeadroom, &total) + + assert.Equal(t, 1, len(total)) + assert.Equal(t, "alloc-3", total[0].allocationKey) +} + func TestGetRateLimitedAppLog(t *testing.T) { l := getRateLimitedAppLog() assert.Check(t, l != nil) diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index f1cfa172b..4c9e33031 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -1295,8 +1295,9 @@ func testOutstanding(t *testing.T, alloc, used *resources.Resource) { app1.queue = queue1 queue1.AddApplication(app1) for i := 0; i < 20; i++ { - err = app1.AddAllocationAsk( - newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc)) + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc) + ask.SetSchedulingAttempted(true) + err = app1.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add allocation ask") } @@ -1304,8 +1305,9 @@ func testOutstanding(t *testing.T, alloc, used *resources.Resource) { app2.queue = queue2 queue2.AddApplication(app2) for i := 0; i < 20; i++ { - err = app2.AddAllocationAsk( - newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, alloc)) + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, alloc) + ask.SetSchedulingAttempted(true) + err = app2.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add allocation ask") } @@ -1376,8 +1378,9 @@ func TestGetOutstandingOnlyUntracked(t *testing.T) { app1.queue = queue1 queue1.AddApplication(app1) for i := 0; i < 20; i++ { - err = app1.AddAllocationAsk( - newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc)) + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, alloc) + ask.SetSchedulingAttempted(true) + err = app1.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add allocation ask") } @@ -1424,8 +1427,9 @@ func TestGetOutstandingRequestNoMax(t *testing.T) { res, err = resources.NewResourceFromConf(map[string]string{"cpu": "1"}) assert.NilError(t, err, "failed to create basic resource") for i := 0; i < 10; i++ { - err = app1.AddAllocationAsk( - newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, res)) + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, res) + ask.SetSchedulingAttempted(true) + err = app1.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add allocation ask") } @@ -1433,8 +1437,9 @@ func TestGetOutstandingRequestNoMax(t *testing.T) { app2.queue = queue2 queue2.AddApplication(app2) for i := 0; i < 20; i++ { - err = app2.AddAllocationAsk( - newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, res)) + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID2, res) + ask.SetSchedulingAttempted(true) + err = app2.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add allocation ask") } diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index 3e59a3f27..9237d885d 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -4324,3 +4324,70 @@ func TestLimitMaxApplicationsForReservedAllocation(t *testing.T) { }) } } + +func 
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 3e59a3f27..9237d885d 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -4324,3 +4324,70 @@ func TestLimitMaxApplicationsForReservedAllocation(t *testing.T) {
 		})
 	}
 }
+
+func TestCalculateOutstandingRequests(t *testing.T) {
+	partition, err := newBasePartition()
+	assert.NilError(t, err, "unable to create partition: %v", err)
+
+	// no applications and no asks
+	requests := partition.calculateOutstandingRequests()
+	assert.Equal(t, 0, len(requests))
+
+	// two applications with no asks
+	app1 := newApplication(appID1, "test", "root.default")
+	app2 := newApplication(appID2, "test", "root.default")
+	err = partition.AddApplication(app1)
+	assert.NilError(t, err)
+	err = partition.AddApplication(app2)
+	assert.NilError(t, err)
+	requests = partition.calculateOutstandingRequests()
+	assert.Equal(t, 0, len(requests))
+
+	// new asks for the two apps, but the scheduler hasn't attempted them yet
+	askResource := resources.NewResourceFromMap(map[string]resources.Quantity{
+		"vcores": 1,
+		"memory": 1,
+	})
+	siAsk1 := &si.AllocationAsk{
+		AllocationKey:  "ask-uuid-1",
+		ApplicationID:  appID1,
+		ResourceAsk:    askResource.ToProto(),
+		MaxAllocations: 1,
+	}
+	siAsk2 := &si.AllocationAsk{
+		AllocationKey:  "ask-uuid-2",
+		ApplicationID:  appID1,
+		ResourceAsk:    askResource.ToProto(),
+		MaxAllocations: 1,
+	}
+	siAsk3 := &si.AllocationAsk{
+		AllocationKey:  "ask-uuid-3",
+		ApplicationID:  appID2,
+		ResourceAsk:    askResource.ToProto(),
+		MaxAllocations: 1,
+	}
+	err = partition.addAllocationAsk(siAsk1)
+	assert.NilError(t, err)
+	err = partition.addAllocationAsk(siAsk2)
+	assert.NilError(t, err)
+	err = partition.addAllocationAsk(siAsk3)
+	assert.NilError(t, err)
+	requests = partition.calculateOutstandingRequests()
+	assert.Equal(t, 0, len(requests))
+
+	// mark the asks as attempted: they now count as outstanding
+	app1.GetAllocationAsk("ask-uuid-1").SetSchedulingAttempted(true)
+	app1.GetAllocationAsk("ask-uuid-2").SetSchedulingAttempted(true)
+	app2.GetAllocationAsk("ask-uuid-3").SetSchedulingAttempted(true)
+	requests = partition.calculateOutstandingRequests()
+	total := resources.NewResource()
+	expectedTotal := resources.NewResourceFromMap(map[string]resources.Quantity{
+		"memory": 3,
+		"vcores": 3,
+	})
+	for _, req := range requests {
+		total.AddTo(req.GetAllocatedResource())
+	}
+	assert.Equal(t, 3, len(requests))
+	assert.Assert(t, resources.Equals(expectedTotal, total), "total resource expected: %v, got: %v", expectedTotal, total)
+}
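The tallying at the end of the test uses only resources helpers that appear elsewhere in this patch (NewResourceFromMap, NewResource, AddTo, Equals). In isolation, the aggregation pattern is just this; a runnable sketch inside the yunikorn-core module:

```go
package main

import (
	"fmt"

	"github.com/apache/yunikorn-core/pkg/common/resources"
)

func main() {
	// one ask's worth of resources, as in the test
	askRes := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1, "vcores": 1})
	total := resources.NewResource()
	for i := 0; i < 3; i++ {
		total.AddTo(askRes) // same per-request accumulation the test performs
	}
	expected := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 3, "vcores": 3})
	fmt.Println(resources.Equals(expected, total)) // true
}
```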
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 31d3b360d..a47a5be4e 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -24,6 +24,7 @@ import (

 	"go.uber.org/zap"

+	"github.com/apache/yunikorn-core/pkg/common/resources"
 	"github.com/apache/yunikorn-core/pkg/handler"
 	"github.com/apache/yunikorn-core/pkg/log"
 	"github.com/apache/yunikorn-core/pkg/plugins"
@@ -96,7 +97,11 @@ func (s *Scheduler) internalInspectOutstandingRequests() {
 		case <-s.stop:
 			return
 		case <-time.After(time.Second):
-			s.inspectOutstandingRequests()
+			if noRequests, totalResources := s.inspectOutstandingRequests(); noRequests > 0 {
+				log.Log(log.Scheduler).Info("Found outstanding requests that will trigger autoscaling",
+					zap.Int("number of requests", noRequests),
+					zap.Stringer("total resources", totalResources))
+			}
 		}
 	}
 }
@@ -163,12 +168,15 @@
 // skipped due to insufficient cluster resources and update the
 // state through the ContainerSchedulingStateUpdaterPlugin in order
 // to trigger the auto-scaling.
-func (s *Scheduler) inspectOutstandingRequests() {
+func (s *Scheduler) inspectOutstandingRequests() (int, *resources.Resource) {
 	log.Log(log.Scheduler).Debug("inspect outstanding requests")
 	// schedule each partition defined in the cluster
+	total := resources.NewResource()
+	noRequests := 0
 	for _, psc := range s.clusterContext.GetPartitionMapClone() {
 		requests := psc.calculateOutstandingRequests()
+		noRequests += len(requests)
 		if len(requests) > 0 {
 			for _, ask := range requests {
 				log.Log(log.Scheduler).Debug("outstanding request",
 					zap.String("appID", ask.GetApplicationID()),
@@ -183,9 +191,12 @@
 						Reason: "request is waiting for cluster resources become available",
 					})
 				}
+				total.AddTo(ask.GetAllocatedResource())
+				ask.SetScaleUpTriggered(true)
 			}
 		}
 	}
+	return noRequests, total
 }

 // Visible by tests
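Two remarks on the hunks above. First, the count has to accumulate with `+=`: a plain assignment would silently drop the totals of all partitions but the last one iterated. Second, the surrounding loop is the standard stop-channel/time.After polling pattern; reduced to a self-contained sketch (the function and names below are illustrative, not the scheduler's API):

```go
package main

import (
	"fmt"
	"time"
)

// inspectLoop reproduces the control flow of internalInspectOutstandingRequests
// in isolation: a stop channel plus time.After yields a cancellable
// once-per-second inspection.
func inspectLoop(stop <-chan struct{}, inspect func() int) {
	for {
		select {
		case <-stop:
			return
		case <-time.After(time.Second):
			if n := inspect(); n > 0 {
				fmt.Printf("found %d outstanding requests\n", n)
			}
		}
	}
}

func main() {
	stop := make(chan struct{})
	go inspectLoop(stop, func() int { return 3 })
	time.Sleep(1500 * time.Millisecond)
	close(stop) // the goroutine exits on its next select
}
```

Because inspectOutstandingRequests stamps every reported ask with SetScaleUpTriggered(true), each ask can trigger autoscaling at most once per scheduling attempt, which is exactly what the new scheduler test verifies below.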
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
new file mode 100644
index 000000000..5dc7cba5b
--- /dev/null
+++ b/pkg/scheduler/scheduler_test.go
@@ -0,0 +1,92 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"testing"
+
+	"gotest.tools/v3/assert"
+
+	"github.com/apache/yunikorn-core/pkg/common/resources"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+func TestInspectOutstandingRequests(t *testing.T) {
+	scheduler := NewScheduler()
+	partition, err := newBasePartition()
+	assert.NilError(t, err, "unable to create partition: %v", err)
+	scheduler.clusterContext.partitions["test"] = partition
+
+	// two applications, no asks yet
+	app1 := newApplication(appID1, "test", "root.default")
+	app2 := newApplication(appID2, "test", "root.default")
+	err = partition.AddApplication(app1)
+	assert.NilError(t, err)
+	err = partition.AddApplication(app2)
+	assert.NilError(t, err)
+
+	// add asks
+	askResource := resources.NewResourceFromMap(map[string]resources.Quantity{
+		"vcores": 1,
+		"memory": 1,
+	})
+	siAsk1 := &si.AllocationAsk{
+		AllocationKey:  "ask-uuid-1",
+		ApplicationID:  appID1,
+		ResourceAsk:    askResource.ToProto(),
+		MaxAllocations: 1,
+	}
+	siAsk2 := &si.AllocationAsk{
+		AllocationKey:  "ask-uuid-2",
+		ApplicationID:  appID1,
+		ResourceAsk:    askResource.ToProto(),
+		MaxAllocations: 1,
+	}
+	siAsk3 := &si.AllocationAsk{
+		AllocationKey:  "ask-uuid-3",
+		ApplicationID:  appID2,
+		ResourceAsk:    askResource.ToProto(),
+		MaxAllocations: 1,
+	}
+	err = partition.addAllocationAsk(siAsk1)
+	assert.NilError(t, err)
+	err = partition.addAllocationAsk(siAsk2)
+	assert.NilError(t, err)
+	err = partition.addAllocationAsk(siAsk3)
+	assert.NilError(t, err)
+
+	// mark asks as attempted
+	expectedTotal := resources.NewResourceFromMap(map[string]resources.Quantity{
+		"memory": 3,
+		"vcores": 3,
+	})
+	app1.GetAllocationAsk("ask-uuid-1").SetSchedulingAttempted(true)
+	app1.GetAllocationAsk("ask-uuid-2").SetSchedulingAttempted(true)
+	app2.GetAllocationAsk("ask-uuid-3").SetSchedulingAttempted(true)
+
+	// Check #1: all three requests are collected
+	noRequests, totalResources := scheduler.inspectOutstandingRequests()
+	assert.Equal(t, 3, noRequests)
+	assert.Assert(t, resources.Equals(totalResources, expectedTotal),
+		"total resource expected: %v, got: %v", expectedTotal, totalResources)
+
+	// Check #2: inspect again; asks that have already triggered scale-up are not collected
+	noRequests, totalResources = scheduler.inspectOutstandingRequests()
+	assert.Equal(t, 0, noRequests)
+	assert.Assert(t, resources.IsZero(totalResources), "total resource is not zero: %v", totalResources)
+}
diff --git a/pkg/webservice/dao/allocation_ask_info.go b/pkg/webservice/dao/allocation_ask_info.go
index 2d56bf461..07fbd0737 100644
--- a/pkg/webservice/dao/allocation_ask_info.go
+++ b/pkg/webservice/dao/allocation_ask_info.go
@@ -40,4 +40,6 @@ type AllocationAskDAOInfo struct {
 	AllocationLog       []*AllocationAskLogDAOInfo `json:"allocationLog,omitempty"`
 	TriggeredPreemption bool                       `json:"triggeredPreemption,omitempty"`
 	Originator          bool                       `json:"originator,omitempty"`
+	SchedulingAttempted bool                       `json:"schedulingAttempted,omitempty"`
+	TriggeredScaleUp    bool                       `json:"triggeredScaleUp,omitempty"`
 }
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index d01f12cf2..31d1dff7b 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -334,6 +334,8 @@ func getAllocationAskDAO(ask *objects.AllocationAsk) *dao.AllocationAskDAOInfo {
 		AllocationLog:       getAllocationLogsDAO(ask.GetAllocationLog()),
 		TriggeredPreemption: ask.HasTriggeredPreemption(),
 		Originator:          ask.IsOriginator(),
+		SchedulingAttempted: ask.IsSchedulingAttempted(),
+		TriggeredScaleUp:    ask.HasTriggeredScaleUp(),
 	}
 }
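For completeness, this is how the two new flags surface over the REST API. The struct below is a trimmed stand-in for dao.AllocationAskDAOInfo (the real type carries many more fields), kept only to show the marshaling behavior of the tags added above:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// askInfo mirrors just the fields this patch adds, plus a key for context.
type askInfo struct {
	AllocationKey       string `json:"allocationKey"`
	SchedulingAttempted bool   `json:"schedulingAttempted,omitempty"`
	TriggeredScaleUp    bool   `json:"triggeredScaleUp,omitempty"`
}

func main() {
	out, _ := json.Marshal(askInfo{AllocationKey: "ask-uuid-1", SchedulingAttempted: true})
	fmt.Println(string(out)) // {"allocationKey":"ask-uuid-1","schedulingAttempted":true}

	// with omitempty, false booleans are dropped from the payload entirely,
	// so existing API consumers see no change for asks that never set the flags
	out, _ = json.Marshal(askInfo{AllocationKey: "ask-uuid-2"})
	fmt.Println(string(out)) // {"allocationKey":"ask-uuid-2"}
}
```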