Skip to content

Commit

Permalink
[YUNIKORN-2180] Clean up scheduler state initialization (#734)
Browse files Browse the repository at this point in the history
- Moved all state initialization code into context
- Unified primary and secondary scheduler caches
- Properly handle orphaned pods (those referring to unknown nodes)

Closes: #734
  • Loading branch information
craigcondit committed Jan 4, 2024
1 parent 5d65dbf commit d79cafc
Show file tree
Hide file tree
Showing 42 changed files with 1,861 additions and 4,138 deletions.
39 changes: 0 additions & 39 deletions pkg/cache/amprotocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ApplicationManagementProtocol defines all the APIs needed for app management.
// It is the protocol between the scheduler cache and the app management plugins.
type ApplicationManagementProtocol interface {
// GetApplication returns the app with the given appID if it already exists
// in the cache, or nil if no such app is found.
GetApplication(appID string) *Application

// AddApplication adds an app to the context. The app manager needs to
// provide all necessary app metadata through this call.
// NOTE(review): this comment previously required AddApplicationRequest#Recovery
// to be true when re-adding an existing app for recovery; confirm that field
// still exists on the request type before relying on it.
AddApplication(request *AddApplicationRequest) *Application

// RemoveApplication removes an application from the context.
// It returns an error if for some reason the app cannot be removed,
// e.g. the given app is not found in the current context.
RemoveApplication(appID string) error

// AddTask adds a task to the context and returns a *Task.
// NOTE(review): what is returned when the add fails is not stated here;
// check the implementation.
AddTask(request *AddTaskRequest) *Task

// RemoveTask removes a task from the app.
// The method has no return value, so a failure to remove the task
// (e.g. the app that owns this task is not found in the context)
// is not reported to the caller.
RemoveTask(appID, taskID string)

// NotifyApplicationComplete notifies the context that an app is completed.
// This will trigger some consequent operations for the given app.
NotifyApplicationComplete(appID string)

// NotifyApplicationFail notifies the context that an app has failed.
// This will trigger some consequent operations for the given app.
NotifyApplicationFail(appID string)

// NotifyTaskComplete notifies the context that a task is completed.
// This will trigger some consequent operations for the given task,
// e.g. releasing the allocations that were assigned to this task.
NotifyTaskComplete(appID, taskID string)
}

// AddApplicationRequest is the request type used when adding an application;
// it wraps the application's metadata.
type AddApplicationRequest struct {
// Metadata holds the application metadata supplied by the caller.
Metadata ApplicationMetadata
}
Expand Down
147 changes: 0 additions & 147 deletions pkg/cache/amprotocol_mock.go

This file was deleted.

49 changes: 9 additions & 40 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ func (app *Application) SetState(state string) {
app.sm.SetState(state)
}

func (app *Application) TriggerAppRecovery() error {
return app.handle(NewSimpleApplicationEvent(app.applicationID, RecoverApplication))
func (app *Application) TriggerAppSubmission() error {
return app.handle(NewSubmitApplicationEvent(app.applicationID))
}

// Schedule is called in every scheduling interval,
Expand Down Expand Up @@ -406,42 +406,12 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool)
}
}

func (app *Application) handleSubmitApplicationEvent() {
func (app *Application) handleSubmitApplicationEvent() error {
log.Log(log.ShimCacheApplication).Info("handle app submission",
zap.Stringer("app", app),
zap.String("clusterID", conf.GetSchedulerConf().ClusterID))
err := app.schedulerAPI.UpdateApplication(
&si.ApplicationRequest{
New: []*si.AddApplicationRequest{
{
ApplicationID: app.applicationID,
QueueName: app.queue,
PartitionName: app.partition,
Ugi: &si.UserGroupInformation{
User: app.user,
Groups: app.groups,
},
Tags: app.tags,
PlaceholderAsk: app.placeholderAsk,
ExecutionTimeoutMilliSeconds: app.placeholderTimeoutInSec * 1000,
GangSchedulingStyle: app.schedulingStyle,
},
},
RmID: conf.GetSchedulerConf().ClusterID,
})

if err != nil {
// submission failed
log.Log(log.ShimCacheApplication).Warn("failed to submit app", zap.Error(err))
dispatcher.Dispatch(NewFailApplicationEvent(app.applicationID, err.Error()))
}
}

func (app *Application) handleRecoverApplicationEvent() {
log.Log(log.ShimCacheApplication).Info("handle app recovering",
zap.Stringer("app", app),
zap.String("clusterID", conf.GetSchedulerConf().ClusterID))
err := app.schedulerAPI.UpdateApplication(
if err := app.schedulerAPI.UpdateApplication(
&si.ApplicationRequest{
New: []*si.AddApplicationRequest{
{
Expand All @@ -459,13 +429,13 @@ func (app *Application) handleRecoverApplicationEvent() {
},
},
RmID: conf.GetSchedulerConf().ClusterID,
})

if err != nil {
// recovery failed
log.Log(log.ShimCacheApplication).Warn("failed to recover app", zap.Error(err))
}); err != nil {
// submission failed
log.Log(log.ShimCacheApplication).Warn("failed to submit new app request to core", zap.Error(err))
dispatcher.Dispatch(NewFailApplicationEvent(app.applicationID, err.Error()))
return err
}
return nil
}

func (app *Application) skipReservationStage() bool {
Expand Down Expand Up @@ -573,7 +543,6 @@ func (app *Application) handleRejectApplicationEvent(reason string) {
}

func (app *Application) handleCompleteApplicationEvent() {
// TODO app lifecycle updates
go func() {
getPlaceholderManager().cleanUp(app)
}()
Expand Down
66 changes: 27 additions & 39 deletions pkg/cache/application_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type ApplicationEventType int

const (
SubmitApplication ApplicationEventType = iota
RecoverApplication
AcceptApplication
TryReserve
UpdateReservation
Expand Down Expand Up @@ -361,37 +360,35 @@ func (re ResumingApplicationEvent) GetApplicationID() string {
var storeApplicationStates *AStates

type AStates struct {
New string
Recovering string
Submitted string
Accepted string
Reserving string
Running string
Rejected string
Completed string
Killing string
Killed string
Failing string
Failed string
Resuming string
New string
Submitted string
Accepted string
Reserving string
Running string
Rejected string
Completed string
Killing string
Killed string
Failing string
Failed string
Resuming string
}

func ApplicationStates() *AStates {
applicationStatesOnce.Do(func() {
storeApplicationStates = &AStates{
New: "New",
Recovering: "Recovering",
Submitted: "Submitted",
Accepted: "Accepted",
Reserving: "Reserving",
Running: "Running",
Rejected: "Rejected",
Completed: "Completed",
Killing: "Killing",
Killed: "Killed",
Failed: "Failed",
Failing: "Failing",
Resuming: "Resuming",
New: "New",
Submitted: "Submitted",
Accepted: "Accepted",
Reserving: "Reserving",
Running: "Running",
Rejected: "Rejected",
Completed: "Completed",
Killing: "Killing",
Killed: "Killed",
Failed: "Failed",
Failing: "Failing",
Resuming: "Resuming",
}
})
return storeApplicationStates
Expand All @@ -406,14 +403,9 @@ func newAppState() *fsm.FSM { //nolint:funlen
Src: []string{states.New},
Dst: states.Submitted,
},
{
Name: RecoverApplication.String(),
Src: []string{states.New},
Dst: states.Recovering,
},
{
Name: AcceptApplication.String(),
Src: []string{states.Submitted, states.Recovering},
Src: []string{states.Submitted},
Dst: states.Accepted,
},
{
Expand Down Expand Up @@ -478,7 +470,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
},
{
Name: RejectApplication.String(),
Src: []string{states.Submitted, states.Recovering},
Src: []string{states.Submitted},
Dst: states.Rejected,
},
{
Expand Down Expand Up @@ -517,11 +509,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
},
SubmitApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.handleSubmitApplicationEvent()
},
RecoverApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.handleRecoverApplicationEvent()
event.Err = app.handleSubmitApplicationEvent()
},
RejectApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
Expand Down
Loading

0 comments on commit d79cafc

Please sign in to comment.