Skip to content

Commit 71ffc2e

Browse files
committed
[WIP] [YUNIKORN-2180] Clean up scheduler state initialization
- Moved all state initialization code into context - Unified primary and secondary scheduler caches - Properly handle orphaned pods (those referring to unknown nodes) - TODO: Missing tests
1 parent dc98af0 commit 71ffc2e

39 files changed

+1434
-4001
lines changed

pkg/cache/amprotocol.go

-39
Original file line numberDiff line numberDiff line change
@@ -24,45 +24,6 @@ import (
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2525
)
2626

27-
// app management protocol defines all the APIs needed for app management,
28-
// this is the protocol between scheduler cache and app management plugins
29-
type ApplicationManagementProtocol interface {
30-
// returns app that already existed in the cache,
31-
// or nil if no app with the given appID is found
32-
GetApplication(appID string) *Application
33-
34-
// add app to the context, app manager needs to provide all
35-
// necessary app metadata through this call. If this is an existing app
36-
// for recovery, the AddApplicationRequest#Recovery must be true.
37-
AddApplication(request *AddApplicationRequest) *Application
38-
39-
// remove application from the context
40-
// returns an error if for some reason the app cannot be removed,
41-
// e.g. the given app is not found in the current context.
42-
RemoveApplication(appID string) error
43-
44-
// add task to the context, if add is successful,
45-
AddTask(request *AddTaskRequest) *Task
46-
47-
// remove task from the app
48-
// return an error if for some reason the task cannot be removed
49-
// e.g app that owns this task is not found in context.
50-
RemoveTask(appID, taskID string)
51-
52-
// notify the context that an app is completed,
53-
// this will trigger some consequent operations for the given app
54-
NotifyApplicationComplete(appID string)
55-
56-
// notify the context that an app has failed,
57-
// this will trigger some consequent operations for the given app
58-
NotifyApplicationFail(appID string)
59-
60-
// notify the context that a task is completed,
61-
// this will trigger some consequent operations for a given task,
62-
// e.g. release the allocations that were assigned to this task.
63-
NotifyTaskComplete(appID, taskID string)
64-
}
65-
6627
type AddApplicationRequest struct {
6728
Metadata ApplicationMetadata
6829
}

pkg/cache/amprotocol_mock.go

-147
This file was deleted.

pkg/cache/application.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -328,8 +328,12 @@ func (app *Application) SetState(state string) {
328328
app.sm.SetState(state)
329329
}
330330

331+
func (app *Application) TriggerAppSubmission() error {
332+
return app.handle(NewSubmitApplicationEvent(app.applicationID))
333+
}
334+
331335
func (app *Application) TriggerAppRecovery() error {
332-
return app.handle(NewSimpleApplicationEvent(app.applicationID, RecoverApplication))
336+
return app.handle(NewRecoverApplicationEvent(app.applicationID))
333337
}
334338

335339
// Schedule is called in every scheduling interval,
@@ -341,12 +345,6 @@ func (app *Application) TriggerAppRecovery() error {
341345
// return true if the app needs scheduling or false if not
342346
func (app *Application) Schedule() bool {
343347
switch app.GetApplicationState() {
344-
case ApplicationStates().New:
345-
ev := NewSubmitApplicationEvent(app.GetApplicationID())
346-
if err := app.handle(ev); err != nil {
347-
log.Log(log.ShimCacheApplication).Warn("failed to handle SUBMIT app event",
348-
zap.Error(err))
349-
}
350348
case ApplicationStates().Accepted:
351349
// once the app is accepted by the scheduler core,
352350
// the next step is to send requests for scheduling
@@ -406,7 +404,7 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool)
406404
}
407405
}
408406

409-
func (app *Application) handleSubmitApplicationEvent() {
407+
func (app *Application) handleSubmitApplicationEvent() error {
410408
log.Log(log.ShimCacheApplication).Info("handle app submission",
411409
zap.Stringer("app", app),
412410
zap.String("clusterID", conf.GetSchedulerConf().ClusterID))
@@ -435,9 +433,10 @@ func (app *Application) handleSubmitApplicationEvent() {
435433
log.Log(log.ShimCacheApplication).Warn("failed to submit app", zap.Error(err))
436434
dispatcher.Dispatch(NewFailApplicationEvent(app.applicationID, err.Error()))
437435
}
436+
return err
438437
}
439438

440-
func (app *Application) handleRecoverApplicationEvent() {
439+
func (app *Application) handleRecoverApplicationEvent() error {
441440
log.Log(log.ShimCacheApplication).Info("handle app recovering",
442441
zap.Stringer("app", app),
443442
zap.String("clusterID", conf.GetSchedulerConf().ClusterID))
@@ -466,6 +465,7 @@ func (app *Application) handleRecoverApplicationEvent() {
466465
log.Log(log.ShimCacheApplication).Warn("failed to recover app", zap.Error(err))
467466
dispatcher.Dispatch(NewFailApplicationEvent(app.applicationID, err.Error()))
468467
}
468+
return err
469469
}
470470

471471
func (app *Application) skipReservationStage() bool {

pkg/cache/application_state.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ func NewSubmitApplicationEvent(appID string) SubmitApplicationEvent {
166166
}
167167
}
168168

169+
func NewRecoverApplicationEvent(appID string) SubmitApplicationEvent {
170+
return SubmitApplicationEvent{
171+
applicationID: appID,
172+
event: RecoverApplication,
173+
}
174+
}
175+
169176
func (se SubmitApplicationEvent) GetEvent() string {
170177
return se.event.String()
171178
}
@@ -517,11 +524,11 @@ func newAppState() *fsm.FSM { //nolint:funlen
517524
},
518525
SubmitApplication.String(): func(_ context.Context, event *fsm.Event) {
519526
app := event.Args[0].(*Application) //nolint:errcheck
520-
app.handleSubmitApplicationEvent()
527+
event.Err = app.handleSubmitApplicationEvent()
521528
},
522529
RecoverApplication.String(): func(_ context.Context, event *fsm.Event) {
523530
app := event.Args[0].(*Application) //nolint:errcheck
524-
app.handleRecoverApplicationEvent()
531+
event.Err = app.handleRecoverApplicationEvent()
525532
},
526533
RejectApplication.String(): func(_ context.Context, event *fsm.Event) {
527534
app := event.Args[0].(*Application) //nolint:errcheck

0 commit comments

Comments
 (0)