Skip to content

Commit

Permalink
[YUNIKORN-2782] Cleanup dead code in cache/context (#888)
Browse files Browse the repository at this point in the history
Closes: #888

Signed-off-by: Chia-Ping Tsai <[email protected]>
  • Loading branch information
blueBlue0102 authored and chia7712 committed Aug 7, 2024
1 parent 7f9901f commit ba192d4
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 108 deletions.
35 changes: 1 addition & 34 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,12 +845,6 @@ func (ctx *Context) ForgetPod(name string) {
log.Log(log.ShimContext).Debug("unable to forget pod: not found in cache", zap.String("pod", name))
}

func (ctx *Context) UpdateApplication(app *Application) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
ctx.applications[app.applicationID] = app
}

// IsTaskMaybeSchedulable returns true if a task might be currently able to be scheduled. This uses a bloom filter
// cached from a set of taskIDs to perform efficient negative lookups.
func (ctx *Context) IsTaskMaybeSchedulable(taskID string) bool {
Expand Down Expand Up @@ -1024,34 +1018,7 @@ func (ctx *Context) getApplication(appID string) *Application {
return nil
}

func (ctx *Context) RemoveApplication(appID string) error {
ctx.lock.Lock()
if app, exist := ctx.applications[appID]; exist {
// get the non-terminated task alias
nonTerminatedTaskAlias := app.getNonTerminatedTaskAlias()
// check there are any non-terminated task or not
if len(nonTerminatedTaskAlias) > 0 {
ctx.lock.Unlock()
return fmt.Errorf("failed to remove application %s because it still has task in non-terminated tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ","))
}
delete(ctx.applications, appID)
ctx.lock.Unlock()
// send the update request to scheduler core
rr := common.CreateUpdateRequestForRemoveApplication(app.applicationID, app.partition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateApplication(rr); err != nil {
log.Log(log.ShimContext).Error("failed to send remove application request to core", zap.Error(err))
}

log.Log(log.ShimContext).Info("app removed",
zap.String("appID", appID))

return nil
}
ctx.lock.Unlock()
return fmt.Errorf("application %s is not found in the context", appID)
}

func (ctx *Context) RemoveApplicationInternal(appID string) {
func (ctx *Context) RemoveApplication(appID string) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if _, exist := ctx.applications[appID]; !exist {
Expand Down
75 changes: 3 additions & 72 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,75 +324,6 @@ func TestGetApplication(t *testing.T) {
}

func TestRemoveApplication(t *testing.T) {
// add 3 applications
context := initContextForTest()
app1 := NewApplication(appID1, queueNameA, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
app2 := NewApplication(appID2, queueNameB, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
app3 := NewApplication(appID3, queueNameC, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
context.applications[appID1] = app1
context.applications[appID2] = app2
context.applications[appID3] = app3
pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "remove-test-00001",
UID: uid1,
},
}
pod2 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "remove-test-00002",
UID: uid2,
},
}
// New task to application 1
// set task state in Pending (non-terminated)
task1 := NewTask(taskUID1, app1, context, pod1)
app1.taskMap[taskUID1] = task1
task1.sm.SetState(TaskStates().Pending)
// New task to application 2
// set task state in Failed (terminated)
task2 := NewTask(taskUID2, app2, context, pod2)
app2.taskMap[taskUID2] = task2
task2.sm.SetState(TaskStates().Failed)

// remove application 1 which have non-terminated task
// this should fail
assert.Equal(t, len(context.applications), 3)
err := context.RemoveApplication(appID1)
assert.Assert(t, err != nil)
assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated tasks: /remove-test-00001")

app := context.GetApplication(appID1)
assert.Assert(t, app != nil)

// remove application 2 which have terminated task
// this should be successful
err = context.RemoveApplication(appID2)
assert.Assert(t, err == nil)

app = context.GetApplication(appID2)
assert.Assert(t, app == nil)

// try remove again
// this should fail
err = context.RemoveApplication(appID2)
assert.Assert(t, err != nil)
assert.ErrorContains(t, err, "application app00002 is not found in the context")

// make sure the other app is not affected
app = context.GetApplication(appID3)
assert.Assert(t, app != nil)
}

func TestRemoveApplicationInternal(t *testing.T) {
context := initContextForTest()
app1 := NewApplication(appID1, queueNameA, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
app2 := NewApplication(appID2, queueNameB, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
Expand All @@ -401,17 +332,17 @@ func TestRemoveApplicationInternal(t *testing.T) {
assert.Equal(t, len(context.applications), 2)

// remove non-exist app
context.RemoveApplicationInternal(appID3)
context.RemoveApplication(appID3)
assert.Equal(t, len(context.applications), 2)

// remove app1
context.RemoveApplicationInternal(appID1)
context.RemoveApplication(appID1)
assert.Equal(t, len(context.applications), 1)
_, ok := context.applications[appID1]
assert.Equal(t, ok, false)

// remove app2
context.RemoveApplicationInternal(appID2)
context.RemoveApplication(appID2)
assert.Equal(t, len(context.applications), 0)
_, ok = context.applications[appID2]
assert.Equal(t, ok, false)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/scheduler_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (callback *AsyncRMCallback) UpdateApplication(response *si.ApplicationRespo
zap.String("new status", updated.State))
switch updated.State {
case ApplicationStates().Completed:
callback.context.RemoveApplicationInternal(updated.ApplicationID)
callback.context.RemoveApplication(updated.ApplicationID)
case ApplicationStates().Resuming:
app := callback.context.GetApplication(updated.ApplicationID)
if app != nil && app.GetApplicationState() == ApplicationStates().Reserving {
Expand Down
2 changes: 1 addition & 1 deletion pkg/shim/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (ss *KubernetesShim) schedule() {
for _, app := range apps {
if app.GetApplicationState() == cache.ApplicationStates().Failed {
if app.AreAllTasksTerminated() {
ss.context.RemoveApplicationInternal(app.GetApplicationID())
ss.context.RemoveApplication(app.GetApplicationID())
}
continue
}
Expand Down

0 comments on commit ba192d4

Please sign in to comment.