diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go
index 55662bb8d..6f0b65eab 100644
--- a/pkg/scheduler/context.go
+++ b/pkg/scheduler/context.go
@@ -503,6 +503,10 @@ func (cc *ClusterContext) handleRMUpdateApplicationEvent(event *rmevent.RMUpdate
 				ApplicationID: app.ApplicationID,
 				Reason:        err.Error(),
 			})
+			// track the rejected application so it can be reported later,
+			// a failed state change is logged by addRejectedApplication itself
+			rejectedApp := objects.NewApplication(app, ugi, cc.rmEventHandler, request.RmID)
+			_ = partition.addRejectedApplication(rejectedApp, err.Error())
 			log.Logger().Error("Failed to add application to partition (user rejected)",
 				zap.String("applicationID", app.ApplicationID),
 				zap.String("partitionName", app.PartitionName),
@@ -516,6 +520,10 @@ func (cc *ClusterContext) handleRMUpdateApplicationEvent(event *rmevent.RMUpdate
 				ApplicationID: app.ApplicationID,
 				Reason:        err.Error(),
 			})
+			// track the rejected application so it can be reported later,
+			// a failed state change is logged by addRejectedApplication itself
+			rejectedApp := objects.NewApplication(app, ugi, cc.rmEventHandler, request.RmID)
+			_ = partition.addRejectedApplication(rejectedApp, err.Error())
 			log.Logger().Error("Failed to add application to partition (placement rejected)",
 				zap.String("applicationID", app.ApplicationID),
 				zap.String("partitionName", app.PartitionName),
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 166f5bbc8..61ac32de1 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -79,6 +79,7 @@ type Application struct {
 	placeholderTimer
*time.Timer // placeholder replace timer
 	gangSchedulingStyle string      // gang scheduling style can be hard (after timeout we fail the application), or soft (after timeeout we schedule it as a normal application)
 	finishedTime        time.Time   // the time of finishing this application. the default value is zero time
+	rejectionMessage    string      // If the application is rejected, save the rejection message
 
 	rmEventHandler handler.EventHandler
 	rmID           string
@@ -1507,3 +1508,25 @@ func (sa *Application) IsAllocationAssignedToApp(alloc *Allocation) bool {
 	_, ok := sa.allocations[alloc.UUID]
 	return ok
 }
+
+// SetRejectionMessage stores the reason why the application was rejected.
+func (sa *Application) SetRejectionMessage(rejectionMessage string) {
+	sa.Lock()
+	defer sa.Unlock()
+	sa.rejectionMessage = rejectionMessage
+}
+
+// GetRejectionMessage returns the rejection message stored on the application.
+func (sa *Application) GetRejectionMessage() string {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.rejectionMessage
+}
+
+// SetFinishedTime sets the finished time of the application to the current time.
+// Only for rejected applications.
+func (sa *Application) SetFinishedTime() {
+	sa.Lock()
+	defer sa.Unlock()
+	sa.finishedTime = time.Now()
+}
diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go
index 26a608b70..5a9396b8c 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -112,7 +112,7 @@ func NewAppState() *fsm.FSM {
 				Dst:  Resuming.String(),
 			}, {
 				Name: ExpireApplication.String(),
-				Src:  []string{Completed.String(), Failed.String()},
+				Src:  []string{Completed.String(), Failed.String(), Rejected.String()},
 				Dst:  Expired.String(),
 			},
 		},
@@ -156,6 +156,7 @@ func NewAppState() *fsm.FSM {
 				app := event.Args[0].(*Application) //nolint:errcheck
 				metrics.GetQueueMetrics(app.QueuePath).IncQueueApplicationsRejected()
 				metrics.GetSchedulerMetrics().IncTotalApplicationsRejected()
+				app.setStateTimer(terminatedTimeout, app.stateMachine.Current(), ExpireApplication)
 			},
 			fmt.Sprintf("enter_%s", Running.String()): func(event *fsm.Event) {
 				app := event.Args[0].(*Application) //nolint:errcheck
diff --git a/pkg/scheduler/partition.go
b/pkg/scheduler/partition.go
index 9f81c73a1..3e23b534a 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -50,6 +50,7 @@ type PartitionContext struct {
 	root                   *objects.Queue                  // start of the queue hierarchy
 	applications           map[string]*objects.Application // applications assigned to this partition
 	completedApplications  map[string]*objects.Application // completed applications from this partition
+	rejectedApplications   map[string]*objects.Application // rejected applications from this partition
 	reservedApps           map[string]int                  // applications reserved within this partition, with reservation count
 	nodes                  objects.NodeCollection          // nodes assigned to this partition
 	placementManager       *placement.AppPlacementManager  // placement manager for this partition
@@ -443,6 +444,14 @@ func (pc *PartitionContext) getApplication(appID string) *objects.Application {
 	return pc.applications[appID]
 }
 
+// getRejectedApplication returns the rejected application tracked under the given ID, nil if not found.
+func (pc *PartitionContext) getRejectedApplication(appID string) *objects.Application {
+	pc.RLock()
+	defer pc.RUnlock()
+
+	return pc.rejectedApplications[appID]
+}
+
 // Return a copy of the map of all reservations for the partition.
 // This will return an empty map if there are no reservations.
 // Visible for tests
@@ -1070,6 +1079,17 @@ func (pc *PartitionContext) GetCompletedApplications() []*objects.Application {
 	return appList
 }
 
+// GetRejectedApplications returns a list of all rejected applications tracked by the partition.
+func (pc *PartitionContext) GetRejectedApplications() []*objects.Application {
+	pc.RLock()
+	defer pc.RUnlock()
+	var appList []*objects.Application
+	for _, app := range pc.rejectedApplications {
+		appList = append(appList, app)
+	}
+	return appList
+}
+
 func (pc *PartitionContext) GetAppsByState(state string) []*objects.Application {
 	pc.RLock()
 	defer pc.RUnlock()
@@ -1083,6 +1103,15 @@ func (pc *PartitionContext) GetAppsByState(state string) []*objects.Application
 		return appList
 	}
 
+	if state == objects.Rejected.String() {
+		for _, app := range pc.rejectedApplications {
+			if app.CurrentState() == state {
+				appList = append(appList, app)
+			}
+		}
+		return appList
+	}
+
 	for _, app := range pc.applications {
 		if app.CurrentState() == state {
 			appList = append(appList, app)
@@ -1091,6 +1120,19 @@ func (pc *PartitionContext) GetAppsByState(state string) []*objects.Application
 	return appList
 }
 
+// GetRejectedAppsByState returns the tracked rejected applications whose current state equals the given state.
+func (pc *PartitionContext) GetRejectedAppsByState(state string) []*objects.Application {
+	pc.RLock()
+	defer pc.RUnlock()
+	var appList []*objects.Application
+	for _, app := range pc.rejectedApplications {
+		if app.CurrentState() == state {
+			appList = append(appList, app)
+		}
+	}
+	return appList
+}
+
 func (pc *PartitionContext) GetAppsInTerminatedState() []*objects.Application {
 	pc.RLock()
 	defer pc.RUnlock()
@@ -1382,6 +1424,11 @@ func (pc *PartitionContext) cleanupExpiredApps() {
 		delete(pc.applications, app.ApplicationID)
 		pc.Unlock()
 	}
+	for _, app := range pc.GetRejectedAppsByState(objects.Expired.String()) {
+		pc.Lock()
+		delete(pc.rejectedApplications, app.ApplicationID)
+		pc.Unlock()
+	}
 }
 
 func (pc *PartitionContext) GetCurrentState() string {
@@ -1445,3 +1492,26 @@ func (pc *PartitionContext) hasUnlimitedNode() bool {
 	}
 	return false
 }
+
+// addRejectedApplication moves the application to the Rejected state, records the finish time and the
+// rejection message, and tracks the application in the rejected applications of the partition.
+// An error is returned, and the application is not tracked, if the state change is not allowed.
+func (pc *PartitionContext) addRejectedApplication(rejectedApplication *objects.Application, rejectionMessage string) error {
+	if err := rejectedApplication.HandleApplicationEvent(objects.RejectApplication); err != nil {
+		log.Logger().Warn("Application state not changed to Rejected",
+			zap.String("currentState", rejectedApplication.CurrentState()),
+			zap.Error(err))
+		return err
+	}
+	rejectedApplication.SetFinishedTime()
+	rejectedApplication.SetRejectionMessage(rejectionMessage)
+
+	// readers and the expired application cleanup access the map under the partition lock: writes must lock too
+	pc.Lock()
+	defer pc.Unlock()
+	if pc.rejectedApplications == nil {
+		pc.rejectedApplications = make(map[string]*objects.Application)
+	}
+	pc.rejectedApplications[rejectedApplication.ApplicationID] = rejectedApplication
+	return nil
+}
diff --git a/pkg/scheduler/partition_manager.go b/pkg/scheduler/partition_manager.go
index f54ec4884..d185ea073 100644
--- a/pkg/scheduler/partition_manager.go
+++ b/pkg/scheduler/partition_manager.go
@@ -53,10 +53,11 @@ func newPartitionManager(pc *PartitionContext, cc *ClusterContext) *partitionMan
 }
 
 // Run the manager for the partition.
-// The manager has three tasks:
+// The manager has four tasks:
 // - clean up the managed queues that are empty and removed from the configuration
 // - remove empty unmanaged queues
 // - remove completed applications from the partition
+// - remove rejected applications from the partition
 // When the manager exits the partition is removed from the system and must be cleaned up
 func (manager *partitionManager) Run() {
 	log.Logger().Info("starting partition manager",
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 21bcbaa65..23bcd7b39 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1376,6 +1376,33 @@ func TestCleanupCompletedApps(t *testing.T) {
 	assert.Assert(t, len(partition.GetAppsByState(objects.Expired.String())) == 0, "the partition should have 0 expired app")
 }
 
+func TestCleanupRejectedApps(t *testing.T) {
+	partition, err := newBasePartition()
+	assert.NilError(t, err, "partition create failed")
+	rejectedApp1 := newApplication("new", "default", defQueue)
+	rejectedApp2 := newApplication("running", "default", defQueue)
+	rejectedApp2.SetState(objects.Running.String())
+
+	rejectionMessage := fmt.Sprintf("Failed to add application %s to partition %s, partition doesn't exist", rejectedApp1.ApplicationID, rejectedApp1.Partition)
+	err = partition.addRejectedApplication(rejectedApp1, rejectionMessage)
+	assert.NilError(t, err, "no error expected while adding the application from new to rejected state")
+	assert.Assert(t, len(partition.rejectedApplications) == 1, "the rejectedApplications of the partition should have 1 app")
+	assert.Equal(t, rejectedApp1.CurrentState(), objects.Rejected.String())
+	assert.Equal(t, rejectedApp1.GetRejectionMessage(), rejectionMessage)
+	assert.Assert(t, !rejectedApp1.FinishedTime().IsZero())
+
+	err = partition.addRejectedApplication(rejectedApp2, rejectionMessage)
+	assert.Assert(t, err != nil, "error expected while adding the application from running to rejected state")
+	assert.Assert(t, len(partition.rejectedApplications) == 1, "the rejectedApplications of the partition should have 1 app")
+
+	rejectedApp1.SetState(objects.Expired.String())
+	partition.cleanupExpiredApps()
+	assert.Assert(t, len(partition.rejectedApplications) == 0, "the partition should not have app")
+	assert.Assert(t, partition.getRejectedApplication(rejectedApp1.ApplicationID) == nil, "rejected application should have been deleted")
+	assert.Assert(t, len(partition.GetRejectedAppsByState(objects.Rejected.String())) == 0, "the partition should have 0 rejected app")
+	assert.Assert(t, len(partition.GetRejectedAppsByState(objects.Expired.String())) == 0, "the partition should have 0 expired app")
+}
+
 func TestUpdateNode(t *testing.T) {
 	partition, err := newBasePartition()
 	assert.NilError(t, err, "test partition create failed with error")
diff --git a/pkg/webservice/dao/application_info.go b/pkg/webservice/dao/application_info.go
index e2c52641e..97e09cabb 100644
--- a/pkg/webservice/dao/application_info.go
+++ b/pkg/webservice/dao/application_info.go
@@ -33,6 +33,7 @@ type ApplicationDAOInfo struct {
 	Allocations     []AllocationDAOInfo `json:"allocations"`
 	State           string              `json:"applicationState"`
 	User            string              `json:"user"`
+	RejectionMessage string             `json:"rejectionMessage"`
 }
 
 type AllocationDAOInfo struct {
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 04b03b0c3..ca4e98251 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -129,6 +129,9 @@ func getApplicationsInfo(w http.ResponseWriter, r *http.Request) {
 		for _, app := range partition.GetCompletedApplications() {
 			addDao(app)
 		}
+		for _, app := range partition.GetRejectedApplications() {
+			addDao(app)
+		}
 	}
 
 	if err := json.NewEncoder(w).Encode(appsDao); err != nil {
@@ -300,6 +303,7 @@ func getApplicationJSON(app *objects.Application) *dao.ApplicationDAOInfo {
 		Allocations:     allocationInfos,
 		State:           app.CurrentState(),
 		User:            app.GetUser().User,
+		RejectionMessage: app.GetRejectionMessage(),
 	}
 }