Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2578] Refactor SchedulerCache.GetPod() remove bool return #837

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@

func (ctx *Context) updateForeignPod(pod *v1.Pod) {
podStatusBefore := ""
oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
if ok {
oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod != nil {
podStatusBefore = string(oldPod.Status.Phase)
}

Expand Down Expand Up @@ -439,8 +439,8 @@
ctx.lock.Lock()
defer ctx.lock.Unlock()

oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
if !ok {
oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod == nil {
// if pod is not in scheduler cache, no node updates are needed
log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource updated needed",
zap.String("namespace", pod.Namespace),
Expand All @@ -452,7 +452,7 @@
// 1. pod is already assigned to a node
// 2. pod was not in a terminal state before
// 3. pod references a known node
if oldPod != nil && !utils.IsPodTerminated(oldPod) {
if !utils.IsPodTerminated(oldPod) {
if !ctx.schedulerCache.IsPodOrphaned(string(oldPod.UID)) {
log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update",
zap.String("namespace", pod.Namespace),
Expand Down Expand Up @@ -644,9 +644,8 @@
func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
var pod *v1.Pod
var ok bool
if pod, ok = ctx.schedulerCache.GetPod(name); !ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

	pod := ctx.schedulerCache.GetPod(name)
	if pod == nil {
		return ErrorPodNotFound
	}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

pod := ctx.schedulerCache.GetPod(name)
if pod == nil {

Check warning on line 648 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L647-L648

Added lines #L647 - L648 were not covered by tests
return ErrorPodNotFound
}
// if pod exists in cache, try to run predicates
Expand All @@ -672,7 +671,7 @@

ctx.lock.RLock()
defer ctx.lock.RUnlock()
if pod, _ := ctx.schedulerCache.GetPod(name); pod != nil {
if pod := ctx.schedulerCache.GetPod(name); pod != nil {

Check warning on line 674 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L674

Added line #L674 was not covered by tests
// if pod exists in cache, try to run predicates
if targetNode := ctx.schedulerCache.GetNode(node); targetNode != nil {
// need to lock cache here as predicates need a stable view into the cache
Expand All @@ -682,7 +681,7 @@
// look up each victim in the scheduler cache
victims := make([]*v1.Pod, len(allocations))
for index, uid := range allocations {
victim, _ := ctx.schedulerCache.GetPodNoLock(uid)
victim := ctx.schedulerCache.GetPodNoLock(uid)

Check warning on line 684 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L684

Added line #L684 was not covered by tests
victims[index] = victim
}

Expand All @@ -704,7 +703,7 @@
// during the scheduling process, as they have a direct impact on other scheduling processes.
// when assumePodVolumes was called, we cached whether all pod volumes are bound in schedulerCache,
// then here we just need to retrieve that value from cache, to skip bindings if volumes are already bound.
if assumedPod, exist := ctx.schedulerCache.GetPod(podKey); exist {
if assumedPod := ctx.schedulerCache.GetPod(podKey); assumedPod != nil {
if ctx.schedulerCache.ArePodVolumesAllBound(podKey) {
log.Log(log.ShimContext).Info("Binding Pod Volumes skipped: all volumes already bound",
zap.String("podName", pod.Name))
Expand Down Expand Up @@ -782,7 +781,7 @@
func (ctx *Context) AssumePod(name, node string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if pod, ok := ctx.schedulerCache.GetPod(name); ok {
if pod := ctx.schedulerCache.GetPod(name); pod != nil {
// when adding an assumed pod, we make a copy of the pod to avoid
// modifying the original reference; otherwise, there may be a
// race when other goroutines access it in parallel.
Expand Down Expand Up @@ -844,7 +843,7 @@
ctx.lock.Lock()
defer ctx.lock.Unlock()

if pod, ok := ctx.schedulerCache.GetPod(name); ok {
if pod := ctx.schedulerCache.GetPod(name); pod != nil {

Check warning on line 846 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L846

Added line #L846 was not covered by tests
log.Log(log.ShimContext).Debug("forget pod", zap.String("pod", pod.Name))
ctx.schedulerCache.ForgetPod(pod)
return
Expand Down
94 changes: 47 additions & 47 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,10 @@ func TestAddPod(t *testing.T) {
context.AddPod(pod1) // should be added
context.AddPod(pod2) // should skip as pod is terminated

_, ok := context.schedulerCache.GetPod("UID-00001")
assert.Check(t, ok, "active pod was not added")
_, ok = context.schedulerCache.GetPod("UID-00002")
assert.Check(t, !ok, "terminated pod was added")
pod := context.schedulerCache.GetPod("UID-00001")
assert.Check(t, pod != nil, "active pod was not added")
pod = context.schedulerCache.GetPod("UID-00002")
assert.Check(t, pod == nil, "terminated pod was added")
}

func TestUpdatePod(t *testing.T) {
Expand Down Expand Up @@ -482,8 +482,8 @@ func TestUpdatePod(t *testing.T) {
}

context.AddPod(pod1)
_, ok := context.schedulerCache.GetPod("UID-00001")
assert.Assert(t, ok, "pod1 is not present after adding")
pod := context.schedulerCache.GetPod("UID-00001")
assert.Assert(t, pod != nil, "pod1 is not present after adding")

// these should not fail, but are no-ops
context.UpdatePod(nil, nil)
Expand All @@ -492,13 +492,13 @@ func TestUpdatePod(t *testing.T) {

// ensure a terminated pod is removed
context.UpdatePod(pod1, pod3)
_, ok = context.schedulerCache.GetPod("UID-00001")
assert.Check(t, !ok, "pod still found after termination")
pod = context.schedulerCache.GetPod("UID-00001")
assert.Check(t, pod == nil, "pod still found after termination")

// ensure a non-terminated pod is updated
context.UpdatePod(pod1, pod2)
found, ok := context.schedulerCache.GetPod("UID-00001")
if assert.Check(t, ok, "pod not found after update") {
found := context.schedulerCache.GetPod("UID-00001")
if assert.Check(t, found != nil, "pod not found after update") {
assert.Check(t, found.GetAnnotations()["test.state"] == "updated", "pod state not updated")
}
}
Expand Down Expand Up @@ -537,22 +537,22 @@ func TestDeletePod(t *testing.T) {

context.AddPod(pod1)
context.AddPod(pod2)
_, ok := context.schedulerCache.GetPod("UID-00001")
assert.Assert(t, ok, "pod1 is not present after adding")
_, ok = context.schedulerCache.GetPod("UID-00002")
assert.Assert(t, ok, "pod2 is not present after adding")
pod := context.schedulerCache.GetPod("UID-00001")
assert.Assert(t, pod != nil, "pod1 is not present after adding")
pod = context.schedulerCache.GetPod("UID-00002")
assert.Assert(t, pod != nil, "pod2 is not present after adding")

// these should not fail, but here for completeness
context.DeletePod(nil)
context.DeletePod(cache.DeletedFinalStateUnknown{Key: "UID-00000", Obj: nil})

context.DeletePod(pod1)
_, ok = context.schedulerCache.GetPod("UID-00001")
assert.Check(t, !ok, "pod1 is still present")
pod = context.schedulerCache.GetPod("UID-00001")
assert.Check(t, pod == nil, "pod1 is still present")

context.DeletePod(cache.DeletedFinalStateUnknown{Key: "UID-00002", Obj: pod2})
_, ok = context.schedulerCache.GetPod("UID-00002")
assert.Check(t, !ok, "pod2 is still present")
pod = context.schedulerCache.GetPod("UID-00002")
assert.Check(t, pod == nil, "pod2 is still present")
}

//nolint:funlen
Expand Down Expand Up @@ -622,16 +622,16 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = false
context.AddPod(pod1)
assert.Assert(t, !executed, "unexpected update")
_, ok := context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, !ok, "unassigned pod found in cache")
pod := context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, pod == nil, "unassigned pod found in cache")

// validate update
tc = "update-pod1"
executed = false
expectRemove = false
context.UpdatePod(nil, pod1)
assert.Assert(t, !executed, "unexpected update")
assert.Assert(t, !ok, "unassigned pod found in cache")
assert.Assert(t, pod == nil, "unassigned pod found in cache")

// pod is assigned to a node but still in pending state, should update
pod2 := foreignPod("pod2", "1G", "500m")
Expand All @@ -645,8 +645,8 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = false
context.AddPod(pod2)
assert.Assert(t, executed, "updated expected")
_, ok = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, ok, "pod not found in cache")
pod = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, pod != nil, "pod not found in cache")

// validate update
tc = "update-pod2"
Expand All @@ -655,8 +655,8 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = false
context.UpdatePod(nil, pod2)
assert.Assert(t, !executed, "unexpected update")
_, ok = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, ok, "pod not found in cache")
pod = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, pod != nil, "pod not found in cache")

// validate update when not already in cache
tc = "update-pod2-nocache-pre"
Expand All @@ -671,8 +671,8 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = false
context.UpdatePod(nil, pod2)
assert.Assert(t, executed, "expected update")
_, ok = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, ok, "pod not found in cache")
pod = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, pod != nil, "pod not found in cache")

// pod is failed, should trigger update if already in cache
pod3 := pod2.DeepCopy()
Expand All @@ -685,8 +685,8 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = true
context.AddPod(pod3)
assert.Assert(t, executed, "expected update")
_, ok = context.schedulerCache.GetPod(string(pod3.UID))
assert.Assert(t, !ok, "failed pod found in cache")
pod = context.schedulerCache.GetPod(string(pod3.UID))
assert.Assert(t, pod == nil, "failed pod found in cache")

// validate update when not already in cache
tc = "update-pod3-pre"
Expand All @@ -700,8 +700,8 @@ func TestAddUpdatePodForeign(t *testing.T) {
expectRemove = true
context.UpdatePod(nil, pod3)
assert.Assert(t, executed, "expected update")
_, ok = context.schedulerCache.GetPod(string(pod3.UID))
assert.Assert(t, !ok, "failed pod found in cache")
pod = context.schedulerCache.GetPod(string(pod3.UID))
assert.Assert(t, pod == nil, "failed pod found in cache")
}

func TestDeletePodForeign(t *testing.T) {
Expand Down Expand Up @@ -777,8 +777,8 @@ func TestDeletePodForeign(t *testing.T) {
expectRemove = true
context.DeletePod(pod1)
assert.Assert(t, executed, "update not executed")
_, ok := context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, !ok, "deleted pod found in cache")
pod := context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, pod == nil, "deleted pod found in cache")

// validate delete when not already found
tc = "delete-pod1-again"
Expand All @@ -787,8 +787,8 @@ func TestDeletePodForeign(t *testing.T) {
expectRemove = false
context.DeletePod(pod1)
assert.Assert(t, !executed, "unexpected update")
_, ok = context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, !ok, "deleted pod found in cache")
pod = context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, pod == nil, "deleted pod found in cache")
}

func TestAddTask(t *testing.T) {
Expand Down Expand Up @@ -2142,8 +2142,8 @@ func TestAssumePod(t *testing.T) {
err := context.AssumePod(pod1UID, fakeNodeName)
assert.NilError(t, err)
assert.Assert(t, context.schedulerCache.ArePodVolumesAllBound(pod1UID))
assumedPod, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok, "pod not found in cache")
assumedPod := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, assumedPod != nil, "pod not found in cache")
assert.Equal(t, assumedPod.Spec.NodeName, fakeNodeName)
assert.Assert(t, context.schedulerCache.IsAssumedPod(pod1UID))
}
Expand All @@ -2159,8 +2159,8 @@ func TestAssumePod_GetPodVolumeClaimsError(t *testing.T) {
err := context.AssumePod(pod1UID, fakeNodeName)
assert.Error(t, err, errMsg)
assert.Assert(t, !context.schedulerCache.IsAssumedPod(pod1UID))
podInCache, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok, "pod not found in cache")
podInCache := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, podInCache != nil, "pod not found in cache")
assert.Equal(t, podInCache.Spec.NodeName, "", "NodeName in pod spec was set unexpectedly")
}

Expand All @@ -2175,8 +2175,8 @@ func TestAssumePod_FindPodVolumesError(t *testing.T) {
err := context.AssumePod(pod1UID, fakeNodeName)
assert.Error(t, err, errMsg)
assert.Assert(t, !context.schedulerCache.IsAssumedPod(pod1UID))
podInCache, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok, "pod not found in cache")
podInCache := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, podInCache != nil, "pod not found in cache")
assert.Equal(t, podInCache.Spec.NodeName, "", "NodeName in pod spec was set unexpectedly")
}

Expand All @@ -2190,8 +2190,8 @@ func TestAssumePod_ConflictingVolumes(t *testing.T) {
err := context.AssumePod(pod1UID, fakeNodeName)
assert.Error(t, err, "pod my-pod-1 has conflicting volume claims: reason1, reason2")
assert.Assert(t, !context.schedulerCache.IsAssumedPod(pod1UID))
podInCache, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok, "pod not found in cache")
podInCache := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, podInCache != nil, "pod not found in cache")
assert.Equal(t, podInCache.Spec.NodeName, "", "NodeName in pod spec was set unexpectedly")
}

Expand All @@ -2206,8 +2206,8 @@ func TestAssumePod_AssumePodVolumesError(t *testing.T) {
err := context.AssumePod(pod1UID, fakeNodeName)
assert.Error(t, err, errMsg)
assert.Assert(t, !context.schedulerCache.IsAssumedPod(pod1UID))
podInCache, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok, "pod not found in cache")
podInCache := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, podInCache != nil, "pod not found in cache")
assert.Equal(t, podInCache.Spec.NodeName, "", "NodeName in pod spec was set unexpectedly")
}

Expand All @@ -2219,8 +2219,8 @@ func TestAssumePod_PodNotFound(t *testing.T) {
err := context.AssumePod("nonexisting", fakeNodeName)
assert.NilError(t, err)
assert.Assert(t, !context.schedulerCache.IsAssumedPod(pod1UID))
podInCache, ok := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, ok)
podInCache := context.schedulerCache.GetPod(pod1UID)
assert.Assert(t, podInCache != nil)
assert.Equal(t, podInCache.Spec.NodeName, "", "NodeName in pod spec was set unexpectedly")
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/external/scheduler_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@
cache.lock.Lock()
defer cache.lock.Unlock()
// verify that the pod exists in the cache, otherwise ignore
if _, ok := cache.GetPodNoLock(taskID); !ok {
if pod := cache.GetPodNoLock(taskID); pod == nil {

Check warning on line 381 in pkg/cache/external/scheduler_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/external/scheduler_cache.go#L381

Added line #L381 was not covered by tests
return
}
cache.addSchedulingTask(taskID)
Expand Down Expand Up @@ -635,7 +635,7 @@
return true
}

func (cache *SchedulerCache) GetPod(uid string) (*v1.Pod, bool) {
func (cache *SchedulerCache) GetPod(uid string) *v1.Pod {
cache.lock.RLock()
defer cache.lock.RUnlock()
return cache.GetPodNoLock(uid)
Expand All @@ -648,11 +648,11 @@
return ok
}

func (cache *SchedulerCache) GetPodNoLock(uid string) (*v1.Pod, bool) {
func (cache *SchedulerCache) GetPodNoLock(uid string) *v1.Pod {
if pod, ok := cache.podsMap[uid]; ok {
return pod, true
return pod
}
return nil, false
return nil
}

func (cache *SchedulerCache) AssumePod(pod *v1.Pod, allBound bool) {
Expand Down
Loading