From 89d6ecf0cbebf8b20a505b0d0da7da13231a6568 Mon Sep 17 00:00:00 2001 From: ryankert Date: Mon, 8 Jul 2024 23:54:38 +0800 Subject: [PATCH 01/10] [YUNIKORN-2724] Improve the signature of methods notifyTaskComplete() and ensureAppAndTaskCreated() --- pkg/cache/context.go | 28 +++++++++++++--------------- pkg/cache/context_test.go | 4 ++-- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index f28705197..c08c941df 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -308,7 +308,7 @@ func (ctx *Context) updateYuniKornPod(pod *v1.Pod) { if utils.IsPodTerminated(pod) { if taskMeta, ok := getTaskMetadata(pod); ok { if app := ctx.getApplication(taskMeta.ApplicationID); app != nil { - ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID) + ctx.notifyTaskComplete(app, taskMeta.TaskID) } } @@ -444,7 +444,7 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { defer ctx.lock.Unlock() if taskMeta, ok := getTaskMetadata(pod); ok { if app := ctx.getApplication(taskMeta.ApplicationID); app != nil { - ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID) + ctx.notifyTaskComplete(app, taskMeta.TaskID) } } @@ -904,25 +904,23 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool { return ctx.schedulerCache.StartPodAllocation(podKey, nodeID) } -func (ctx *Context) NotifyTaskComplete(appID, taskID string) { +func (ctx *Context) NotifyTaskComplete(app *Application, taskID string) { ctx.lock.Lock() defer ctx.lock.Unlock() - ctx.notifyTaskComplete(appID, taskID) + ctx.notifyTaskComplete(app, taskID) } -func (ctx *Context) notifyTaskComplete(appID, taskID string) { +func (ctx *Context) notifyTaskComplete(app *Application, taskID string) { log.Log(log.ShimContext).Debug("NotifyTaskComplete", - zap.String("appID", appID), + zap.String("appID", app.applicationID), zap.String("taskID", taskID)) - if app := ctx.getApplication(appID); app != nil { - 
log.Log(log.ShimContext).Debug("release allocation", - zap.String("appID", appID), - zap.String("taskID", taskID)) - ev := NewSimpleTaskEvent(appID, taskID, CompleteTask) - dispatcher.Dispatch(ev) - if app.GetApplicationState() == ApplicationStates().Resuming { - dispatcher.Dispatch(NewSimpleApplicationEvent(appID, AppTaskCompleted)) - } + log.Log(log.ShimContext).Debug("release allocation", + zap.String("appID", app.applicationID), + zap.String("taskID", taskID)) + ev := NewSimpleTaskEvent(app.applicationID, taskID, CompleteTask) + dispatcher.Dispatch(ev) + if app.GetApplicationState() == ApplicationStates().Resuming { + dispatcher.Dispatch(NewSimpleApplicationEvent(app.applicationID, AppTaskCompleted)) } } diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go index cff04ee8d..51da570d6 100644 --- a/pkg/cache/context_test.go +++ b/pkg/cache/context_test.go @@ -1089,7 +1089,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) { assert.Equal(t, len(app.GetBoundTasks()), 2) // release one of the tasks - context.NotifyTaskComplete(appID, pod2UID) + context.NotifyTaskComplete(app, pod2UID) // wait for release err = utils.WaitForCondition(func() bool { @@ -2123,7 +2123,7 @@ func TestTaskRemoveOnCompletion(t *testing.T) { assert.NilError(t, err) // mark completion - context.NotifyTaskComplete(appID, taskUID1) + context.NotifyTaskComplete(app, taskUID1) err = utils.WaitForCondition(func() bool { return task.GetTaskState() == TaskStates().Completed }, 100*time.Millisecond, time.Second) From c46c7a8a255cd7022623c33da6e3ab172f7e27e2 Mon Sep 17 00:00:00 2001 From: ryankert Date: Thu, 8 Aug 2024 20:52:28 +0800 Subject: [PATCH 02/10] code review 1 --- pkg/cache/context.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 9011434a0..d79b7866b 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -305,9 +305,9 @@ func (ctx *Context) UpdatePod(_, newObj interface{}) { } func (ctx *Context) 
updateYuniKornPod(appID string, pod *v1.Pod) { - var app *Application taskID := string(pod.UID) - if app = ctx.getApplication(appID); app != nil { + app := ctx.getApplication(appID) + if app != nil { if task := app.GetTask(taskID); task != nil { task.setTaskPod(pod) } From 99036524b5b254f4fc7a25b63b9bfed6e3153afa Mon Sep 17 00:00:00 2001 From: ryankert Date: Thu, 8 Aug 2024 20:54:33 +0800 Subject: [PATCH 03/10] code review 2 --- pkg/cache/context.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index d79b7866b..af9ccccfd 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -917,10 +917,7 @@ func (ctx *Context) NotifyTaskComplete(app *Application, taskID string) { } func (ctx *Context) notifyTaskComplete(app *Application, taskID string) { - log.Log(log.ShimContext).Debug("NotifyTaskComplete", - zap.String("appID", app.applicationID), - zap.String("taskID", taskID)) - log.Log(log.ShimContext).Debug("release allocation", + log.Log(log.ShimContext).Debug("NotifyTaskComplete and release allocation", zap.String("appID", app.applicationID), zap.String("taskID", taskID)) ev := NewSimpleTaskEvent(app.applicationID, taskID, CompleteTask) From d74d0dca39dbb50112f0f79ef5fe70a0ebda8e80 Mon Sep 17 00:00:00 2001 From: ryankert Date: Thu, 8 Aug 2024 21:03:13 +0800 Subject: [PATCH 04/10] cleanup ensureAppAndTaskCreated --- pkg/cache/context.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index af9ccccfd..e72b5bcfa 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -325,23 +325,21 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) { if ctx.schedulerCache.UpdatePod(pod) { // pod was accepted; ensure the application and task objects have been created - ctx.ensureAppAndTaskCreated(pod) + ctx.ensureAppAndTaskCreated(pod, app) } } -func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) { - 
// get app metadata - appMeta, ok := getAppMetadata(pod) - if !ok { - log.Log(log.ShimContext).Warn("BUG: Unable to retrieve application metadata from YuniKorn-managed Pod", - zap.String("namespace", pod.Namespace), - zap.String("name", pod.Name)) - return - } - +func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod, app *Application) { // add app if it doesn't already exist - app := ctx.getApplication(appMeta.ApplicationID) if app == nil { + // get app metadata + appMeta, ok := getAppMetadata(pod) + if !ok { + log.Log(log.ShimContext).Warn("BUG: Unable to retrieve application metadata from YuniKorn-managed Pod", + zap.String("namespace", pod.Namespace), + zap.String("name", pod.Name)) + return + } app = ctx.addApplication(&AddApplicationRequest{ Metadata: appMeta, }) From 5721af1578fea2d388c85e1593b69b7dac14ea96 Mon Sep 17 00:00:00 2001 From: ryankert Date: Thu, 8 Aug 2024 21:23:32 +0800 Subject: [PATCH 05/10] very complex merge --- pkg/cache/context.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 306e093eb..f905db765 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -435,8 +435,6 @@ func (ctx *Context) DeletePod(obj interface{}) { func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { if taskMeta, ok := getTaskMetadata(pod); ok { - if app := ctx.getApplication(taskMeta.ApplicationID); app != nil { - ctx.notifyTaskComplete(app, taskMeta.TaskID) if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil { ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID) } From 0468d36d8bb8cd808acade0207805a35ad5fa61b Mon Sep 17 00:00:00 2001 From: ryankert Date: Thu, 8 Aug 2024 21:34:04 +0800 Subject: [PATCH 06/10] fix conflicts --- pkg/cache/context.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index f905db765..cb1302edc 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -330,7 +330,7 @@ func 
(ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod, app *Application) { zap.String("name", pod.Name)) return } - app = ctx.addApplication(&AddApplicationRequest{ + app = ctx.AddApplication(&AddApplicationRequest{ Metadata: appMeta, }) } @@ -436,7 +436,7 @@ func (ctx *Context) DeletePod(obj interface{}) { func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { if taskMeta, ok := getTaskMetadata(pod); ok { if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil { - ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID) + ctx.notifyTaskComplete(app, taskMeta.TaskID) } } @@ -880,22 +880,16 @@ func (ctx *Context) NotifyTaskComplete(app *Application, taskID string) { } func (ctx *Context) notifyTaskComplete(app *Application, taskID string) { - log.Log(log.ShimContext).Debug("NotifyTaskComplete and release allocation", + log.Log(log.ShimContext).Debug("NotifyTaskComplete", + zap.String("appID", app.applicationID), + zap.String("taskID", taskID)) + log.Log(log.ShimContext).Debug("release allocation", zap.String("appID", app.applicationID), zap.String("taskID", taskID)) ev := NewSimpleTaskEvent(app.applicationID, taskID, CompleteTask) dispatcher.Dispatch(ev) if app.GetApplicationState() == ApplicationStates().Resuming { dispatcher.Dispatch(NewSimpleApplicationEvent(app.applicationID, AppTaskCompleted)) - if app := ctx.GetApplication(appID); app != nil { - log.Log(log.ShimContext).Debug("release allocation", - zap.String("appID", appID), - zap.String("taskID", taskID)) - ev := NewSimpleTaskEvent(appID, taskID, CompleteTask) - dispatcher.Dispatch(ev) - if app.GetApplicationState() == ApplicationStates().Resuming { - dispatcher.Dispatch(NewSimpleApplicationEvent(appID, AppTaskCompleted)) - } } } From 4be3abafb7b54a0c096e11117596a9b8b482d557 Mon Sep 17 00:00:00 2001 From: ryankert Date: Fri, 9 Aug 2024 11:54:47 +0800 Subject: [PATCH 07/10] code review --- pkg/cache/context.go | 11 +---------- pkg/cache/context_test.go | 4 ++-- 2 files changed, 3 insertions(+), 12 
deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index cb1302edc..264edfdf8 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -873,17 +873,8 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool { return ctx.schedulerCache.StartPodAllocation(podKey, nodeID) } -func (ctx *Context) NotifyTaskComplete(app *Application, taskID string) { - ctx.lock.Lock() - defer ctx.lock.Unlock() - ctx.notifyTaskComplete(app, taskID) -} - func (ctx *Context) notifyTaskComplete(app *Application, taskID string) { - log.Log(log.ShimContext).Debug("NotifyTaskComplete", - zap.String("appID", app.applicationID), - zap.String("taskID", taskID)) - log.Log(log.ShimContext).Debug("release allocation", + log.Log(log.ShimContext).Debug("NotifyTaskComplete and release allocation", zap.String("appID", app.applicationID), zap.String("taskID", taskID)) ev := NewSimpleTaskEvent(app.applicationID, taskID, CompleteTask) diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go index 3774947e8..6d7bfafa4 100644 --- a/pkg/cache/context_test.go +++ b/pkg/cache/context_test.go @@ -1034,7 +1034,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) { assert.Equal(t, len(app.GetBoundTasks()), 2) // release one of the tasks - context.NotifyTaskComplete(app, pod2UID) + context.notifyTaskComplete(app, pod2UID) // wait for release err = utils.WaitForCondition(func() bool { @@ -2075,7 +2075,7 @@ func TestTaskRemoveOnCompletion(t *testing.T) { assert.NilError(t, err) // mark completion - context.NotifyTaskComplete(app, taskUID1) + context.notifyTaskComplete(app, taskUID1) err = utils.WaitForCondition(func() bool { return task.GetTaskState() == TaskStates().Completed }, 100*time.Millisecond, time.Second) From 6849dccfd9c963e19f171d3204da626f4637697e Mon Sep 17 00:00:00 2001 From: ryankert Date: Fri, 9 Aug 2024 12:12:29 +0800 Subject: [PATCH 08/10] simplify deleteYunikornPod --- pkg/cache/context.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 
deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 264edfdf8..e4d6d7663 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -435,9 +435,7 @@ func (ctx *Context) DeletePod(obj interface{}) { func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { if taskMeta, ok := getTaskMetadata(pod); ok { - if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil { - ctx.notifyTaskComplete(app, taskMeta.TaskID) - } + ctx.notifyTaskComplete(ctx.GetApplication(taskMeta.ApplicationID), taskMeta.TaskID) } log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name)) @@ -874,7 +872,12 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool { } func (ctx *Context) notifyTaskComplete(app *Application, taskID string) { - log.Log(log.ShimContext).Debug("NotifyTaskComplete and release allocation", + if app == nil { + log.Log(log.ShimContext).Debug("In notifyTaskComplete but app is nil", + zap.String("taskID", taskID)) + return + } + log.Log(log.ShimContext).Debug("notifyTaskComplete and release allocation", zap.String("appID", app.applicationID), zap.String("taskID", taskID)) ev := NewSimpleTaskEvent(app.applicationID, taskID, CompleteTask) From 47ba6d5ee6baba8075ff7aef882529d4cf10645e Mon Sep 17 00:00:00 2001 From: "Hsien-Cheng(Ryan) Huang" Date: Sun, 11 Aug 2024 21:47:59 +0800 Subject: [PATCH 09/10] code review --- pkg/cache/context.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index e4d6d7663..9246143cd 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -305,9 +305,7 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) { // treat terminated pods like a remove if utils.IsPodTerminated(pod) { - if app != nil { - ctx.notifyTaskComplete(app, taskID) - } + ctx.notifyTaskComplete(app, taskID) log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", 
pod.Name)) ctx.schedulerCache.RemovePod(pod) return From 788d6d9f45087b75b976cb5b42e8f7d99b050290 Mon Sep 17 00:00:00 2001 From: "Hsien-Cheng(Ryan) Huang" Date: Sun, 11 Aug 2024 21:53:28 +0800 Subject: [PATCH 10/10] code review --- pkg/cache/context.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 9246143cd..6b7919237 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -875,7 +875,7 @@ func (ctx *Context) notifyTaskComplete(app *Application, taskID string) { zap.String("taskID", taskID)) return } - log.Log(log.ShimContext).Debug("notifyTaskComplete and release allocation", + log.Log(log.ShimContext).Debug("release allocation in notifyTaskComplete", zap.String("appID", app.applicationID), zap.String("taskID", taskID)) ev := NewSimpleTaskEvent(app.applicationID, taskID, CompleteTask)