diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index 0f787e018..a9e48491d 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -319,6 +319,11 @@ func (app *Application) getNonTerminatedTaskAlias() []string {
 	return nonTerminatedTaskAlias
 }
 
+// IsAllTasksTerminated reports whether getNonTerminatedTaskAlias returns no entries for this application.
+func (app *Application) IsAllTasksTerminated() bool {
+	return len(app.getNonTerminatedTaskAlias()) == 0
+}
+
 // SetState is only for testing
 // this is just used for testing, it is not supposed to change state like this
 func (app *Application) SetState(state string) {
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index c7c418709..7458a7d05 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -188,6 +188,26 @@ func (ss *KubernetesShim) registerShimLayer() error {
 func (ss *KubernetesShim) schedule() {
 	apps := ss.context.GetAllApplications()
 	for _, app := range apps {
+		// Clean up failed applications on the shim side.
+		// 1. When we reject an app, we set the app state to Rejected and immediately set it to Failed, but we don't clean up the app.
+		// 2. When we fail an app, we set the app state to Failed, but we don't clean up the app.
+		// 3. Completed apps are already handled by the UpdateApplication function:
+		//      case cache.ApplicationStates().Completed:
+		//          callback.context.RemoveApplicationInternal(updated.ApplicationID)
+		// 4. The Killed state is not used yet, so it does not need to be handled here.
+		if app.GetApplicationState() == cache.ApplicationStates().Failed {
+			if app.IsAllTasksTerminated() {
+				log.Log(log.ShimScheduler).Info("Clean up failed application",
+					zap.String("appID", app.GetApplicationID()))
+				ss.context.RemoveApplicationInternal(app.GetApplicationID())
+			} else {
+				// This branch can repeat on every scheduling iteration until the tasks terminate, so log at Debug to avoid flooding the log.
+				log.Log(log.ShimScheduler).Debug("Failed application is not cleaned up due to not all tasks terminated, wait for next scheduling iteration",
+					zap.String("appID", app.GetApplicationID()))
+			}
+			continue
+		}
+
 		if app.Schedule() {
 			ss.setOutstandingAppsFound(true)
 		}