diff --git a/go.mod b/go.mod index fc0ffd5d3..b5dbbe8c0 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ go 1.22.0 toolchain go1.22.5 require ( - github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d + github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index c9f1c1bd2..566b07b26 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= -github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d h1:awo2goBrw25P1aFNZgYJ0q7V+5ycMqMhvI60B75OzQg= -github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0= +github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf h1:wKySiY4IA9Us287QRnIxFnuTHXaMSeQ3BhAwSrSW/sQ= +github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0= github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8= github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= diff --git a/pkg/cache/context.go b/pkg/cache/context.go index b4dca41e7..613bbc520 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -365,12 +365,11 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) { podStatusBefore = string(oldPod.Status.Phase) } - // conditions for allocate: - // 1. pod was previously assigned - // 2. pod is now assigned - // 3. pod is not in terminated state - // 4. pod references a known node - if oldPod == nil && utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) { + // conditions for allocate/update: + // 1. pod is now assigned + // 2. pod is not in terminated state + // 3. pod references a known node + if utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) { if ctx.schedulerCache.UpdatePod(pod) { // pod was accepted by a real node log.Log(log.ShimContext).Debug("pod is assigned to a node, trigger occupied resource update", @@ -378,10 +377,14 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) { zap.String("podName", pod.Name), zap.String("podStatusBefore", podStatusBefore), zap.String("podStatusCurrent", string(pod.Status.Phase))) - ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.AddOccupiedResource) + allocReq := common.CreateAllocationForForeignPod(pod) + if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(allocReq); err != nil { + log.Log(log.ShimContext).Error("failed to add foreign allocation to the core", + zap.Error(err)) + } } else { // pod is orphaned (references an unknown node) - log.Log(log.ShimContext).Info("skipping occupied resource update for assigned orphaned pod", + log.Log(log.ShimContext).Info("skipping updating allocation for assigned orphaned pod", zap.String("namespace", pod.Namespace), zap.String("podName", pod.Name), zap.String("nodeName", pod.Spec.NodeName)) @@ -401,9 +404,13 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) { zap.String("podStatusBefore", podStatusBefore), zap.String("podStatusCurrent", string(pod.Status.Phase))) // this means pod is terminated - // we need sub the occupied resource and re-sync with the scheduler-core - ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource) + // remove from the scheduler cache and create release request to remove foreign allocation from the core ctx.schedulerCache.RemovePod(pod) + releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition) + if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil { + log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core", + zap.Error(err)) + } } else { // pod is orphaned (references an unknown node) log.Log(log.ShimContext).Info("skipping occupied resource update for terminated orphaned pod", @@ -453,40 +460,17 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { func (ctx *Context) deleteForeignPod(pod *v1.Pod) { ctx.lock.Lock() defer ctx.lock.Unlock() - oldPod := ctx.schedulerCache.GetPod(string(pod.UID)) - if oldPod == nil { - // if pod is not in scheduler cache, no node updates are needed - log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource updated needed", - zap.String("namespace", pod.Namespace), - zap.String("podName", pod.Name)) - return + releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition) + if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil { + log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core", + zap.Error(err)) } - // conditions for release: - // 1. pod is already assigned to a node - // 2. pod was not in a terminal state before - // 3. pod references a known node - if !utils.IsPodTerminated(oldPod) { - if !ctx.schedulerCache.IsPodOrphaned(string(oldPod.UID)) { - log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update", - zap.String("namespace", pod.Namespace), - zap.String("podName", pod.Name), - zap.String("podStatusBefore", string(oldPod.Status.Phase)), - zap.String("podStatusCurrent", string(pod.Status.Phase))) - // this means pod is terminated - // we need sub the occupied resource and re-sync with the scheduler-core - ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource) - } else { - // pod is orphaned (references an unknown node) - log.Log(log.ShimContext).Info("skipping occupied resource update for removed orphaned pod", - zap.String("namespace", pod.Namespace), - zap.String("podName", pod.Name), - zap.String("nodeName", pod.Spec.NodeName)) - } - ctx.schedulerCache.RemovePod(pod) - } + log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name)) + ctx.schedulerCache.RemovePod(pod) } +//nolint:unused func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace string, podName string, resource *si.Resource, opt schedulercache.UpdateType) { if common.IsZero(resource) { return @@ -1613,7 +1597,7 @@ func (ctx *Context) decommissionNode(node *v1.Node) error { } func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource) error { - request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied) + request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, nil) return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request) } diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go index b50988720..a8214bcc7 100644 --- a/pkg/cache/context_test.go +++ b/pkg/cache/context_test.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "strings" + "sync/atomic" "testing" "time" @@ -74,11 +75,13 @@ const ( taskUID4 = "task00004" taskUnknown = "non_existing_taskID" - podName1 = "pod1" - podName2 = "pod2" - podName3 = "pod3" - podName4 = "pod4" - podNamespace = "yk" + podName1 = "pod1" + podName2 = "pod2" + podName3 = "pod3" + podName4 = "pod4" + podForeignName = "foreign-1" + podForeignUID = "UUID-foreign-1" + podNamespace = "yk" nodeName1 = "node1" nodeName2 = "node2" @@ -527,39 +530,11 @@ func TestAddUpdatePodForeign(t *testing.T) { defer dispatcher.UnregisterAllEventHandlers() defer dispatcher.Stop() - executed := false - expectAdd := false - expectRemove := false - tc := "" - - validatorFunc := func(request *si.NodeRequest) error { - assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count", tc) - updatedNode := request.Nodes[0] - assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID", tc) - switch updatedNode.Action { - case si.NodeInfo_CREATE_DRAIN: - return nil - case si.NodeInfo_DRAIN_TO_SCHEDULABLE: - return nil - case si.NodeInfo_UPDATE: - executed = true - default: - assert.Equal(t, false, "Unexpected action: %d", updatedNode.Action) - return nil - } - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000), "%s: wrong schedulable memory", tc) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000), "%s: wrong schedulable cpu", tc) - if expectAdd { - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s: wrong occupied cpu (add)", tc) - } - if expectRemove { - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s: wrong occupied memory (remove)", tc) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s: wrong occupied cpu (remove)", tc) - } + var allocRequest *si.AllocationRequest + apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { + allocRequest = request return nil - } - + }) apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error { for _, node := range request.Nodes { if node.Action == si.NodeInfo_CREATE_DRAIN { @@ -569,10 +544,10 @@ func TestAddUpdatePodForeign(t *testing.T) { }) } } - return validatorFunc(request) + return nil }) - host1 := nodeForTest(Host1, "10G", "10") + host1 := nodeForTest(Host1, "10G", "10") // add existing foreign pod context.updateNode(nil, host1) // pod is not assigned to any node @@ -580,22 +555,18 @@ func TestAddUpdatePodForeign(t *testing.T) { pod1.Status.Phase = v1.PodPending pod1.Spec.NodeName = "" - // validate add - tc = "add-pod1" - executed = false - expectAdd = false - expectRemove = false + // validate add (pending, no node assigned) + allocRequest = nil context.AddPod(pod1) - assert.Assert(t, !executed, "unexpected update") + assert.Assert(t, allocRequest == nil, "unexpected update") pod := context.schedulerCache.GetPod(string(pod1.UID)) assert.Assert(t, pod == nil, "unassigned pod found in cache") - // validate update - tc = "update-pod1" - executed = false - expectRemove = false + // validate update (no change) + allocRequest = nil context.UpdatePod(nil, pod1) - assert.Assert(t, !executed, "unexpected update") + assert.Assert(t, allocRequest == nil, "unexpected update") + pod = context.schedulerCache.GetPod(string(pod1.UID)) assert.Assert(t, pod == nil, "unassigned pod found in cache") // pod is assigned to a node but still in pending state, should update @@ -604,155 +575,91 @@ func TestAddUpdatePodForeign(t *testing.T) { pod2.Spec.NodeName = Host1 // validate add - tc = "add-pod2" - executed = false - expectAdd = true - expectRemove = false context.AddPod(pod2) - assert.Assert(t, executed, "updated expected") + assert.Assert(t, allocRequest != nil, "update expected") + assertAddForeignPod(t, podName2, Host1, allocRequest) pod = context.schedulerCache.GetPod(string(pod2.UID)) assert.Assert(t, pod != nil, "pod not found in cache") - // validate update - tc = "update-pod2" - executed = false - expectAdd = false - expectRemove = false + // validate update (no change) + allocRequest = nil context.UpdatePod(nil, pod2) - assert.Assert(t, !executed, "unexpected update") + assert.Assert(t, allocRequest != nil, "update expected") pod = context.schedulerCache.GetPod(string(pod2.UID)) assert.Assert(t, pod != nil, "pod not found in cache") // validate update when not already in cache - tc = "update-pod2-nocache-pre" - executed = false - expectAdd = false - expectRemove = true + allocRequest = nil context.DeletePod(pod2) - assert.Assert(t, executed, "expected update") - tc = "update-pod2-nocache" - executed = false - expectAdd = true - expectRemove = false + assertReleaseForeignPod(t, podName2, allocRequest) + + allocRequest = nil context.UpdatePod(nil, pod2) - assert.Assert(t, executed, "expected update") + assert.Assert(t, allocRequest != nil, "expected update") pod = context.schedulerCache.GetPod(string(pod2.UID)) assert.Assert(t, pod != nil, "pod not found in cache") + assertAddForeignPod(t, podName2, Host1, allocRequest) // pod is failed, should trigger update if already in cache pod3 := pod2.DeepCopy() pod3.Status.Phase = v1.PodFailed // validate add - tc = "add-pod3" - executed = false - expectAdd = false - expectRemove = true + allocRequest = nil context.AddPod(pod3) - assert.Assert(t, executed, "expected update") + assert.Assert(t, allocRequest != nil, "expected update") pod = context.schedulerCache.GetPod(string(pod3.UID)) assert.Assert(t, pod == nil, "failed pod found in cache") + assert.Assert(t, allocRequest.Releases != nil) // expecting a release due to pod status + assertReleaseForeignPod(t, podName2, allocRequest) +} - // validate update when not already in cache - tc = "update-pod3-pre" - executed = false - expectAdd = true - expectRemove = false - context.AddPod(pod2) - tc = "update-pod3" - executed = false - expectAdd = false - expectRemove = true - context.UpdatePod(nil, pod3) - assert.Assert(t, executed, "expected update") - pod = context.schedulerCache.GetPod(string(pod3.UID)) - assert.Assert(t, pod == nil, "failed pod found in cache") +func assertAddForeignPod(t *testing.T, podName, host string, allocRequest *si.AllocationRequest) { + t.Helper() + assert.Equal(t, 1, len(allocRequest.Allocations)) + tags := allocRequest.Allocations[0].AllocationTags + assert.Equal(t, 2, len(tags)) + assert.Equal(t, siCommon.AllocTypeDefault, tags[siCommon.Foreign]) + assert.Equal(t, podName, allocRequest.Allocations[0].AllocationKey) + assert.Equal(t, host, allocRequest.Allocations[0].NodeID) +} + +func assertReleaseForeignPod(t *testing.T, podName string, allocRequest *si.AllocationRequest) { + t.Helper() + assert.Assert(t, allocRequest.Releases != nil) + assert.Equal(t, 1, len(allocRequest.Releases.AllocationsToRelease)) + assert.Equal(t, podName, allocRequest.Releases.AllocationsToRelease[0].AllocationKey) + assert.Equal(t, constants.DefaultPartition, allocRequest.Releases.AllocationsToRelease[0].PartitionName) + assert.Equal(t, "", allocRequest.Releases.AllocationsToRelease[0].ApplicationID) + assert.Equal(t, si.TerminationType_STOPPED_BY_RM, allocRequest.Releases.AllocationsToRelease[0].TerminationType) } func TestDeletePodForeign(t *testing.T) { context, apiProvider := initContextAndAPIProviderForTest() - dispatcher.Start() - defer dispatcher.UnregisterAllEventHandlers() - defer dispatcher.Stop() - executed := false - expectAdd := false - expectRemove := false - tc := "" - - validatorFunc := func(request *si.NodeRequest) error { - executed = true - assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count", tc) - updatedNode := request.Nodes[0] - switch updatedNode.Action { - case si.NodeInfo_CREATE_DRAIN: - return nil - case si.NodeInfo_DRAIN_TO_SCHEDULABLE: - return nil - case si.NodeInfo_UPDATE: - executed = true - default: - assert.Equal(t, false, "Unexpected action: %d", updatedNode.Action) - return nil - } - assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID", tc) - assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE, "%s: wrong action", tc) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000), "%s: wrong schedulable memory", tc) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000), "%s: wrong schedulable cpu", tc) - if expectAdd { - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s: wrong occupied cpu (add)", tc) - } - if expectRemove { - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s: wrong occupied memory (remove)", tc) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s: wrong occupied cpu (remove)", tc) - } + var allocRequest *si.AllocationRequest + apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { + allocRequest = request return nil - } - - apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error { - for _, node := range request.Nodes { - if node.Action == si.NodeInfo_CREATE_DRAIN { - dispatcher.Dispatch(CachedSchedulerNodeEvent{ - NodeID: node.NodeID, - Event: NodeAccepted, - }) - } - } - return validatorFunc(request) }) - host1 := nodeForTest(Host1, "10G", "10") - context.updateNode(nil, host1) - - // add existing pod + // add existing foreign pod pod1 := foreignPod(podName1, "1G", "500m") pod1.Status.Phase = v1.PodRunning pod1.Spec.NodeName = Host1 - - // validate deletion of existing assigned pod - tc = "delete-pod1-pre" - executed = false - expectAdd = true - expectRemove = false context.AddPod(pod1) - tc = "delete-pod1" - executed = false - expectAdd = false - expectRemove = true + allocRequest = nil context.DeletePod(pod1) - assert.Assert(t, executed, "update not executed") - pod := context.schedulerCache.GetPod(string(pod1.UID)) - assert.Assert(t, pod == nil, "deleted pod found in cache") - // validate delete when not already found - tc = "delete-pod1-again" - executed = false - expectAdd = false - expectRemove = false - context.DeletePod(pod1) - assert.Assert(t, !executed, "unexpected update") - pod = context.schedulerCache.GetPod(string(pod1.UID)) + assert.Assert(t, allocRequest != nil, "update not executed") + assert.Equal(t, 0, len(allocRequest.Allocations)) + assert.Assert(t, allocRequest.Releases != nil) + assert.Equal(t, 1, len(allocRequest.Releases.AllocationsToRelease)) + assert.Equal(t, podName1, allocRequest.Releases.AllocationsToRelease[0].AllocationKey) + assert.Equal(t, constants.DefaultPartition, allocRequest.Releases.AllocationsToRelease[0].PartitionName) + assert.Equal(t, "", allocRequest.Releases.AllocationsToRelease[0].ApplicationID) + assert.Equal(t, si.TerminationType_STOPPED_BY_RM, allocRequest.Releases.AllocationsToRelease[0].TerminationType) + pod := context.schedulerCache.GetPod(string(pod1.UID)) assert.Assert(t, pod == nil, "deleted pod found in cache") } @@ -1993,6 +1900,10 @@ func TestInitializeState(t *testing.T) { }, }} podLister.AddPod(orphaned) + // add an orphan foreign pod + orphanForeign := newPodHelper(podForeignName, "default", podForeignUID, nodeName2, "", v1.PodRunning) + orphanForeign.Spec.SchedulerName = "" + podLister.AddPod(orphanForeign) err := context.InitializeState() assert.NilError(t, err, "InitializeState failed") @@ -2005,19 +1916,20 @@ func TestInitializeState(t *testing.T) { assert.Equal(t, pc.Annotations[constants.AnnotationAllowPreemption], constants.True, "wrong allow-preemption value") // verify occupied / capacity on node - capacity, occupied, ok := context.schedulerCache.SnapshotResources(nodeName1) + capacity, _, ok := context.schedulerCache.SnapshotResources(nodeName1) assert.Assert(t, ok, "Unable to retrieve node resources") expectedCapacity := common.ParseResource("4", "10G") assert.Equal(t, expectedCapacity.Resources["vcore"].Value, capacity.Resources["vcore"].Value, "wrong capacity vcore") assert.Equal(t, expectedCapacity.Resources["memory"].Value, capacity.Resources["memory"].Value, "wrong capacity memory") - expectedOccupied := common.ParseResource("1500m", "2G") - assert.Equal(t, expectedOccupied.Resources["vcore"].Value, occupied.Resources["vcore"].Value, "wrong occupied vcore") - assert.Equal(t, expectedOccupied.Resources["memory"].Value, occupied.Resources["memory"].Value, "wrong occupied memory") // check that pod orphan status is correct assert.Check(t, !context.schedulerCache.IsPodOrphaned(podName1), "pod1 should not be orphaned") assert.Check(t, !context.schedulerCache.IsPodOrphaned(podName2), "pod2 should not be orphaned") assert.Check(t, context.schedulerCache.IsPodOrphaned(podName3), "pod3 should be orphaned") + assert.Check(t, context.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod should be orphaned") + assert.Check(t, context.schedulerCache.GetPod("foreignRunning") != nil, "foreign running pod is not in the cache") + assert.Check(t, context.schedulerCache.GetPod("foreignPending") == nil, "foreign pending pod should not be in the cache") + assert.Check(t, !context.schedulerCache.IsPodOrphaned("foreignRunning"), "foreign running pod should not be orphaned") // pod1 is pending task1 := context.getTask(appID1, podName1) @@ -2034,6 +1946,145 @@ func TestInitializeState(t *testing.T) { assert.Assert(t, task3 == nil, "pod3 was found") } +func TestPodAdoption(t *testing.T) { + ctx, apiProvider := initContextAndAPIProviderForTest() + dispatcher.Start() + defer dispatcher.UnregisterAllEventHandlers() + defer dispatcher.Stop() + apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error { + for _, node := range request.Nodes { + dispatcher.Dispatch(CachedSchedulerNodeEvent{ + NodeID: node.NodeID, + Event: NodeAccepted, + }) + } + return nil + }) + + // add pods w/o node & check orphan status + pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID, v1.PodRunning) + pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1, "", v1.PodRunning) + pod2.Spec.SchedulerName = "" + ctx.AddPod(pod1) + ctx.AddPod(pod2) + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn pod is not orphan") + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod is not orphan") + + // add node + node := v1.Node{ + ObjectMeta: apis.ObjectMeta{ + Name: Host1, + Namespace: "default", + UID: uid1, + }, + } + ctx.addNode(&node) + + // check that node has adopted the pods + assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn pod has not been adopted") + assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod has not been adopted") +} + +func TestOrphanPodUpdate(t *testing.T) { + ctx, apiProvider := initContextAndAPIProviderForTest() + dispatcher.Start() + defer dispatcher.UnregisterAllEventHandlers() + defer dispatcher.Stop() + var update atomic.Bool + apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { + update.Store(true) + return nil + }) + + // add pods w/o node & check orphan status + pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID, v1.PodPending) + pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1, "", v1.PodPending) + pod2.Spec.SchedulerName = "" + ctx.AddPod(pod1) + ctx.AddPod(pod2) + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn pod is not orphan") + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod is not orphan") + assert.Assert(t, ctx.getApplication(appID) == nil) + + // update orphan pods + pod1Upd := pod1.DeepCopy() + pod1Upd.Status.Phase = v1.PodRunning + pod2Upd := pod2.DeepCopy() + pod2Upd.Status.Phase = v1.PodRunning + + ctx.UpdatePod(pod1, pod1Upd) + assert.Assert(t, ctx.getApplication(appID) == nil) + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn pod is not orphan after update") + assert.Equal(t, v1.PodRunning, ctx.schedulerCache.GetPod(pod1UID).Status.Phase, "pod has not been updated in the cache") + assert.Assert(t, !update.Load(), "allocation update has been triggered for Yunikorn orphan pod") + + ctx.UpdatePod(pod2, pod2Upd) + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod is not orphan after update") + assert.Equal(t, v1.PodRunning, ctx.schedulerCache.GetPod(podForeignUID).Status.Phase, "foreign pod has not been updated in the cache") + assert.Assert(t, !update.Load(), "allocation update has been triggered for foreign orphan pod") +} + +func TestOrphanPodDelete(t *testing.T) { + ctx, apiProvider := initContextAndAPIProviderForTest() + dispatcher.Start() + defer dispatcher.UnregisterAllEventHandlers() + defer dispatcher.Stop() + var taskEventSent atomic.Bool + dispatcher.RegisterEventHandler("TestTaskHandler", dispatcher.EventTypeTask, func(obj interface{}) { + taskEventSent.Store(true) + }) + var request *si.AllocationRequest + apiProvider.MockSchedulerAPIUpdateAllocationFn(func(r *si.AllocationRequest) error { + request = r + return nil + }) + apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error { + for _, node := range request.Nodes { + dispatcher.Dispatch(CachedSchedulerNodeEvent{ + NodeID: node.NodeID, + Event: NodeAccepted, + }) + } + return nil + }) + + // add pods w/o node & check orphan status + pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID, v1.PodPending) + pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1, "", v1.PodPending) + pod2.Spec.SchedulerName = "" + ctx.AddPod(pod1) + ctx.AddPod(pod2) + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn pod is not orphan") + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod is not orphan") + assert.Assert(t, ctx.getApplication(appID) == nil) + + // add a node with pod - this creates the application object + node := v1.Node{ + ObjectMeta: apis.ObjectMeta{ + Name: Host2, + Namespace: "default", + UID: uid1, + }, + } + ctx.addNode(&node) + pod3 := newPodHelper(podName2, namespace, pod2UID, Host2, appID, v1.PodPending) + ctx.AddPod(pod3) + assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(pod2UID), "Yunikorn pod is orphan") + assert.Assert(t, ctx.getApplication(appID) != nil) + + // delete orphan YK pod + ctx.DeletePod(pod1) + err := utils.WaitForCondition(taskEventSent.Load, 100*time.Millisecond, time.Second) + assert.NilError(t, err) + + // delete foreign pod + ctx.DeletePod(pod2) + assert.Assert(t, request != nil) + assert.Assert(t, request.Releases != nil) + assert.Equal(t, 1, len(request.Releases.AllocationsToRelease)) + assert.Equal(t, podForeignUID, request.Releases.AllocationsToRelease[0].AllocationKey) +} + func TestTaskRemoveOnCompletion(t *testing.T) { context := initContextForTest() dispatcher.Start() diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go index 5a848b4af..09d9ea139 100644 --- a/pkg/common/constants/constants.go +++ b/pkg/common/constants/constants.go @@ -66,6 +66,7 @@ const SchedulerName = "yunikorn" // OwnerReferences const DaemonSetType = "DaemonSet" +const NodeKind = "Node" // Gang scheduling const PlaceholderContainerImage = "registry.k8s.io/pause:3.7" diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go index d94a708cf..d2ef9a5c6 100644 --- a/pkg/common/si_helper.go +++ b/pkg/common/si_helper.go @@ -114,6 +114,33 @@ func CreateAllocationForTask(appID, taskID, nodeID string, resource *si.Resource } } +func CreateAllocationForForeignPod(pod *v1.Pod) *si.AllocationRequest { + podType := common.AllocTypeDefault + for _, ref := range pod.OwnerReferences { + if ref.Kind == constants.NodeKind { + podType = common.AllocTypeStatic + break + } + } + + allocation := si.Allocation{ + AllocationTags: map[string]string{ + common.Foreign: podType, + }, + AllocationKey: string(pod.UID), + ResourcePerAlloc: GetPodResource(pod), + Priority: CreatePriorityForTask(pod), + NodeID: pod.Spec.NodeName, + } + + allocation.AllocationTags[common.CreationTime] = strconv.FormatInt(pod.CreationTimestamp.Unix(), 10) + + return &si.AllocationRequest{ + Allocations: []*si.Allocation{&allocation}, + RmID: conf.GetSchedulerConf().ClusterID, + } +} + func GetTerminationTypeFromString(terminationTypeStr string) si.TerminationType { if v, ok := si.TerminationType_value[terminationTypeStr]; ok { return si.TerminationType(v) @@ -141,6 +168,25 @@ func CreateReleaseRequestForTask(appID, taskID, partition string, terminationTyp } } +func CreateReleaseRequestForForeignPod(uid, partition string) *si.AllocationRequest { + allocToRelease := make([]*si.AllocationRelease, 1) + allocToRelease[0] = &si.AllocationRelease{ + AllocationKey: uid, + PartitionName: partition, + TerminationType: si.TerminationType_STOPPED_BY_RM, + Message: "pod terminated", + } + + releaseRequest := si.AllocationReleasesRequest{ + AllocationsToRelease: allocToRelease, + } + + return &si.AllocationRequest{ + Releases: &releaseRequest, + RmID: conf.GetSchedulerConf().ClusterID, + } +} + // CreateUpdateRequestForUpdatedNode builds a NodeRequest for capacity and occupied resource updates func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource, occupied *si.Resource) *si.NodeRequest { nodeInfo := &si.NodeInfo{ diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go index 9ccf619a9..0439f7b37 100644 --- a/pkg/common/si_helper_test.go +++ b/pkg/common/si_helper_test.go @@ -19,9 +19,11 @@ package common import ( "testing" + "time" "gotest.tools/v3/assert" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" apis "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/apache/yunikorn-scheduler-interface/lib/go/common" @@ -415,3 +417,79 @@ func TestGetTerminationTypeFromString(t *testing.T) { }) } } + +func TestCreateAllocationForForeignPod(t *testing.T) { + cResources := make(map[v1.ResourceName]resource.Quantity) + cResources[v1.ResourceMemory] = resource.MustParse("500M") + cResources[v1.ResourceCPU] = resource.MustParse("1") + var containers []v1.Container + containers = append(containers, v1.Container{ + Name: "container-01", + Resources: v1.ResourceRequirements{ + Requests: cResources, + }, + }) + + pod := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "test", + UID: "UID-00001", + CreationTimestamp: apis.Time{ + Time: time.Unix(1, 0), + }, + }, + Spec: v1.PodSpec{ + Containers: containers, + NodeName: nodeID, + }, + } + + allocReq := CreateAllocationForForeignPod(pod) + assert.Equal(t, 1, len(allocReq.Allocations)) + assert.Equal(t, "mycluster", allocReq.RmID) + assert.Assert(t, allocReq.Releases == nil) + alloc := allocReq.Allocations[0] + assert.Equal(t, nodeID, alloc.NodeID) + assert.Equal(t, "UID-00001", alloc.AllocationKey) + assert.Equal(t, int32(0), alloc.Priority) + res := alloc.ResourcePerAlloc + assert.Equal(t, 3, len(res.Resources)) + assert.Equal(t, int64(500000000), res.Resources["memory"].Value) + assert.Equal(t, int64(1000), res.Resources["vcore"].Value) + assert.Equal(t, int64(1), res.Resources["pods"].Value) + assert.Equal(t, 2, len(alloc.AllocationTags)) + assert.Equal(t, "1", alloc.AllocationTags[common.CreationTime]) + assert.Equal(t, common.AllocTypeDefault, alloc.AllocationTags[common.Foreign]) + + // set priority & change pod type to static + prio := int32(12) + pod.Spec.Priority = &prio + pod.OwnerReferences = []apis.OwnerReference{ + { + Kind: "Node", + }, + } + allocReq = CreateAllocationForForeignPod(pod) + assert.Equal(t, 2, len(alloc.AllocationTags)) + alloc = allocReq.Allocations[0] + assert.Equal(t, common.AllocTypeStatic, alloc.AllocationTags[common.Foreign]) + assert.Equal(t, int32(12), alloc.Priority) +} + +func TestCreateReleaseRequestForForeignPod(t *testing.T) { + allocReq := CreateReleaseRequestForForeignPod("UID-0001", "partition") + + assert.Assert(t, allocReq.Releases != nil) + assert.Equal(t, "mycluster", allocReq.RmID) + releaseReq := allocReq.Releases + assert.Equal(t, 1, len(releaseReq.AllocationsToRelease)) + release := releaseReq.AllocationsToRelease[0] + assert.Equal(t, "UID-0001", release.AllocationKey) + assert.Equal(t, "partition", release.PartitionName) + assert.Equal(t, si.TerminationType_STOPPED_BY_RM, release.TerminationType) + assert.Equal(t, "pod terminated", release.Message) +} diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go index 1c3f3f36f..7b8de8556 100644 --- a/pkg/shim/scheduler_mock_test.go +++ b/pkg/shim/scheduler_mock_test.go @@ -341,7 +341,6 @@ func createUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string, nodeInfo := &si.NodeInfo{ NodeID: nodeID, SchedulableResource: capacity, - OccupiedResource: occupied, Attributes: map[string]string{ constants.DefaultNodeAttributeHostNameKey: nodeID, constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,