Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2180] Clean up scheduler state initialization #734

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 0 additions & 39 deletions pkg/cache/amprotocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ApplicationManagementProtocol defines all the APIs needed for app management.
// It is the protocol between the scheduler cache and the app management plugins.
type ApplicationManagementProtocol interface {
// GetApplication returns the app that already exists in the cache,
// or nil if no app with the given appID is found. There is a single
// return value; no separate "found" flag is returned.
GetApplication(appID string) *Application

// AddApplication adds an app to the context. The app manager needs to
// provide all necessary app metadata through this call.
// NOTE(review): the earlier comment said that for an existing app being
// recovered, AddApplicationRequest#Recovery must be true; the
// AddApplicationRequest struct visible in this file only has a Metadata
// field, so that remark looks stale — confirm.
AddApplication(request *AddApplicationRequest) *Application

// RemoveApplication removes an application from the context.
// It returns an error if for some reason the app cannot be removed,
// e.g. the given app is not found in the current context.
RemoveApplication(appID string) error

// AddTask adds a task to the context and returns a *Task.
// NOTE(review): the earlier comment broke off after "if add is successful",
// so what is returned when the add fails is not documented — confirm
// against the implementations.
AddTask(request *AddTaskRequest) *Task

// RemoveTask removes a task from the app.
// NOTE(review): the earlier comment said an error is returned when the task
// cannot be removed (e.g. the app that owns the task is not found in the
// context), but this signature has no return value, so such failures are
// not reported to the caller.
RemoveTask(appID, taskID string)

// NotifyApplicationComplete notifies the context that an app is completed;
// this will trigger some consequent operations for the given app.
NotifyApplicationComplete(appID string)

// NotifyApplicationFail notifies the context that an app has failed;
// this will trigger some consequent operations for the given app.
NotifyApplicationFail(appID string)

// NotifyTaskComplete notifies the context that a task is completed;
// this will trigger some consequent operations for the given task,
// e.g. releasing the allocations that were assigned to this task.
NotifyTaskComplete(appID, taskID string)
}

// AddApplicationRequest is the request type passed to AddApplication;
// it carries the metadata of the application to add.
type AddApplicationRequest struct {
// Metadata is the application metadata supplied by the app manager.
Metadata ApplicationMetadata
}
Expand Down
147 changes: 0 additions & 147 deletions pkg/cache/amprotocol_mock.go

This file was deleted.

49 changes: 9 additions & 40 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@
app.sm.SetState(state)
}

func (app *Application) TriggerAppRecovery() error {
return app.handle(NewSimpleApplicationEvent(app.applicationID, RecoverApplication))
func (app *Application) TriggerAppSubmission() error {
return app.handle(NewSubmitApplicationEvent(app.applicationID))
}

// Schedule is called in every scheduling interval,
Expand Down Expand Up @@ -406,42 +406,12 @@
}
}

func (app *Application) handleSubmitApplicationEvent() {
func (app *Application) handleSubmitApplicationEvent() error {
log.Log(log.ShimCacheApplication).Info("handle app submission",
zap.Stringer("app", app),
zap.String("clusterID", conf.GetSchedulerConf().ClusterID))
err := app.schedulerAPI.UpdateApplication(
&si.ApplicationRequest{
New: []*si.AddApplicationRequest{
{
ApplicationID: app.applicationID,
QueueName: app.queue,
PartitionName: app.partition,
Ugi: &si.UserGroupInformation{
User: app.user,
Groups: app.groups,
},
Tags: app.tags,
PlaceholderAsk: app.placeholderAsk,
ExecutionTimeoutMilliSeconds: app.placeholderTimeoutInSec * 1000,
GangSchedulingStyle: app.schedulingStyle,
},
},
RmID: conf.GetSchedulerConf().ClusterID,
})

if err != nil {
// submission failed
log.Log(log.ShimCacheApplication).Warn("failed to submit app", zap.Error(err))
dispatcher.Dispatch(NewFailApplicationEvent(app.applicationID, err.Error()))
}
}

func (app *Application) handleRecoverApplicationEvent() {
log.Log(log.ShimCacheApplication).Info("handle app recovering",
zap.Stringer("app", app),
zap.String("clusterID", conf.GetSchedulerConf().ClusterID))
err := app.schedulerAPI.UpdateApplication(
if err := app.schedulerAPI.UpdateApplication(
&si.ApplicationRequest{
New: []*si.AddApplicationRequest{
{
Expand All @@ -459,13 +429,13 @@
},
},
RmID: conf.GetSchedulerConf().ClusterID,
})

if err != nil {
// recovery failed
log.Log(log.ShimCacheApplication).Warn("failed to recover app", zap.Error(err))
}); err != nil {
// submission failed
log.Log(log.ShimCacheApplication).Warn("failed to submit new app request to core", zap.Error(err))

Check warning on line 434 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L433-L434

Added lines #L433 - L434 were not covered by tests
dispatcher.Dispatch(NewFailApplicationEvent(app.applicationID, err.Error()))
return err

Check warning on line 436 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L436

Added line #L436 was not covered by tests
}
return nil
}

func (app *Application) skipReservationStage() bool {
Expand Down Expand Up @@ -573,7 +543,6 @@
}

func (app *Application) handleCompleteApplicationEvent() {
// TODO app lifecycle updates
go func() {
getPlaceholderManager().cleanUp(app)
}()
Expand Down
66 changes: 27 additions & 39 deletions pkg/cache/application_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type ApplicationEventType int

const (
SubmitApplication ApplicationEventType = iota
RecoverApplication
AcceptApplication
TryReserve
UpdateReservation
Expand Down Expand Up @@ -361,37 +360,35 @@ func (re ResumingApplicationEvent) GetApplicationID() string {
var storeApplicationStates *AStates

type AStates struct {
New string
Recovering string
Submitted string
Accepted string
Reserving string
Running string
Rejected string
Completed string
Killing string
Killed string
Failing string
Failed string
Resuming string
New string
Submitted string
Accepted string
Reserving string
Running string
Rejected string
Completed string
Killing string
Killed string
Failing string
Failed string
Resuming string
}

func ApplicationStates() *AStates {
applicationStatesOnce.Do(func() {
storeApplicationStates = &AStates{
New: "New",
Recovering: "Recovering",
Submitted: "Submitted",
Accepted: "Accepted",
Reserving: "Reserving",
Running: "Running",
Rejected: "Rejected",
Completed: "Completed",
Killing: "Killing",
Killed: "Killed",
Failed: "Failed",
Failing: "Failing",
Resuming: "Resuming",
New: "New",
Submitted: "Submitted",
Accepted: "Accepted",
Reserving: "Reserving",
Running: "Running",
Rejected: "Rejected",
Completed: "Completed",
Killing: "Killing",
Killed: "Killed",
Failed: "Failed",
Failing: "Failing",
Resuming: "Resuming",
}
})
return storeApplicationStates
Expand All @@ -406,14 +403,9 @@ func newAppState() *fsm.FSM { //nolint:funlen
Src: []string{states.New},
Dst: states.Submitted,
},
{
Name: RecoverApplication.String(),
Src: []string{states.New},
Dst: states.Recovering,
},
{
Name: AcceptApplication.String(),
Src: []string{states.Submitted, states.Recovering},
Src: []string{states.Submitted},
Dst: states.Accepted,
},
{
Expand Down Expand Up @@ -478,7 +470,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
},
{
Name: RejectApplication.String(),
Src: []string{states.Submitted, states.Recovering},
Src: []string{states.Submitted},
Dst: states.Rejected,
},
{
Expand Down Expand Up @@ -517,11 +509,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
},
SubmitApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.handleSubmitApplicationEvent()
},
RecoverApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.handleRecoverApplicationEvent()
event.Err = app.handleSubmitApplicationEvent()
},
RejectApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
Expand Down
Loading