From b766c9907eded3944b62ba10d1cc36a41a03e83e Mon Sep 17 00:00:00 2001 From: Craig Condit Date: Wed, 1 Nov 2023 12:52:16 -0500 Subject: [PATCH] [YUNIKORN-2100] Merge functionality of node coordinator into context Merge the node_coordinator.go pod event handler logic into the Context's pod event handlers. Also, rename addPodToCache() / updatePodInCache() / removePodFromCache() to addPod() / updatePod() / deletePod() for consistency. --- pkg/cache/context.go | 170 +++++++++---- pkg/cache/context_test.go | 361 +++++++++++++++++++++----- pkg/cache/node_coordinator.go | 140 ----------- pkg/cache/node_coordinator_test.go | 392 ----------------------------- 4 files changed, 418 insertions(+), 645 deletions(-) delete mode 100644 pkg/cache/node_coordinator.go delete mode 100644 pkg/cache/node_coordinator_test.go diff --git a/pkg/cache/context.go b/pkg/cache/context.go index a255e78d6..f452c6abc 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -108,18 +108,9 @@ func (ctx *Context) AddSchedulingEventHandlers() { ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ Type: client.PodInformerHandlers, - FilterFn: ctx.filterPods, - AddFn: ctx.addPodToCache, - UpdateFn: ctx.updatePodInCache, - DeleteFn: ctx.removePodFromCache, - }) - - nodeCoordinator := newNodeResourceCoordinator(ctx.nodes) - ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ - Type: client.PodInformerHandlers, - FilterFn: nodeCoordinator.filterPods, - UpdateFn: nodeCoordinator.updatePod, - DeleteFn: nodeCoordinator.deletePod, + AddFn: ctx.addPod, + UpdateFn: ctx.updatePod, + DeleteFn: ctx.deletePod, }) ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ @@ -147,6 +138,9 @@ func (ctx *Context) SetPluginMode(pluginMode bool) { } func (ctx *Context) addNode(obj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + node, err := convertToNode(obj) if err != nil { log.Log(log.ShimContext).Error("node conversion failed", zap.Error(err)) @@ -166,6 +160,9 @@ func (ctx *Context) addNode(obj interface{}) { } func (ctx *Context) updateNode(oldObj, newObj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + // we only trigger update when resource changes oldNode, err := convertToNode(oldObj) if err != nil { @@ -189,6 +186,9 @@ func (ctx *Context) updateNode(oldObj, newObj interface{}) { } func (ctx *Context) deleteNode(obj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + var node *v1.Node switch t := obj.(type) { case *v1.Node: @@ -217,25 +217,90 @@ func (ctx *Context) deleteNode(obj interface{}) { fmt.Sprintf("node %s is deleted from the scheduler", node.Name)) } -func (ctx *Context) addPodToCache(obj interface{}) { +func (ctx *Context) addPod(obj interface{}) { pod, err := utils.Convert2Pod(obj) if err != nil { - log.Log(log.ShimContext).Error("failed to add pod to cache", zap.Error(err)) + log.Log(log.ShimContext).Error("failed to add pod", zap.Error(err)) + return + } + if utils.GetApplicationIDFromPod(pod) == "" { + ctx.updateForeignPod(pod) + } else { + ctx.updateYuniKornPod(pod) + } +} + +func (ctx *Context) updatePod(_, newObj interface{}) { + pod, err := utils.Convert2Pod(newObj) + if err != nil { + log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err)) return } + if utils.GetApplicationIDFromPod(pod) == "" { + ctx.updateForeignPod(pod) + } else { + ctx.updateYuniKornPod(pod) + } +} - // treat a terminated pod like a removal +func (ctx *Context) updateYuniKornPod(pod *v1.Pod) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + + // treat terminated pods like a remove if utils.IsPodTerminated(pod) { - log.Log(log.ShimContext).Debug("Request to add terminated pod, removing from cache", zap.String("podName", pod.Name)) + log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name)) ctx.schedulerCache.RemovePod(pod) return } + ctx.schedulerCache.UpdatePod(pod) +} - log.Log(log.ShimContext).Debug("adding pod to cache", zap.String("podName", pod.Name)) - ctx.schedulerCache.AddPod(pod) +func (ctx *Context) updateForeignPod(pod *v1.Pod) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + + podStatusBefore := "" + oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID)) + if ok { + podStatusBefore = string(oldPod.Status.Phase) + } + + // conditions for allocate: + // 1. pod was previously assigned + // 2. pod is now assigned + // 3. pod is not in terminated state + if oldPod == nil && utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) { + log.Log(log.ShimContext).Debug("pod is assigned to a node, trigger occupied resource update", + zap.String("namespace", pod.Namespace), + zap.String("podName", pod.Name), + zap.String("podStatusBefore", podStatusBefore), + zap.String("podStatusCurrent", string(pod.Status.Phase))) + podResource := common.GetPodResource(pod) + ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, AddOccupiedResource) + ctx.schedulerCache.AddPod(pod) + return + } + + // conditions for release: + // 1. pod was previously assigned + // 2. pod is now in a terminated state + if oldPod != nil && utils.IsPodTerminated(pod) { + log.Log(log.ShimContext).Debug("pod terminated, trigger occupied resource update", + zap.String("namespace", pod.Namespace), + zap.String("podName", pod.Name), + zap.String("podStatusBefore", podStatusBefore), + zap.String("podStatusCurrent", string(pod.Status.Phase))) + // this means pod is terminated + // we need sub the occupied resource and re-sync with the scheduler-core + podResource := common.GetPodResource(pod) + ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, SubOccupiedResource) + ctx.schedulerCache.RemovePod(pod) + return + } } -func (ctx *Context) removePodFromCache(obj interface{}) { +func (ctx *Context) deletePod(obj interface{}) { var pod *v1.Pod switch t := obj.(type) { case *v1.Pod: @@ -252,39 +317,47 @@ func (ctx *Context) removePodFromCache(obj interface{}) { return } + if utils.GetApplicationIDFromPod(pod) == "" { + ctx.deleteForeignPod(pod) + } else { + ctx.deleteYuniKornPod(pod) + } +} + +func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name)) ctx.schedulerCache.RemovePod(pod) } -func (ctx *Context) updatePodInCache(oldObj, newObj interface{}) { - _, err := utils.Convert2Pod(oldObj) - if err != nil { - log.Log(log.ShimContext).Error("failed to update pod in cache", zap.Error(err)) - return - } - newPod, err := utils.Convert2Pod(newObj) - if err != nil { - log.Log(log.ShimContext).Error("failed to update pod in cache", zap.Error(err)) - return - } +func (ctx *Context) deleteForeignPod(pod *v1.Pod) { + ctx.lock.Lock() + defer ctx.lock.Unlock() - // treat terminated pods like a remove - if utils.IsPodTerminated(newPod) { - log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", newPod.Name)) - ctx.schedulerCache.RemovePod(newPod) + oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID)) + if !ok { + // if pod is not in scheduler cache, no node updates are needed + log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource updated needed", + zap.String("namespace", pod.Namespace), + zap.String("podName", pod.Name)) return } - ctx.schedulerCache.UpdatePod(newPod) -} - -// filter pods by scheduler name and state -func (ctx *Context) filterPods(obj interface{}) bool { - switch obj := obj.(type) { - case *v1.Pod: - return utils.GetApplicationIDFromPod(obj) != "" - default: - return false + // conditions for release: + // 1. pod is already assigned to a node + if oldPod != nil { + log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update", + zap.String("namespace", pod.Namespace), + zap.String("podName", pod.Name), + zap.String("podStatusBefore", string(oldPod.Status.Phase)), + zap.String("podStatusCurrent", string(pod.Status.Phase))) + // this means pod is terminated + // we need sub the occupied resource and re-sync with the scheduler-core + podResource := common.GetPodResource(pod) + ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, SubOccupiedResource) + ctx.schedulerCache.RemovePod(pod) } } @@ -367,6 +440,9 @@ func (ctx *Context) filterPriorityClasses(obj interface{}) bool { } func (ctx *Context) addPriorityClass(obj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + log.Log(log.ShimContext).Debug("priority class added") priorityClass := utils.Convert2PriorityClass(obj) if priorityClass != nil { @@ -375,6 +451,9 @@ func (ctx *Context) addPriorityClass(obj interface{}) { } func (ctx *Context) updatePriorityClass(_, newObj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + log.Log(log.ShimContext).Debug("priority class updated") priorityClass := utils.Convert2PriorityClass(newObj) if priorityClass != nil { @@ -383,6 +462,9 @@ func (ctx *Context) updatePriorityClass(_, newObj interface{}) { } func (ctx *Context) deletePriorityClass(obj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + log.Log(log.ShimContext).Debug("priorityClass deleted") var priorityClass *schedulingv1.PriorityClass switch t := obj.(type) { diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go index 599be9183..484cc8355 100644 --- a/pkg/cache/context_test.go +++ b/pkg/cache/context_test.go @@ -48,6 +48,10 @@ import ( "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) +const ( + Host1 = "HOST1" +) + var ( testGroups = []string{"dev", "yunikorn"} ) @@ -334,49 +338,7 @@ func TestRemoveApplicationInternal(t *testing.T) { assert.Equal(t, ok, false) } -func TestFilterPods(t *testing.T) { - context := initContextForTest() - pod1 := &v1.Pod{ - TypeMeta: apis.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: "yunikorn-test-00001", - UID: "UID-00001", - }, - Spec: v1.PodSpec{SchedulerName: "yunikorn"}, - } - pod2 := &v1.Pod{ - TypeMeta: apis.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: "yunikorn-test-00002", - UID: "UID-00002", - }, - Spec: v1.PodSpec{SchedulerName: "default-scheduler"}, - } - pod3 := &v1.Pod{ - TypeMeta: apis.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: "yunikorn-test-00003", - UID: "UID-00003", - Labels: map[string]string{"applicationId": "test-00003"}, - }, - Spec: v1.PodSpec{SchedulerName: "yunikorn"}, - } - assert.Check(t, !context.filterPods(nil), "nil object was allowed") - assert.Check(t, !context.filterPods(pod1), "yunikorn-managed pod with no app id was allowed") - assert.Check(t, !context.filterPods(pod2), "non-yunikorn-managed pod was allowed") - assert.Check(t, context.filterPods(pod3), "yunikorn-managed pod was filtered") -} - -func TestAddPodToCache(t *testing.T) { +func TestAddPod(t *testing.T) { context := initContextForTest() pod1 := &v1.Pod{ @@ -387,6 +349,9 @@ func TestAddPodToCache(t *testing.T) { ObjectMeta: apis.ObjectMeta{ Name: "yunikorn-test-00001", UID: "UID-00001", + Annotations: map[string]string{ + constants.AnnotationApplicationID: "yunikorn-test-00001", + }, }, Spec: v1.PodSpec{SchedulerName: "yunikorn"}, } @@ -398,6 +363,9 @@ func TestAddPodToCache(t *testing.T) { ObjectMeta: apis.ObjectMeta{ Name: "yunikorn-test-00002", UID: "UID-00002", + Annotations: map[string]string{ + constants.AnnotationApplicationID: "yunikorn-test-00002", + }, }, Spec: v1.PodSpec{SchedulerName: "yunikorn"}, Status: v1.PodStatus{ @@ -405,9 +373,9 @@ func TestAddPodToCache(t *testing.T) { }, } - context.addPodToCache(nil) // no-op, but should not crash - context.addPodToCache(pod1) // should be added - context.addPodToCache(pod2) // should skip as pod is terminated + context.addPod(nil) // no-op, but should not crash + context.addPod(pod1) // should be added + context.addPod(pod2) // should skip as pod is terminated _, ok := context.schedulerCache.GetPod("UID-00001") assert.Check(t, ok, "active pod was not added") @@ -415,7 +383,7 @@ func TestAddPodToCache(t *testing.T) { assert.Check(t, !ok, "terminated pod was added") } -func TestUpdatePodInCache(t *testing.T) { +func TestUpdatePod(t *testing.T) { context := initContextForTest() pod1 := &v1.Pod{ @@ -424,9 +392,12 @@ func TestUpdatePodInCache(t *testing.T) { APIVersion: "v1", }, ObjectMeta: apis.ObjectMeta{ - Name: "yunikorn-test-00001", - UID: "UID-00001", - Annotations: map[string]string{"test.state": "new"}, + Name: "yunikorn-test-00001", + UID: "UID-00001", + Annotations: map[string]string{ + constants.AnnotationApplicationID: "yunikorn-test-00001", + "test.state": "new", + }, }, Spec: v1.PodSpec{SchedulerName: "yunikorn"}, } @@ -436,9 +407,12 @@ func TestUpdatePodInCache(t *testing.T) { APIVersion: "v1", }, ObjectMeta: apis.ObjectMeta{ - Name: "yunikorn-test-00001", - UID: "UID-00001", - Annotations: map[string]string{"test.state": "updated"}, + Name: "yunikorn-test-00001", + UID: "UID-00001", + Annotations: map[string]string{ + constants.AnnotationApplicationID: "yunikorn-test-00001", + "test.state": "updated", + }, }, Spec: v1.PodSpec{SchedulerName: "yunikorn"}, } @@ -450,6 +424,9 @@ func TestUpdatePodInCache(t *testing.T) { ObjectMeta: apis.ObjectMeta{ Name: "yunikorn-test-00001", UID: "UID-00001", + Annotations: map[string]string{ + constants.AnnotationApplicationID: "yunikorn-test-00001", + }, }, Spec: v1.PodSpec{SchedulerName: "yunikorn"}, Status: v1.PodStatus{ @@ -457,29 +434,29 @@ func TestUpdatePodInCache(t *testing.T) { }, } - context.addPodToCache(pod1) + context.addPod(pod1) _, ok := context.schedulerCache.GetPod("UID-00001") assert.Assert(t, ok, "pod1 is not present after adding") // these should not fail, but are no-ops - context.updatePodInCache(nil, nil) - context.updatePodInCache(nil, pod1) - context.updatePodInCache(pod1, nil) + context.updatePod(nil, nil) + context.updatePod(nil, pod1) + context.updatePod(pod1, nil) // ensure a terminated pod is removed - context.updatePodInCache(pod1, pod3) + context.updatePod(pod1, pod3) _, ok = context.schedulerCache.GetPod("UID-00001") assert.Check(t, !ok, "pod still found after termination") // ensure a non-terminated pod is updated - context.updatePodInCache(pod1, pod2) + context.updatePod(pod1, pod2) found, ok := context.schedulerCache.GetPod("UID-00001") if assert.Check(t, ok, "pod not found after update") { assert.Check(t, found.GetAnnotations()["test.state"] == "updated", "pod state not updated") } } -func TestRemovePodFromCache(t *testing.T) { +func TestDeletePod(t *testing.T) { context := initContextForTest() pod1 := &v1.Pod{ @@ -490,6 +467,9 @@ func TestRemovePodFromCache(t *testing.T) { ObjectMeta: apis.ObjectMeta{ Name: "yunikorn-test-00001", UID: "UID-00001", + Annotations: map[string]string{ + constants.AnnotationApplicationID: "yunikorn-test-00001", + }, }, Spec: v1.PodSpec{SchedulerName: "yunikorn"}, } @@ -501,30 +481,222 @@ func TestRemovePodFromCache(t *testing.T) { ObjectMeta: apis.ObjectMeta{ Name: "yunikorn-test-00002", UID: "UID-00002", + Annotations: map[string]string{ + constants.AnnotationApplicationID: "yunikorn-test-00002", + }, }, Spec: v1.PodSpec{SchedulerName: "yunikorn"}, } - context.addPodToCache(pod1) - context.addPodToCache(pod2) + context.addPod(pod1) + context.addPod(pod2) _, ok := context.schedulerCache.GetPod("UID-00001") assert.Assert(t, ok, "pod1 is not present after adding") _, ok = context.schedulerCache.GetPod("UID-00002") assert.Assert(t, ok, "pod2 is not present after adding") // these should not fail, but here for completeness - context.removePodFromCache(nil) - context.removePodFromCache(cache.DeletedFinalStateUnknown{Key: "UID-00000", Obj: nil}) + context.deletePod(nil) + context.deletePod(cache.DeletedFinalStateUnknown{Key: "UID-00000", Obj: nil}) - context.removePodFromCache(pod1) + context.deletePod(pod1) _, ok = context.schedulerCache.GetPod("UID-00001") assert.Check(t, !ok, "pod1 is still present") - context.removePodFromCache(cache.DeletedFinalStateUnknown{Key: "UID-00002", Obj: pod2}) + context.deletePod(cache.DeletedFinalStateUnknown{Key: "UID-00002", Obj: pod2}) _, ok = context.schedulerCache.GetPod("UID-00002") assert.Check(t, !ok, "pod2 is still present") } +//nolint:funlen +func TestAddUpdatePodForeign(t *testing.T) { + mockedSchedulerApi := newMockSchedulerAPI() + context := initContextForTest() + context.nodes = newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache()) + host1 := nodeForTest(Host1, "10G", "10") + context.nodes.addNode(host1) + + executed := false + expectAdd := false + expectRemove := false + testCase := "" + + mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { + executed = true + assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count", testCase) + updatedNode := request.Nodes[0] + assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID", testCase) + assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE, "%s: wrong action", testCase) + assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000), "%s: wrong schedulable memory", testCase) + assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000), "%s: wrong schedulable cpu", testCase) + if expectAdd { + assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(1000*1000*1000), "%s: wrong occupied memory (add)", testCase) + assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s: wrong occupied cpu (add)", testCase) + } + if expectRemove { + assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s: wrong occupied memory (remove)", testCase) + assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s: wrong occupied cpu (remove)", testCase) + } + return nil + } + + // pod is not assigned to any node + pod1 := foreignPod("pod1", "1G", "500m") + pod1.Status.Phase = v1.PodPending + pod1.Spec.NodeName = "" + + // validate add + testCase = "add-pod1" + executed = false + expectAdd = false + expectRemove = false + context.addPod(pod1) + assert.Assert(t, !executed, "unexpected update") + _, ok := context.schedulerCache.GetPod(string(pod1.UID)) + assert.Assert(t, !ok, "unassigned pod found in cache") + + // validate update + testCase = "update-pod1" + executed = false + expectAdd = false + expectRemove = false + context.updatePod(nil, pod1) + assert.Assert(t, !executed, "unexpected update") + _, ok = context.schedulerCache.GetPod(string(pod1.UID)) + assert.Assert(t, !ok, "unassigned pod found in cache") + + // pod is assigned to a node but still in pending state, should update + pod2 := foreignPod("pod2", "1G", "500m") + pod2.Status.Phase = v1.PodPending + pod2.Spec.NodeName = Host1 + + // validate add + testCase = "add-pod2" + executed = false + expectAdd = true + expectRemove = false + context.addPod(pod2) + assert.Assert(t, executed, "update not executed") + _, ok = context.schedulerCache.GetPod(string(pod2.UID)) + assert.Assert(t, ok, "pod not found in cache") + + // validate update + testCase = "update-pod2" + executed = false + expectAdd = false + expectRemove = false + context.updatePod(nil, pod2) + assert.Assert(t, !executed, "unexpected update") + _, ok = context.schedulerCache.GetPod(string(pod2.UID)) + assert.Assert(t, ok, "pod not found in cache") + + // validate update when not already in cache + testCase = "update-pod2-not-in-cache-pre" + executed = false + expectAdd = false + expectRemove = true + context.deletePod(pod2) + testCase = "update-pod2-not-in-cache" + executed = false + expectAdd = true + expectRemove = false + context.updatePod(nil, pod2) + assert.Assert(t, executed, "update not executed") + _, ok = context.schedulerCache.GetPod(string(pod2.UID)) + assert.Assert(t, ok, "pod not found in cache") + + // pod is failed, should trigger update if already in cache + pod3 := pod2.DeepCopy() + pod3.Status.Phase = v1.PodFailed + + // validate add + testCase = "add-pod3" + executed = false + expectAdd = false + expectRemove = true + context.addPod(pod3) + assert.Assert(t, executed, "update not executed") + _, ok = context.schedulerCache.GetPod(string(pod3.UID)) + assert.Assert(t, !ok, "failed pod found in cache") + + // validate update when not already in cache + testCase = "update-pod3-pre" + executed = false + expectAdd = true + expectRemove = false + context.addPod(pod2) + testCase = "update-pod3" + executed = false + expectAdd = false + expectRemove = true + context.updatePod(nil, pod3) + assert.Assert(t, executed, "update not executed") + _, ok = context.schedulerCache.GetPod(string(pod3.UID)) + assert.Assert(t, !ok, "failed pod found in cache") +} + +func TestDeletePodForeign(t *testing.T) { + mockedSchedulerApi := newMockSchedulerAPI() + context := initContextForTest() + context.nodes = newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache()) + host1 := nodeForTest(Host1, "10G", "10") + context.nodes.addNode(host1) + + executed := false + expectAdd := false + expectRemove := false + testCase := "" + + mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { + executed = true + assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count", testCase) + updatedNode := request.Nodes[0] + assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID", testCase) + assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE, "%s: wrong action", testCase) + assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000), "%s: wrong schedulable memory", testCase) + assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000), "%s: wrong schedulable cpu", testCase) + if expectAdd { + assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(1000*1000*1000), "%s: wrong occupied memory (add)", testCase) + assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s: wrong occupied cpu (add)", testCase) + } + if expectRemove { + assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s: wrong occupied memory (remove)", testCase) + assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s: wrong occupied cpu (remove)", testCase) + } + return nil + } + + // add existing pod + pod1 := foreignPod("pod1", "1G", "500m") + pod1.Status.Phase = v1.PodRunning + pod1.Spec.NodeName = Host1 + + // validate deletion of existing assigned pod + testCase = "delete-pod1-pre" + executed = false + expectAdd = true + expectRemove = false + context.addPod(pod1) + testCase = "delete-pod1" + executed = false + expectAdd = false + expectRemove = true + context.deletePod(pod1) + assert.Assert(t, executed, "update not executed") + _, ok := context.schedulerCache.GetPod(string(pod1.UID)) + assert.Assert(t, !ok, "deleted pod found in cache") + + // validate delete when not already found + testCase = "delete-pod1-again" + executed = false + expectAdd = false + expectRemove = false + context.deletePod(pod1) + assert.Assert(t, !executed, "unexpected update") + _, ok = context.schedulerCache.GetPod(string(pod1.UID)) + assert.Assert(t, !ok, "deleted pod found in cache") +} + func TestAddTask(t *testing.T) { context := initContextForTest() @@ -1174,9 +1346,9 @@ func TestAddApplicationsWithTags(t *testing.T) { ObjectMeta: apis.ObjectMeta{ Name: "test2", Annotations: map[string]string{ - constants.NamespaceQuota: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}", + constants.NamespaceQuota: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}", constants.DomainYuniKorn + "parentqueue": "root.test", - constants.NamespaceGuaranteed: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}", + constants.NamespaceGuaranteed: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}", }, }, } @@ -1412,10 +1584,13 @@ func TestGetStateDump(t *testing.T) { Namespace: "default", Name: "yunikorn-test-00001", UID: "UID-00001", + Annotations: map[string]string{ + constants.AnnotationApplicationID: "yunikorn-test-00001", + }, }, Spec: v1.PodSpec{SchedulerName: "yunikorn"}, } - context.addPodToCache(pod1) + context.addPod(pod1) stateDumpStr, err := context.GetStateDump() assert.NilError(t, err, "error during state dump") @@ -1597,3 +1772,51 @@ func waitForNodeAcceptedEvent(recorder *k8sEvents.FakeRecorder) error { }, 10*time.Millisecond, time.Second) return err } + +func nodeForTest(nodeID, memory, cpu string) *v1.Node { + resourceList := make(map[v1.ResourceName]resource.Quantity) + resourceList[v1.ResourceName("memory")] = resource.MustParse(memory) + resourceList[v1.ResourceName("cpu")] = resource.MustParse(cpu) + return &v1.Node{ + TypeMeta: apis.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: nodeID, + Namespace: "default", + UID: "uid_0001", + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Allocatable: resourceList, + }, + } +} + +func foreignPod(podName, memory, cpu string) *v1.Pod { + containers := make([]v1.Container, 0) + c1Resources := make(map[v1.ResourceName]resource.Quantity) + c1Resources[v1.ResourceMemory] = resource.MustParse(memory) + c1Resources[v1.ResourceCPU] = resource.MustParse(cpu) + containers = append(containers, v1.Container{ + Name: "container-01", + Resources: v1.ResourceRequirements{ + Requests: c1Resources, + }, + }) + + return &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: podName, + UID: types.UID(podName), + }, + Spec: v1.PodSpec{ + Containers: containers, + }, + } +} diff --git a/pkg/cache/node_coordinator.go b/pkg/cache/node_coordinator.go deleted file mode 100644 index 7dbd20fc3..000000000 --- a/pkg/cache/node_coordinator.go +++ /dev/null @@ -1,140 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package cache - -import ( - "go.uber.org/zap" - v1 "k8s.io/api/core/v1" - k8sCache "k8s.io/client-go/tools/cache" - - "github.com/apache/yunikorn-k8shim/pkg/common" - "github.com/apache/yunikorn-k8shim/pkg/common/utils" - "github.com/apache/yunikorn-k8shim/pkg/log" -) - -// nodeResourceCoordinator looks at the resources that are not allocated by yunikorn, -// and refresh scheduler cache to keep nodes' capacity in-sync. -// this coordinator only looks after the pods that are not scheduled by yunikorn, -// and it registers update/delete handler to the pod informer. It ensures that the -// following operations are done -// 1. when a pod is becoming Running, add occupied node resource -// 2. when a pod is terminated, sub the occupied node resource -// 3. when a pod is deleted, sub the occupied node resource -// -// each of these updates will trigger a node UPDATE action to update the occupied -// resource in the scheduler-core. -type nodeResourceCoordinator struct { - nodes *schedulerNodes -} - -func newNodeResourceCoordinator(nodes *schedulerNodes) *nodeResourceCoordinator { - return &nodeResourceCoordinator{nodes} -} - -// filter pods that not scheduled by us -func (c *nodeResourceCoordinator) filterPods(obj interface{}) bool { - switch obj := obj.(type) { - case *v1.Pod: - return utils.GetApplicationIDFromPod(obj) == "" - default: - return false - } -} - -func (c *nodeResourceCoordinator) updatePod(old, new interface{}) { - oldPod, err := utils.Convert2Pod(old) - if err != nil { - log.Log(log.ShimCacheNode).Error("expecting a pod object", zap.Error(err)) - return - } - - newPod, err := utils.Convert2Pod(new) - if err != nil { - log.Log(log.ShimCacheNode).Error("expecting a pod object", zap.Error(err)) - return - } - - // this handles the allocate and release of a pod that not scheduled by yunikorn - // the check is triggered when a pod status changes - // conditions for allocate: - // 1. pod got assigned to a node - // 2. pod is not in terminated state - if !utils.IsAssignedPod(oldPod) && utils.IsAssignedPod(newPod) && !utils.IsPodTerminated(newPod) { - log.Log(log.ShimCacheNode).Debug("pod is assigned to a node, trigger occupied resource update", - zap.String("namespace", newPod.Namespace), - zap.String("podName", newPod.Name), - zap.String("podStatusBefore", string(oldPod.Status.Phase)), - zap.String("podStatusCurrent", string(newPod.Status.Phase))) - // if pod is running but not scheduled by us, - // we need to notify scheduler-core to re-sync the node resource - podResource := common.GetPodResource(newPod) - c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, AddOccupiedResource) - c.nodes.cache.AddPod(newPod) - return - } - - // conditions for release: - // 1. pod is already assigned to a node - // 2. pod status changes from non-terminated to terminated state - if utils.IsAssignedPod(newPod) && oldPod.Status.Phase != newPod.Status.Phase && utils.IsPodTerminated(newPod) { - log.Log(log.ShimCacheNode).Debug("pod terminated, trigger occupied resource update", - zap.String("namespace", newPod.Namespace), - zap.String("podName", newPod.Name), - zap.String("podStatusBefore", string(oldPod.Status.Phase)), - zap.String("podStatusCurrent", string(newPod.Status.Phase))) - // this means pod is terminated - // we need sub the occupied resource and re-sync with the scheduler-core - podResource := common.GetPodResource(newPod) - c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, SubOccupiedResource) - c.nodes.cache.RemovePod(newPod) - return - } -} - -func (c *nodeResourceCoordinator) deletePod(obj interface{}) { - var pod *v1.Pod - switch t := obj.(type) { - case *v1.Pod: - pod = t - case k8sCache.DeletedFinalStateUnknown: - var err error - pod, err = utils.Convert2Pod(t.Obj) - if err != nil { - log.Log(log.ShimCacheNode).Error(err.Error()) - return - } - default: - log.Log(log.ShimCacheNode).Error("cannot convert to pod") - return - } - - // if pod is already terminated, that means the updates have already done - if utils.IsPodTerminated(pod) { - log.Log(log.ShimCacheNode).Debug("pod is already terminated, occupied resource updated should have already been done") - return - } - - log.Log(log.ShimCacheNode).Info("deleting pod that scheduled by other schedulers", - zap.String("namespace", pod.Namespace), - zap.String("podName", pod.Name)) - - podResource := common.GetPodResource(pod) - c.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, SubOccupiedResource) - c.nodes.cache.RemovePod(pod) -} diff --git a/pkg/cache/node_coordinator_test.go b/pkg/cache/node_coordinator_test.go deleted file mode 100644 index 68a03f7a2..000000000 --- a/pkg/cache/node_coordinator_test.go +++ /dev/null @@ -1,392 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package cache - -import ( - "testing" - - "gotest.tools/v3/assert" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - apis "k8s.io/apimachinery/pkg/apis/meta/v1" - - siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" - "github.com/apache/yunikorn-scheduler-interface/lib/go/si" -) - -const ( - Host1 = "HOST1" - Host2 = "HOST2" - HostEmpty = "" -) - -func PodForTest(podName, memory, cpu string) *v1.Pod { - containers := make([]v1.Container, 0) - c1Resources := make(map[v1.ResourceName]resource.Quantity) - c1Resources[v1.ResourceMemory] = resource.MustParse(memory) - c1Resources[v1.ResourceCPU] = resource.MustParse(cpu) - containers = append(containers, v1.Container{ - Name: "container-01", - Resources: v1.ResourceRequirements{ - Requests: c1Resources, - }, - }) - - return &v1.Pod{ - TypeMeta: apis.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: podName, - }, - Spec: v1.PodSpec{ - Containers: containers, - }, - } -} - -func NodeForTest(nodeID, memory, cpu string) *v1.Node { - resourceList := make(map[v1.ResourceName]resource.Quantity) - resourceList[v1.ResourceName("memory")] = resource.MustParse(memory) - resourceList[v1.ResourceName("cpu")] = resource.MustParse(cpu) - return &v1.Node{ - TypeMeta: apis.TypeMeta{ - Kind: "Node", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: nodeID, - Namespace: "default", - UID: "uid_0001", - }, - Spec: v1.NodeSpec{}, - Status: v1.NodeStatus{ - Allocatable: resourceList, - }, - } -} - -func TestUpdatePod(t *testing.T) { - mockedSchedulerApi := newMockSchedulerAPI() - nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache()) - host1 := NodeForTest(Host1, "10G", "10") - host2 := NodeForTest(Host2, "10G", "10") - nodes.addNode(host1) - nodes.addNode(host2) - coordinator := newNodeResourceCoordinator(nodes) - - // pod is not assigned to any node - // this won't trigger an update - pod1 := PodForTest("pod1", "1G", "500m") - pod2 := PodForTest("pod1", "1G", "500m") - pod1.Status.Phase = v1.PodPending - pod1.Status.Phase = v1.PodPending - pod1.Spec.NodeName = "" - pod2.Spec.NodeName = "" - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - t.Fatalf("update should not run because state is not changed") - return nil - } - coordinator.updatePod(pod1, pod2) - - // pod is already assigned to a node and state is running - // this won't trigger an update - pod1.Status.Phase = v1.PodRunning - pod1.Status.Phase = v1.PodRunning - pod1.Spec.NodeName = Host1 - pod2.Spec.NodeName = Host1 - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - t.Fatalf("update should not run because state is not changed") - return nil - } - coordinator.updatePod(pod1, pod2) - - // pod state remains in Pending, pod is assigned to a node - // this happens when the pod just gets allocated and started, but not in running state yet - // trigger an update - pod1.Status.Phase = v1.PodPending - pod2.Status.Phase = v1.PodPending - pod1.Spec.NodeName = HostEmpty - pod2.Spec.NodeName = Host1 - executed := false - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - executed = true - assert.Equal(t, len(request.Nodes), 1) - updatedNode := request.Nodes[0] - assert.Equal(t, updatedNode.NodeID, Host1) - assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000)) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(1000*1000*1000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500)) - return nil - } - coordinator.updatePod(pod1, pod2) - assert.Assert(t, executed) - - // pod state changed from running to failed, pod terminated - // trigger another update - pod1.Status.Phase = v1.PodRunning - pod2.Status.Phase = v1.PodFailed - pod1.Spec.NodeName = Host1 - pod2.Spec.NodeName = Host1 - executed = false - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - executed = true - assert.Equal(t, len(request.Nodes), 1) - updatedNode := request.Nodes[0] - assert.Equal(t, updatedNode.NodeID, Host1) - assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000)) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0)) - return nil - } - coordinator.updatePod(pod1, pod2) - assert.Assert(t, executed) - - // pod state changed from pending to running - // this is not triggering a new update because the pod was already allocated to a node - pod1.Status.Phase = v1.PodPending - pod2.Status.Phase = v1.PodRunning - pod1.Spec.NodeName = Host2 - pod2.Spec.NodeName = Host2 - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - t.Fatalf("update should not run because pod is already allocated") - return nil - } - coordinator.updatePod(pod1, pod2) - - // pod state Running, pod is assigned to a node - // trigger an update - pod1.Status.Phase = v1.PodRunning - pod2.Status.Phase = v1.PodRunning - pod1.Spec.NodeName = "" - pod2.Spec.NodeName = Host2 - executed = false - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - executed = true - assert.Equal(t, len(request.Nodes), 1) - updatedNode := request.Nodes[0] - assert.Equal(t, updatedNode.NodeID, Host2) - assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000)) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(1000*1000*1000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500)) - return nil - } - coordinator.updatePod(pod1, pod2) - assert.Assert(t, executed) - - // pod state Running to Succeed, pod terminated - // this should trigger an update - pod1.Status.Phase = v1.PodRunning - pod2.Status.Phase = v1.PodSucceeded - pod1.Spec.NodeName = Host2 - pod2.Spec.NodeName = Host2 - executed = false - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - executed = true - assert.Equal(t, len(request.Nodes), 1) - updatedNode := request.Nodes[0] - assert.Equal(t, updatedNode.NodeID, Host2) - assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000)) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0)) - return nil - } - coordinator.updatePod(pod1, pod2) - assert.Assert(t, executed) - - // pod gets assigned to a node, but pod status is already failed - // this should not trigger an update - pod1.Status.Phase = v1.PodFailed - pod2.Status.Phase = v1.PodFailed - pod1.Spec.NodeName = HostEmpty - pod2.Spec.NodeName = Host2 - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - t.Fatalf("update should not run because pod is already allocated") - return nil - } - coordinator.updatePod(pod1, pod2) -} - -func TestDeletePod(t *testing.T) { - mockedSchedulerApi := newMockSchedulerAPI() - nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache()) - host1 := NodeForTest(Host1, "10G", "10") - nodes.addNode(host1) - coordinator := newNodeResourceCoordinator(nodes) - - // pod from pending to running - // occupied resources should be added to the node - pod1 := PodForTest("pod1", "1G", "500m") - pod2 := PodForTest("pod1", "1G", "500m") - pod1.Status.Phase = v1.PodPending - pod2.Status.Phase = v1.PodRunning - pod1.Spec.NodeName = HostEmpty - pod2.Spec.NodeName = Host1 - executed := false - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - executed = true - assert.Equal(t, len(request.Nodes), 1) - updatedNode := request.Nodes[0] - assert.Equal(t, updatedNode.NodeID, Host1) - assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000)) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(1000*1000*1000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500)) - return nil - } - coordinator.updatePod(pod1, pod2) - assert.Assert(t, executed) - - // delete pod from the running state - executed = false - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - executed = true - assert.Equal(t, len(request.Nodes), 1) - updatedNode := request.Nodes[0] - assert.Equal(t, updatedNode.NodeID, Host1) - assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000)) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0)) - return nil - } - coordinator.deletePod(pod1) -} - -func TestDeleteTerminatedPod(t *testing.T) { - mockedSchedulerApi := newMockSchedulerAPI() - nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache()) - host1 := NodeForTest(Host1, "10G", "10") - nodes.addNode(host1) - coordinator := newNodeResourceCoordinator(nodes) - - // pod from pending to running - // occupied resources should be added to the node - pod1 := PodForTest("pod1", "1G", "500m") - pod2 := PodForTest("pod1", "1G", "500m") - pod1.Status.Phase = v1.PodPending - pod2.Status.Phase = v1.PodRunning - pod1.Spec.NodeName = HostEmpty - pod2.Spec.NodeName = Host1 - executed := false - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - executed = true - assert.Equal(t, len(request.Nodes), 1) - updatedNode := request.Nodes[0] - assert.Equal(t, updatedNode.NodeID, Host1) - assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000)) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(1000*1000*1000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500)) - return nil - } - coordinator.updatePod(pod1, pod2) - assert.Assert(t, executed) - - // pod from running to succeed - // occupied resources should be added to the node - pod1.Status.Phase = v1.PodRunning - pod2.Status.Phase = v1.PodSucceeded - pod1.Spec.NodeName = Host1 - pod2.Spec.NodeName = Host1 - executed = false - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - executed = true - assert.Equal(t, len(request.Nodes), 1) - updatedNode := request.Nodes[0] - assert.Equal(t, updatedNode.NodeID, Host1) - assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000)) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0)) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0)) - return nil - } - coordinator.updatePod(pod1, pod2) - assert.Assert(t, executed) - - // delete pod from the succeed state - executed = false - mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error { - executed = true - t.Fatalf("update should not be triggered as it should have already done") - return nil - } - coordinator.deletePod(pod2) - assert.Equal(t, executed, false) -} - -func TestNodeCoordinatorFilterPods(t *testing.T) { - mockedSchedulerAPI := newMockSchedulerAPI() - nodes := newSchedulerNodes(mockedSchedulerAPI, NewTestSchedulerCache()) - host1 := NodeForTest(Host1, "10G", "10") - nodes.addNode(host1) - coordinator := newNodeResourceCoordinator(nodes) - - pod1 := &v1.Pod{ - TypeMeta: apis.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: "yunikorn-test-00001", - UID: "UID-00001", - }, - Spec: v1.PodSpec{SchedulerName: "yunikorn"}, - } - pod2 := &v1.Pod{ - TypeMeta: apis.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: "yunikorn-test-00002", - UID: "UID-00002", - }, - Spec: v1.PodSpec{SchedulerName: "default-scheduler"}, - } - pod3 := &v1.Pod{ - TypeMeta: apis.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: "yunikorn-test-00003", - UID: "UID-00003", - Labels: map[string]string{"applicationId": "test-00003"}, - }, - Spec: v1.PodSpec{SchedulerName: "yunikorn"}, - } - assert.Check(t, !coordinator.filterPods(nil), "nil object was allowed") - assert.Check(t, coordinator.filterPods(pod1), "yunikorn-managed pod with no app id was filtered") - assert.Check(t, coordinator.filterPods(pod2), "non-yunikorn-managed pod was filtered") - assert.Check(t, !coordinator.filterPods(pod3), "yunikorn-managed pod was allowed") -}