Skip to content

Commit

Permalink
[WIP] [YUNIKORN-2180] Clean up scheduler state initialization
Browse files Browse the repository at this point in the history
- Moved all state initialization code into context
- Unified primary and secondary scheduler caches
- Properly handle orphaned pods (those referring to unknown nodes)
- TODO: Missing tests
  • Loading branch information
craigcondit committed Dec 7, 2023
1 parent dc98af0 commit e8e547c
Show file tree
Hide file tree
Showing 39 changed files with 1,435 additions and 4,001 deletions.
39 changes: 0 additions & 39 deletions pkg/cache/amprotocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// app management protocol defines all the APIs needed for app management,
// this is the protocol between scheduler cache and app management plugins
type ApplicationManagementProtocol interface {
// returns app that already existed in the cache,
// or nil, false if app with the given appID is not found
GetApplication(appID string) *Application

// add app to the context, app manager needs to provide all
// necessary app metadata through this call. If this a existing app
// for recovery, the AddApplicationRequest#Recovery must be true.
AddApplication(request *AddApplicationRequest) *Application

// remove application from the context
// returns an error if for some reason the app cannot be removed,
// e.g the given app is not found in current context.
RemoveApplication(appID string) error

// add task to the context, if add is successful,
AddTask(request *AddTaskRequest) *Task

// remove task from the app
// return an error if for some reason the task cannot be removed
// e.g app that owns this task is not found in context.
RemoveTask(appID, taskID string)

// notify the context that an app is completed,
// this will trigger some consequent operations for the given app
NotifyApplicationComplete(appID string)

// notify the context that an app has failed,
// this will trigger some consequent operations for the given app
NotifyApplicationFail(appID string)

// notify the context that an task is completed,
// this will trigger some consequent operations for a given task,
// e.g release the allocations that assigned for this task.
NotifyTaskComplete(appID, taskID string)
}

type AddApplicationRequest struct {
Metadata ApplicationMetadata
}
Expand Down
147 changes: 0 additions & 147 deletions pkg/cache/amprotocol_mock.go

This file was deleted.

18 changes: 9 additions & 9 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,12 @@ func (app *Application) SetState(state string) {
app.sm.SetState(state)
}

func (app *Application) TriggerAppSubmission() error {
return app.handle(NewSubmitApplicationEvent(app.applicationID))
}

func (app *Application) TriggerAppRecovery() error {
return app.handle(NewSimpleApplicationEvent(app.applicationID, RecoverApplication))
return app.handle(NewRecoverApplicationEvent(app.applicationID))
}

// Schedule is called in every scheduling interval,
Expand All @@ -341,12 +345,6 @@ func (app *Application) TriggerAppRecovery() error {
// return true if the app needs scheduling or false if not
func (app *Application) Schedule() bool {
switch app.GetApplicationState() {
case ApplicationStates().New:
ev := NewSubmitApplicationEvent(app.GetApplicationID())
if err := app.handle(ev); err != nil {
log.Log(log.ShimCacheApplication).Warn("failed to handle SUBMIT app event",
zap.Error(err))
}
case ApplicationStates().Accepted:
// once the app is accepted by the scheduler core,
// the next step is to send requests for scheduling
Expand Down Expand Up @@ -406,7 +404,7 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool)
}
}

func (app *Application) handleSubmitApplicationEvent() {
func (app *Application) handleSubmitApplicationEvent() error {
log.Log(log.ShimCacheApplication).Info("handle app submission",
zap.Stringer("app", app),
zap.String("clusterID", conf.GetSchedulerConf().ClusterID))
Expand Down Expand Up @@ -435,9 +433,10 @@ func (app *Application) handleSubmitApplicationEvent() {
log.Log(log.ShimCacheApplication).Warn("failed to submit app", zap.Error(err))
dispatcher.Dispatch(NewFailApplicationEvent(app.applicationID, err.Error()))
}
return err
}

func (app *Application) handleRecoverApplicationEvent() {
func (app *Application) handleRecoverApplicationEvent() error {
log.Log(log.ShimCacheApplication).Info("handle app recovering",
zap.Stringer("app", app),
zap.String("clusterID", conf.GetSchedulerConf().ClusterID))
Expand Down Expand Up @@ -466,6 +465,7 @@ func (app *Application) handleRecoverApplicationEvent() {
log.Log(log.ShimCacheApplication).Warn("failed to recover app", zap.Error(err))
dispatcher.Dispatch(NewFailApplicationEvent(app.applicationID, err.Error()))
}
return err
}

func (app *Application) skipReservationStage() bool {
Expand Down
11 changes: 9 additions & 2 deletions pkg/cache/application_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ func NewSubmitApplicationEvent(appID string) SubmitApplicationEvent {
}
}

func NewRecoverApplicationEvent(appID string) SubmitApplicationEvent {
return SubmitApplicationEvent{
applicationID: appID,
event: RecoverApplication,
}
}

func (se SubmitApplicationEvent) GetEvent() string {
return se.event.String()
}
Expand Down Expand Up @@ -517,11 +524,11 @@ func newAppState() *fsm.FSM { //nolint:funlen
},
SubmitApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.handleSubmitApplicationEvent()
event.Err = app.handleSubmitApplicationEvent()
},
RecoverApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.handleRecoverApplicationEvent()
event.Err = app.handleRecoverApplicationEvent()
},
RejectApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
Expand Down
Loading

0 comments on commit e8e547c

Please sign in to comment.