Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2724] Improve the signature of methods notifyTaskComplete() and ensureAppAndTaskCreated() #873

Closed
wants to merge 15 commits into from
Closed
30 changes: 15 additions & 15 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,9 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {

// treat terminated pods like a remove
if utils.IsPodTerminated(pod) {
ctx.notifyTaskComplete(appID, taskID)
if app != nil {
ctx.notifyTaskComplete(app, taskID)
}
log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
return
Expand Down Expand Up @@ -448,7 +450,7 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
defer ctx.lock.Unlock()
if taskMeta, ok := getTaskMetadata(pod); ok {
if app := ctx.getApplication(taskMeta.ApplicationID); app != nil {
ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
ctx.notifyTaskComplete(app, taskMeta.TaskID)
}
}

Expand Down Expand Up @@ -908,25 +910,23 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool {
return ctx.schedulerCache.StartPodAllocation(podKey, nodeID)
}

func (ctx *Context) NotifyTaskComplete(appID, taskID string) {
func (ctx *Context) NotifyTaskComplete(app *Application, taskID string) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
ctx.notifyTaskComplete(appID, taskID)
ctx.notifyTaskComplete(app, taskID)
}

func (ctx *Context) notifyTaskComplete(appID, taskID string) {
func (ctx *Context) notifyTaskComplete(app *Application, taskID string) {
log.Log(log.ShimContext).Debug("NotifyTaskComplete",
zap.String("appID", appID),
zap.String("appID", app.applicationID),
zap.String("taskID", taskID))
if app := ctx.getApplication(appID); app != nil {
log.Log(log.ShimContext).Debug("release allocation",
zap.String("appID", appID),
zap.String("taskID", taskID))
ev := NewSimpleTaskEvent(appID, taskID, CompleteTask)
dispatcher.Dispatch(ev)
if app.GetApplicationState() == ApplicationStates().Resuming {
dispatcher.Dispatch(NewSimpleApplicationEvent(appID, AppTaskCompleted))
}
log.Log(log.ShimContext).Debug("release allocation",
zap.String("appID", app.applicationID),
zap.String("taskID", taskID))
ev := NewSimpleTaskEvent(app.applicationID, taskID, CompleteTask)
dispatcher.Dispatch(ev)
if app.GetApplicationState() == ApplicationStates().Resuming {
dispatcher.Dispatch(NewSimpleApplicationEvent(app.applicationID, AppTaskCompleted))
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) {
assert.Equal(t, len(app.GetBoundTasks()), 2)

// release one of the tasks
context.NotifyTaskComplete(appID, pod2UID)
context.NotifyTaskComplete(app, pod2UID)

// wait for release
err = utils.WaitForCondition(func() bool {
Expand Down Expand Up @@ -2137,7 +2137,7 @@ func TestTaskRemoveOnCompletion(t *testing.T) {
assert.NilError(t, err)

// mark completion
context.NotifyTaskComplete(appID, taskUID1)
context.NotifyTaskComplete(app, taskUID1)
err = utils.WaitForCondition(func() bool {
return task.GetTaskState() == TaskStates().Completed
}, 100*time.Millisecond, time.Second)
Expand Down
Loading