Skip to content

Commit

Permalink
[YUNIKORN-2180] Clean up scheduler state initialization
Browse files Browse the repository at this point in the history
- Moved all state initialization code into context
- Unified primary and secondary scheduler caches
- Properly handle orphaned pods (those referring to unknown nodes)
  • Loading branch information
craigcondit committed Dec 19, 2023
1 parent a118ba6 commit aa39008
Show file tree
Hide file tree
Showing 44 changed files with 1,805 additions and 4,147 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
go 1.20

require (
github.com/apache/yunikorn-core v0.0.0-20231203141034-3ae625bcfc07
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231201001639-c81397b31653
github.com/apache/yunikorn-core v0.0.0-20231212223146-620687afe106
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231211235204-ec7bfad7d00e
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.3.1
github.com/looplab/fsm v1.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 h1:yL7+Jz0jTC6yykIK/Wh74gnTJnrGr5AyrNMXuA0gves=
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
github.com/apache/yunikorn-core v0.0.0-20231203141034-3ae625bcfc07 h1:DNhQrQJYmPpujUBzLtSxFyV4Y1L69hVXuNiE0+EitYA=
github.com/apache/yunikorn-core v0.0.0-20231203141034-3ae625bcfc07/go.mod h1:JG66N3TskSNVAMoAUbAVagS14ZrOgcjGpRXbcpAMMvI=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231201001639-c81397b31653 h1:pUbVmmR+LWuy0L8dGCZNue9UNpWKsY7yFYcCtPtWAic=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231201001639-c81397b31653/go.mod h1:zDWV5y9Zh9DM1C65RCVXT1nhNNO8kykVW7bzPFamNYw=
github.com/apache/yunikorn-core v0.0.0-20231212223146-620687afe106 h1:+r/B8TyLdCcGVU9YLBFou4DdIkTuk8/+HdtzBuc27Cw=
github.com/apache/yunikorn-core v0.0.0-20231212223146-620687afe106/go.mod h1:lSAZNt47HGygsVG6mJTl0rW7acBl3tbN/Fg2wGJXYs8=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231211235204-ec7bfad7d00e h1:WiDns+JSNrp1jUfTkwtTwVyfxAhe3vPMtxJxs2CRseE=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231211235204-ec7bfad7d00e/go.mod h1:zDWV5y9Zh9DM1C65RCVXT1nhNNO8kykVW7bzPFamNYw=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
Expand Down
39 changes: 0 additions & 39 deletions pkg/cache/amprotocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// app management protocol defines all the APIs needed for app management,
// this is the protocol between scheduler cache and app management plugins
type ApplicationManagementProtocol interface {
// returns app that already existed in the cache,
// or nil, false if app with the given appID is not found
GetApplication(appID string) *Application

// add app to the context, app manager needs to provide all
// necessary app metadata through this call. If this a existing app
// for recovery, the AddApplicationRequest#Recovery must be true.
AddApplication(request *AddApplicationRequest) *Application

// remove application from the context
// returns an error if for some reason the app cannot be removed,
// e.g the given app is not found in current context.
RemoveApplication(appID string) error

// add task to the context, if add is successful,
AddTask(request *AddTaskRequest) *Task

// remove task from the app
// return an error if for some reason the task cannot be removed
// e.g app that owns this task is not found in context.
RemoveTask(appID, taskID string)

// notify the context that an app is completed,
// this will trigger some consequent operations for the given app
NotifyApplicationComplete(appID string)

// notify the context that an app has failed,
// this will trigger some consequent operations for the given app
NotifyApplicationFail(appID string)

// notify the context that an task is completed,
// this will trigger some consequent operations for a given task,
// e.g release the allocations that assigned for this task.
NotifyTaskComplete(appID, taskID string)
}

type AddApplicationRequest struct {
Metadata ApplicationMetadata
}
Expand Down
147 changes: 0 additions & 147 deletions pkg/cache/amprotocol_mock.go

This file was deleted.

56 changes: 14 additions & 42 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ func (app *Application) SetState(state string) {
app.sm.SetState(state)
}

func (app *Application) TriggerAppRecovery() error {
return app.handle(NewSimpleApplicationEvent(app.applicationID, RecoverApplication))
func (app *Application) TriggerAppSubmission() error {
return app.handle(NewSubmitApplicationEvent(app.applicationID))
}

// Schedule is called in every scheduling interval,
Expand All @@ -344,8 +344,7 @@ func (app *Application) Schedule() bool {
case ApplicationStates().New:
ev := NewSubmitApplicationEvent(app.GetApplicationID())
if err := app.handle(ev); err != nil {
log.Log(log.ShimCacheApplication).Warn("failed to handle SUBMIT app event",
zap.Error(err))
log.Log(log.ShimCacheApplication).Warn("failed to handle SUBMIT app event", zap.Error(err))

Check warning on line 347 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L347

Added line #L347 was not covered by tests
}
case ApplicationStates().Accepted:
// once the app is accepted by the scheduler core,
Expand Down Expand Up @@ -406,42 +405,12 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool)
}
}

func (app *Application) handleSubmitApplicationEvent() {
func (app *Application) handleSubmitApplicationEvent() error {
log.Log(log.ShimCacheApplication).Info("handle app submission",
zap.Stringer("app", app),
zap.String("clusterID", conf.GetSchedulerConf().ClusterID))
err := app.schedulerAPI.UpdateApplication(
&si.ApplicationRequest{
New: []*si.AddApplicationRequest{
{
ApplicationID: app.applicationID,
QueueName: app.queue,
PartitionName: app.partition,
Ugi: &si.UserGroupInformation{
User: app.user,
Groups: app.groups,
},
Tags: app.tags,
PlaceholderAsk: app.placeholderAsk,
ExecutionTimeoutMilliSeconds: app.placeholderTimeoutInSec * 1000,
GangSchedulingStyle: app.schedulingStyle,
},
},
RmID: conf.GetSchedulerConf().ClusterID,
})

if err != nil {
// submission failed
log.Log(log.ShimCacheApplication).Warn("failed to submit app", zap.Error(err))
dispatcher.Dispatch(NewFailApplicationEvent(app.applicationID, err.Error()))
}
}

func (app *Application) handleRecoverApplicationEvent() {
log.Log(log.ShimCacheApplication).Info("handle app recovering",
zap.Stringer("app", app),
zap.String("clusterID", conf.GetSchedulerConf().ClusterID))
err := app.schedulerAPI.UpdateApplication(
if err := app.schedulerAPI.UpdateApplication(
&si.ApplicationRequest{
New: []*si.AddApplicationRequest{
{
Expand All @@ -459,13 +428,13 @@ func (app *Application) handleRecoverApplicationEvent() {
},
},
RmID: conf.GetSchedulerConf().ClusterID,
})

if err != nil {
// recovery failed
log.Log(log.ShimCacheApplication).Warn("failed to recover app", zap.Error(err))
}); err != nil {
// submission failed
log.Log(log.ShimCacheApplication).Warn("failed to submit new app request to core", zap.Error(err))

Check warning on line 433 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L432-L433

Added lines #L432 - L433 were not covered by tests
dispatcher.Dispatch(NewFailApplicationEvent(app.applicationID, err.Error()))
return err

Check warning on line 435 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L435

Added line #L435 was not covered by tests
}
return nil
}

func (app *Application) skipReservationStage() bool {
Expand Down Expand Up @@ -572,8 +541,11 @@ func (app *Application) handleRejectApplicationEvent(reason string) {
fmt.Sprintf("%s: %s", constants.ApplicationRejectedFailure, reason)))
}

func (app *Application) handleAcceptApplicationEvent() {
log.Log(log.ShimCacheApplication).Info("app is accepted by scheduler", zap.String("appID", app.applicationID))
}

func (app *Application) handleCompleteApplicationEvent() {
// TODO app lifecycle updates
go func() {
getPlaceholderManager().cleanUp(app)
}()
Expand Down
Loading

0 comments on commit aa39008

Please sign in to comment.