diff --git a/pkg/cache/context.go b/pkg/cache/context.go index c608b98e7..0c0dd30ab 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -166,8 +166,6 @@ func (ctx *Context) addNode(obj interface{}) { } func (ctx *Context) updateNode(_, obj interface{}) { - ctx.lock.Lock() - defer ctx.lock.Unlock() node, err := convertToNode(obj) if err != nil { log.Log(log.ShimContext).Error("node conversion failed", zap.Error(err)) @@ -229,8 +227,6 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) { } func (ctx *Context) deleteNode(obj interface{}) { - ctx.lock.Lock() - defer ctx.lock.Unlock() var node *v1.Node switch t := obj.(type) { case *v1.Node: @@ -250,9 +246,6 @@ func (ctx *Context) deleteNode(obj interface{}) { } func (ctx *Context) addNodesWithoutRegistering(nodes []*v1.Node) { - ctx.lock.Lock() - defer ctx.lock.Unlock() - for _, node := range nodes { ctx.updateNodeInternal(node, false) } @@ -288,9 +281,6 @@ func (ctx *Context) AddPod(obj interface{}) { } func (ctx *Context) UpdatePod(_, newObj interface{}) { - ctx.lock.Lock() - defer ctx.lock.Unlock() - pod, err := utils.Convert2Pod(newObj) if err != nil { log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err)) @@ -338,9 +328,9 @@ func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) { } // add app if it doesn't already exist - app := ctx.getApplication(appMeta.ApplicationID) + app := ctx.GetApplication(appMeta.ApplicationID) if app == nil { - app = ctx.addApplication(&AddApplicationRequest{ + app = ctx.AddApplication(&AddApplicationRequest{ Metadata: appMeta, }) } @@ -444,10 +434,8 @@ func (ctx *Context) DeletePod(obj interface{}) { } func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { - ctx.lock.Lock() - defer ctx.lock.Unlock() if taskMeta, ok := getTaskMetadata(pod); ok { - if app := ctx.getApplication(taskMeta.ApplicationID); app != nil { + if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil { ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID) } } @@ -457,9 +445,6 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { } func (ctx *Context) deleteForeignPod(pod *v1.Pod) { - ctx.lock.Lock() - defer ctx.lock.Unlock() - oldPod := ctx.schedulerCache.GetPod(string(pod.UID)) if oldPod == nil { // if pod is not in scheduler cache, no node updates are needed @@ -590,8 +575,6 @@ func (ctx *Context) addPriorityClass(obj interface{}) { } func (ctx *Context) updatePriorityClass(_, newObj interface{}) { - ctx.lock.Lock() - defer ctx.lock.Unlock() if priorityClass := utils.Convert2PriorityClass(newObj); priorityClass != nil { ctx.updatePriorityClassInternal(priorityClass) } @@ -602,9 +585,6 @@ func (ctx *Context) updatePriorityClassInternal(priorityClass *schedulingv1.Prio } func (ctx *Context) deletePriorityClass(obj interface{}) { - ctx.lock.Lock() - defer ctx.lock.Unlock() - log.Log(log.ShimContext).Debug("priorityClass deleted") var priorityClass *schedulingv1.PriorityClass switch t := obj.(type) { @@ -670,8 +650,6 @@ func (ctx *Context) EventsToRegister(queueingHintFn framework.QueueingHintFn) [] // IsPodFitNode evaluates given predicates based on current context func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error { - ctx.lock.RLock() - defer ctx.lock.RUnlock() pod := ctx.schedulerCache.GetPod(name) if pod == nil { return ErrorPodNotFound @@ -692,8 +670,6 @@ func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error { } func (ctx *Context) IsPodFitNodeViaPreemption(name, node string, allocations []string, startIndex int) (int, bool) { - ctx.lock.RLock() - defer ctx.lock.RUnlock() if pod := ctx.schedulerCache.GetPod(name); pod != nil { // if pod exists in cache, try to run predicates if targetNode := ctx.schedulerCache.GetNode(node); targetNode != nil { @@ -802,8 +778,6 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error { // this way, the core can make allocation decisions with consideration of // other assumed pods before they are actually bound to the node (bound is slow). func (ctx *Context) AssumePod(name, node string) error { - ctx.lock.Lock() - defer ctx.lock.Unlock() if pod := ctx.schedulerCache.GetPod(name); pod != nil { // when add assumed pod, we make a copy of the pod to avoid // modifying its original reference. otherwise, it may have @@ -863,9 +837,6 @@ func (ctx *Context) AssumePod(name, node string) error { // forget pod must be called when a pod is assumed to be running on a node, // but then for some reason it is failed to bind or released. func (ctx *Context) ForgetPod(name string) { - ctx.lock.Lock() - defer ctx.lock.Unlock() - if pod := ctx.schedulerCache.GetPod(name); pod != nil { log.Log(log.ShimContext).Debug("forget pod", zap.String("pod", pod.Name)) ctx.schedulerCache.ForgetPod(pod) @@ -908,17 +879,11 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool { return ctx.schedulerCache.StartPodAllocation(podKey, nodeID) } -func (ctx *Context) NotifyTaskComplete(appID, taskID string) { - ctx.lock.Lock() - defer ctx.lock.Unlock() - ctx.notifyTaskComplete(appID, taskID) -} - func (ctx *Context) notifyTaskComplete(appID, taskID string) { log.Log(log.ShimContext).Debug("NotifyTaskComplete", zap.String("appID", appID), zap.String("taskID", taskID)) - if app := ctx.getApplication(appID); app != nil { + if app := ctx.GetApplication(appID); app != nil { log.Log(log.ShimContext).Debug("release allocation", zap.String("appID", appID), zap.String("taskID", taskID)) @@ -994,10 +959,6 @@ func (ctx *Context) AddApplication(request *AddApplicationRequest) *Application ctx.lock.Lock() defer ctx.lock.Unlock() - return ctx.addApplication(request) -} - -func (ctx *Context) addApplication(request *AddApplicationRequest) *Application { log.Log(log.ShimContext).Debug("AddApplication", zap.Any("Request", request)) if app := ctx.getApplication(request.Metadata.ApplicationID); app != nil { return app @@ -1065,25 +1026,28 @@ func (ctx *Context) getApplication(appID string) *Application { func (ctx *Context) RemoveApplication(appID string) error { ctx.lock.Lock() - defer ctx.lock.Unlock() if app, exist := ctx.applications[appID]; exist { // get the non-terminated task alias nonTerminatedTaskAlias := app.getNonTerminatedTaskAlias() // check there are any non-terminated task or not if len(nonTerminatedTaskAlias) > 0 { - return fmt.Errorf("failed to remove application %s because it still has task in non-terminated task, tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ",")) + ctx.lock.Unlock() + return fmt.Errorf("failed to remove application %s because it still has task in non-terminated tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ",")) } + delete(ctx.applications, appID) + ctx.lock.Unlock() // send the update request to scheduler core rr := common.CreateUpdateRequestForRemoveApplication(app.applicationID, app.partition) if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateApplication(rr); err != nil { log.Log(log.ShimContext).Error("failed to send remove application request to core", zap.Error(err)) } - delete(ctx.applications, appID) + log.Log(log.ShimContext).Info("app removed", zap.String("appID", appID)) return nil } + ctx.lock.Unlock() return fmt.Errorf("application %s is not found in the context", appID) } @@ -1099,8 +1063,6 @@ func (ctx *Context) RemoveApplicationInternal(appID string) { // this implements ApplicationManagementProtocol func (ctx *Context) AddTask(request *AddTaskRequest) *Task { - ctx.lock.Lock() - defer ctx.lock.Unlock() return ctx.addTask(request) } @@ -1160,9 +1122,7 @@ func (ctx *Context) RemoveTask(appID, taskID string) { } func (ctx *Context) getTask(appID string, taskID string) *Task { - ctx.lock.RLock() - defer ctx.lock.RUnlock() - app := ctx.getApplication(appID) + app := ctx.GetApplication(appID) if app == nil { log.Log(log.ShimContext).Debug("application is not found in the context", zap.String("appID", appID)) @@ -1684,9 +1644,6 @@ func (ctx *Context) finalizeNodes(existingNodes []*v1.Node) error { nodeMap[node.Name] = node } - ctx.lock.Lock() - defer ctx.lock.Unlock() - // find any existing nodes that no longer exist for _, node := range existingNodes { if _, ok := nodeMap[node.Name]; !ok { diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go index c4802d4a3..717c4eb0c 100644 --- a/pkg/cache/context_test.go +++ b/pkg/cache/context_test.go @@ -368,7 +368,7 @@ func TestRemoveApplication(t *testing.T) { assert.Equal(t, len(context.applications), 3) err := context.RemoveApplication(appID1) assert.Assert(t, err != nil) - assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated task, tasks: /remove-test-00001") + assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated tasks: /remove-test-00001") app := context.GetApplication(appID1) assert.Assert(t, app != nil) @@ -1103,7 +1103,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) { assert.Equal(t, len(app.GetBoundTasks()), 2) // release one of the tasks - context.NotifyTaskComplete(appID, pod2UID) + context.notifyTaskComplete(appID, pod2UID) // wait for release err = utils.WaitForCondition(func() bool { @@ -2144,7 +2144,7 @@ func TestTaskRemoveOnCompletion(t *testing.T) { assert.NilError(t, err) // mark completion - context.NotifyTaskComplete(appID, taskUID1) + context.notifyTaskComplete(appID, taskUID1) err = utils.WaitForCondition(func() bool { return task.GetTaskState() == TaskStates().Completed }, 100*time.Millisecond, time.Second) diff --git a/pkg/cache/scheduler_callback_test.go b/pkg/cache/scheduler_callback_test.go index 47d1b146e..59bd875e4 100644 --- a/pkg/cache/scheduler_callback_test.go +++ b/pkg/cache/scheduler_callback_test.go @@ -85,7 +85,6 @@ func TestUpdateAllocation_NewTask_TaskNotFound(t *testing.T) { } func TestUpdateAllocation_NewTask_AssumePodFails(t *testing.T) { - t.Skip("disabled until YUNIKORN-2629 is resolved") // test can randomly trigger a deadlock, resulting in a failed build callback, context := initCallbackTest(t, false, false) defer dispatcher.UnregisterAllEventHandlers() defer dispatcher.Stop()