From 286abb6403d65ad8098e9299c9f66027c5a10def Mon Sep 17 00:00:00 2001 From: ryankert Date: Mon, 12 Aug 2024 04:44:45 +0800 Subject: [PATCH] [YUNIKORN-2724] Improve the signature of methods notifyTaskComplete() and ensureAppAndTaskCreated() (#873) Closes: #873 Signed-off-by: Chia-Ping Tsai --- pkg/cache/context.go | 56 ++++++++++++++++++--------------------- pkg/cache/context_test.go | 4 +-- 2 files changed, 28 insertions(+), 32 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 211bf0e9b..6b7919237 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -295,9 +295,9 @@ func (ctx *Context) UpdatePod(_, newObj interface{}) { } func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) { - var app *Application taskID := string(pod.UID) - if app = ctx.getApplication(appID); app != nil { + app := ctx.getApplication(appID) + if app != nil { if task := app.GetTask(taskID); task != nil { task.setTaskPod(pod) } @@ -305,7 +305,7 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) { // treat terminated pods like a remove if utils.IsPodTerminated(pod) { - ctx.notifyTaskComplete(appID, taskID) + ctx.notifyTaskComplete(app, taskID) log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name)) ctx.schedulerCache.RemovePod(pod) return @@ -313,23 +313,21 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) { if ctx.schedulerCache.UpdatePod(pod) { // pod was accepted; ensure the application and task objects have been created - ctx.ensureAppAndTaskCreated(pod) + ctx.ensureAppAndTaskCreated(pod, app) } } -func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) { - // get app metadata - appMeta, ok := getAppMetadata(pod) - if !ok { - log.Log(log.ShimContext).Warn("BUG: Unable to retrieve application metadata from YuniKorn-managed Pod", - zap.String("namespace", pod.Namespace), - zap.String("name", pod.Name)) - return - } - +func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod, app *Application) { // add app if it doesn't already exist - app := ctx.GetApplication(appMeta.ApplicationID) if app == nil { + // get app metadata + appMeta, ok := getAppMetadata(pod) + if !ok { + log.Log(log.ShimContext).Warn("BUG: Unable to retrieve application metadata from YuniKorn-managed Pod", + zap.String("namespace", pod.Namespace), + zap.String("name", pod.Name)) + return + } app = ctx.AddApplication(&AddApplicationRequest{ Metadata: appMeta, }) @@ -435,9 +433,7 @@ func (ctx *Context) DeletePod(obj interface{}) { func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { if taskMeta, ok := getTaskMetadata(pod); ok { - if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil { - ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID) - } + ctx.notifyTaskComplete(ctx.GetApplication(taskMeta.ApplicationID), taskMeta.TaskID) } log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name)) @@ -873,19 +869,19 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool { return ctx.schedulerCache.StartPodAllocation(podKey, nodeID) } -func (ctx *Context) notifyTaskComplete(appID, taskID string) { - log.Log(log.ShimContext).Debug("NotifyTaskComplete", - zap.String("appID", appID), - zap.String("taskID", taskID)) - if app := ctx.GetApplication(appID); app != nil { - log.Log(log.ShimContext).Debug("release allocation", - zap.String("appID", appID), +func (ctx *Context) notifyTaskComplete(app *Application, taskID string) { + if app == nil { + log.Log(log.ShimContext).Debug("In notifyTaskComplete but app is nil", zap.String("taskID", taskID)) - ev := NewSimpleTaskEvent(appID, taskID, CompleteTask) - dispatcher.Dispatch(ev) - if app.GetApplicationState() == ApplicationStates().Resuming { - dispatcher.Dispatch(NewSimpleApplicationEvent(appID, AppTaskCompleted)) - } + return + } + log.Log(log.ShimContext).Debug("release allocation in notifyTaskComplete", + zap.String("appID", app.applicationID), + zap.String("taskID", taskID)) + ev := NewSimpleTaskEvent(app.applicationID, taskID, CompleteTask) + dispatcher.Dispatch(ev) + if app.GetApplicationState() == ApplicationStates().Resuming { + dispatcher.Dispatch(NewSimpleApplicationEvent(app.applicationID, AppTaskCompleted)) } } diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go index ba661c39b..6d7bfafa4 100644 --- a/pkg/cache/context_test.go +++ b/pkg/cache/context_test.go @@ -1034,7 +1034,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) { assert.Equal(t, len(app.GetBoundTasks()), 2) // release one of the tasks - context.notifyTaskComplete(appID, pod2UID) + context.notifyTaskComplete(app, pod2UID) // wait for release err = utils.WaitForCondition(func() bool { @@ -2075,7 +2075,7 @@ func TestTaskRemoveOnCompletion(t *testing.T) { assert.NilError(t, err) // mark completion - context.notifyTaskComplete(appID, taskUID1) + context.notifyTaskComplete(app, taskUID1) err = utils.WaitForCondition(func() bool { return task.GetTaskState() == TaskStates().Completed }, 100*time.Millisecond, time.Second)