Skip to content

Commit

Permalink
[YUNIKORN-2483] Core: Remove stateaware sort policy (apache#820)
Browse files Browse the repository at this point in the history
Closes: apache#820

Signed-off-by: Manikandan R <[email protected]>
  • Loading branch information
craigcondit authored and manirajv06 committed Mar 14, 2024
1 parent bb31066 commit 47c3ddc
Show file tree
Hide file tree
Showing 12 changed files with 35 additions and 268 deletions.
4 changes: 2 additions & 2 deletions pkg/common/configs/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ partitions:
- name: root
submitacl: '*'
properties:
application.sort.policy: stateaware
application.sort.policy: fifo
sample: value2
`

Expand Down Expand Up @@ -1714,7 +1714,7 @@ partitions:
- name: root
submitacl: '*'
properties:
application.sort.policy: stateaware
application.sort.policy: fifo
`
testCases := []struct {
name string
Expand Down
7 changes: 1 addition & 6 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

Expand Down Expand Up @@ -185,11 +184,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve
}
app.gangSchedulingStyle = gangSchedStyle
app.execTimeout = placeholderTimeout
if app.GetTag(siCommon.AppTagStateAwareDisable) != "" {
app.startTimeout = 0 // transition immediately to Running
} else {
app.startTimeout = startingTimeout
}
app.startTimeout = startingTimeout
app.user = ugi
app.rmEventHandler = eventHandler
app.rmID = rmID
Expand Down
20 changes: 0 additions & 20 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,26 +1013,6 @@ func TestStateTimeOut(t *testing.T) {
if !app.stateMachine.Is(Running.String()) || app.stateTimer != nil {
t.Fatalf("State is not running or timer was not cleared, state: %s, timer %v", app.stateMachine.Current(), app.stateTimer)
}

startingTimeout = time.Minute * 5
app = newApplicationWithTags(appID2, "default", "root.a", map[string]string{siCommon.AppTagStateAwareDisable: "true"})
err = app.handleApplicationEventWithLocking(RunApplication)
assert.NilError(t, err, "no error expected new to accepted (timeout test)")
err = app.handleApplicationEventWithLocking(RunApplication)
assert.NilError(t, err, "no error expected accepted to starting (timeout test)")
// give it some time to run and progress
time.Sleep(time.Millisecond * 100)
if app.IsStarting() {
t.Fatal("Starting state should have timed out")
}
if app.stateTimer != nil {
t.Fatalf("Startup timer has not be cleared on time out as expected, %v", app.stateTimer)
}
log := app.GetStateLog()
assert.Equal(t, len(log), 3, "wrong number of app events")
assert.Equal(t, log[0].ApplicationState, Accepted.String())
assert.Equal(t, log[1].ApplicationState, Starting.String())
assert.Equal(t, log[2].ApplicationState, Running.String())
}

func TestCompleted(t *testing.T) {
Expand Down
27 changes: 8 additions & 19 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,7 @@ func (sq *Queue) IsPrioritySortEnabled() bool {
// Only applications with a pending resource request are considered.
// Lock free call all locks are taken when needed in called functions
// If withPlaceholdersOnly is true, then only applications with at least one placeholder allocation are considered.
func (sq *Queue) sortApplications(filterApps, withPlaceholdersOnly bool) []*Application {
func (sq *Queue) sortApplications(withPlaceholdersOnly bool) []*Application {
if !sq.IsLeafQueue() {
return nil
}
Expand All @@ -1144,16 +1144,7 @@ func (sq *Queue) sortApplications(filterApps, withPlaceholdersOnly bool) []*Appl
}

// sort applications based on the sorting policy
// some apps might be filtered out based on the policy specific conditions.
// currently, only the stateAware policy does the filtering (based on app state).
// if the filterApps flag is true, the app filtering is skipped while doing the sorting.
queueSortType := sq.getSortType()
if !filterApps && queueSortType == policies.StateAwarePolicy {
// fallback to FIFO policy if the filterApps flag is false
// this is to skip the app filtering in the StateAware policy sorting
queueSortType = policies.FifoSortPolicy
}
return sortApplications(apps, queueSortType, sq.IsPrioritySortEnabled(), sq.GetGuaranteedResource())
return sortApplications(apps, sq.getSortType(), sq.IsPrioritySortEnabled(), sq.GetGuaranteedResource())
}

// sortQueues returns a sorted shallow copy of the queues for this parent queue.
Expand Down Expand Up @@ -1360,7 +1351,7 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N
preemptAttemptsRemaining := maxPreemptionsPerQueue

// process the apps (filters out app without pending requests)
for _, app := range sq.sortApplications(true, false) {
for _, app := range sq.sortApplications(false) {
runnableInQueue := sq.canRunApp(app.ApplicationID)
runnableByUserLimit := ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)
app.updateRunnableStatus(runnableInQueue, runnableByUserLimit)
Expand Down Expand Up @@ -1402,7 +1393,7 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N
func (sq *Queue) TryPlaceholderAllocate(iterator func() NodeIterator, getnode func(string) *Node) *Allocation {
if sq.IsLeafQueue() {
// process the apps (filters out app without pending requests)
for _, app := range sq.sortApplications(true, true) {
for _, app := range sq.sortApplications(true) {
alloc := app.tryPlaceholderAllocate(iterator, getnode)
if alloc != nil {
log.Log(log.SchedQueue).Info("allocation found on queue",
Expand All @@ -1428,11 +1419,9 @@ func (sq *Queue) TryPlaceholderAllocate(iterator func() NodeIterator, getnode fu
func (sq *Queue) GetQueueOutstandingRequests(total *[]*AllocationAsk) {
if sq.IsLeafQueue() {
headRoom := sq.getMaxHeadRoom()
// while calculating outstanding requests, we do not need to filter apps.
// e.g. StateAware filters apps by state in order to schedule app one by one.
// we calculate all the requests that can fit into the queue's headroom,
// while calculating outstanding requests, we calculate all the requests that can fit into the queue's headroom,
// all these requests are qualified to trigger the up scaling.
for _, app := range sq.sortApplications(false, false) {
for _, app := range sq.sortApplications(false) {
// calculate the users' headroom
userHeadroom := ugm.GetUserManager().Headroom(app.queuePath, app.ApplicationID, app.user)
app.getOutstandingRequests(headRoom, userHeadroom, total)
Expand Down Expand Up @@ -1586,15 +1575,15 @@ func (sq *Queue) getSortType() policies.SortPolicy {
}

// SupportTaskGroup returns true if the queue supports task groups.
// FIFO and StateAware sorting policies can support this.
// FIFO policy is required to support this.
// NOTE: this call does not make sense for a parent queue, and always returns false
func (sq *Queue) SupportTaskGroup() bool {
sq.RLock()
defer sq.RUnlock()
if !sq.isLeaf {
return false
}
return sq.sortType == policies.FifoSortPolicy || sq.sortType == policies.StateAwarePolicy
return sq.sortType == policies.FifoSortPolicy
}

// updateGuaranteedResourceMetrics updates guaranteed resource metrics.
Expand Down
79 changes: 7 additions & 72 deletions pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,21 +647,21 @@ func TestSortApplications(t *testing.T) {
// empty parent queue
parent, err = createManagedQueue(root, "parent", true, nil)
assert.NilError(t, err, "failed to create parent queue: %v")
if apps := parent.sortApplications(true, false); apps != nil {
if apps := parent.sortApplications(false); apps != nil {
t.Errorf("parent queue should not return sorted apps: %v", apps)
}

// empty leaf queue
leaf, err = createManagedQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "failed to create leaf queue")
if len(leaf.sortApplications(true, false)) != 0 {
if len(leaf.sortApplications(false)) != 0 {
t.Errorf("empty queue should return no app from sort: %v", leaf)
}
// new app does not have pending res, does not get returned
app := newApplication(appID1, "default", leaf.QueuePath)
app.queue = leaf
leaf.AddApplication(app)
if len(leaf.sortApplications(true, false)) != 0 {
if len(leaf.sortApplications(false)) != 0 {
t.Errorf("app without ask should not be in sorted apps: %v", app)
}
var res *resources.Resource
Expand All @@ -670,73 +670,17 @@ func TestSortApplications(t *testing.T) {
// add an ask app must be returned
err = app.AddAllocationAsk(newAllocationAsk("alloc-1", appID1, res))
assert.NilError(t, err, "failed to add allocation ask")
sortedApp := leaf.sortApplications(true, false)
sortedApp := leaf.sortApplications(false)
if len(sortedApp) != 1 || sortedApp[0].ApplicationID != appID1 {
t.Errorf("sorted application is missing expected app: %v", sortedApp)
}
// set 0 repeat
_, err = app.UpdateAskRepeat("alloc-1", -1)
if err != nil || len(leaf.sortApplications(true, false)) != 0 {
if err != nil || len(leaf.sortApplications(false)) != 0 {
t.Errorf("app with ask but 0 pending resources should not be in sorted apps: %v (err = %v)", app, err)
}
}

func TestSortApplicationsWithoutFiltering(t *testing.T) {
// create the root
root, err := createRootQueue(nil)
assert.NilError(t, err, "queue create failed")

var leaf *Queue
properties := map[string]string{configs.ApplicationSortPolicy: "stateaware"}
leaf, err = createManagedQueueWithProps(root, "leaf", false, nil, properties)
assert.NilError(t, err, "failed to create queue: %v", err)

// new app does not have pending res, does not get returned
app1 := newApplication(appID1, "default", leaf.QueuePath)
app1.queue = leaf
leaf.AddApplication(app1)

app2 := newApplication(appID2, "default", leaf.QueuePath)
app2.queue = leaf
leaf.AddApplication(app2)

// both apps have no pending resource, they will be excluded by the sorting result
apps := leaf.sortApplications(true, false)
assertAppListLength(t, apps, []string{}, "sort with the filter")
apps = leaf.sortApplications(false, false)
assertAppListLength(t, apps, []string{}, "sort without the filter")

// add pending ask to app1
var res *resources.Resource
res, err = resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "failed to create basic resource")
// add an ask app must be returned
err = app1.AddAllocationAsk(newAllocationAsk("app1-alloc-1", appID1, res))
assert.NilError(t, err, "failed to add allocation ask")

// the sorting result will return app1
apps = leaf.sortApplications(true, false)
assertAppListLength(t, apps, []string{appID1}, "sort with the filter")
apps = leaf.sortApplications(false, false)
assertAppListLength(t, apps, []string{appID1}, "sort without the filter")

// add pending ask to app2
res, err = resources.NewResourceFromConf(map[string]string{"first": "1"})
assert.NilError(t, err, "failed to create basic resource")
// add an ask app must be returned
err = app2.AddAllocationAsk(newAllocationAsk("app2-alloc-1", appID2, res))
assert.NilError(t, err, "failed to add allocation ask")

// now there are 2 apps in the queue
// according to the state aware policy, if we sort with the filter
// only 1 app will be returning in the result; if sort without the filter
// it should return the both 2 apps
apps = leaf.sortApplications(true, false)
assertAppListLength(t, apps, []string{appID1}, "sort with the filter")
apps = leaf.sortApplications(false, false)
assertAppListLength(t, apps, []string{appID1, appID2}, "sort without the filter")
}

func TestSortAppsWithPlaceholderAllocations(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "queue create failed")
Expand All @@ -758,9 +702,7 @@ func TestSortAppsWithPlaceholderAllocations(t *testing.T) {
app1.AddAllocation(alloc)
err = app1.AddAllocationAsk(newAllocationAsk("ask-0", appID1, res))
assert.NilError(t, err, "could not add ask")
phApps := leaf.sortApplications(true, true)
assert.Equal(t, 1, len(phApps))
phApps = leaf.sortApplications(false, true)
phApps := leaf.sortApplications(true)
assert.Equal(t, 1, len(phApps))

// adding a placeholder allocation & pending request to "app2"
Expand All @@ -769,9 +711,7 @@ func TestSortAppsWithPlaceholderAllocations(t *testing.T) {
app2.AddAllocation(alloc2)
err = app2.AddAllocationAsk(newAllocationAsk("ask-0", appID1, res))
assert.NilError(t, err, "could not add ask")
phApps = leaf.sortApplications(true, true)
assert.Equal(t, 2, len(phApps))
phApps = leaf.sortApplications(false, true)
phApps = leaf.sortApplications(true)
assert.Equal(t, 2, len(phApps))
}

Expand Down Expand Up @@ -1668,11 +1608,6 @@ func TestSupportTaskGroup(t *testing.T) {
assert.NilError(t, err, "failed to create queue: %v", err)
assert.Assert(t, leaf.SupportTaskGroup(), "leaf queue (FIFO policy) should support task group")

properties = map[string]string{configs.ApplicationSortPolicy: "StateAware"}
leaf, err = createManagedQueueWithProps(parent, "leaf2", false, nil, properties)
assert.NilError(t, err, "failed to create queue: %v", err)
assert.Assert(t, leaf.SupportTaskGroup(), "leaf queue (StateAware policy) should support task group")

properties = map[string]string{configs.ApplicationSortPolicy: "fair"}
leaf, err = createManagedQueueWithProps(parent, "leaf3", false, nil, properties)
assert.NilError(t, err, "failed to create queue: %v", err)
Expand Down
44 changes: 0 additions & 44 deletions pkg/scheduler/objects/sorters.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,6 @@ func sortApplications(apps map[string]*Application, sortType policies.SortPolicy
} else {
sortApplicationsBySubmissionTimeAndPriority(sortedApps)
}
case policies.StateAwarePolicy:
sortedApps = stateAwareFilter(apps)
if considerPriority {
sortApplicationsByPriorityAndSubmissionTime(sortedApps)
} else {
sortApplicationsBySubmissionTimeAndPriority(sortedApps)
}
}
metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart)
return sortedApps
Expand Down Expand Up @@ -192,40 +185,3 @@ func filterOnPendingResources(apps map[string]*Application) []*Application {
}
return filteredApps
}

// This filter only allows one (1) application with a state that is not running in the list of candidates.
// The preference is a state of Starting. If we can not find an app with a starting state we will use an app
// with an Accepted state. However if there is an app with a Starting state even with no pending resource
// requests, no Accepted apps can be scheduled. An app in New state does not have any asks and can never be
// scheduled.
func stateAwareFilter(apps map[string]*Application) []*Application {
filteredApps := make([]*Application, 0)
var acceptedApp *Application
var foundStarting bool
for _, app := range apps {
// found a starting app clear out the accepted app (independent of pending resources)
if app.IsStarting() {
foundStarting = true
acceptedApp = nil
}
// Now just look at app when pending-res > 0
if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
// filter accepted apps
if app.IsAccepted() {
// check if we have not seen a starting app
// replace the currently tracked accepted app if this is an older one
if !foundStarting && (acceptedApp == nil || acceptedApp.SubmissionTime.After(app.SubmissionTime)) {
acceptedApp = app
}
continue
}
// this is a running or starting app add it to the list
filteredApps = append(filteredApps, app)
}
}
// just add the accepted app if we need to: apps are not sorted yet
if acceptedApp != nil {
filteredApps = append(filteredApps, acceptedApp)
}
return filteredApps
}
Loading

0 comments on commit 47c3ddc

Please sign in to comment.