Skip to content

Commit

Permalink
[YUNIKORN-2834] [shim] Add non-YuniKorn allocation tracking logic
Browse files Browse the repository at this point in the history
  • Loading branch information
pbacsko committed Oct 8, 2024
1 parent f79d10c commit 3c92e97
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 204 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ go 1.22.0
toolchain go1.22.5

require (
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d h1:awo2goBrw25P1aFNZgYJ0q7V+5ycMqMhvI60B75OzQg=
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf h1:wKySiY4IA9Us287QRnIxFnuTHXaMSeQ3BhAwSrSW/sQ=
github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down
51 changes: 18 additions & 33 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,11 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.AddOccupiedResource)
allocReq := common.CreateAllocationForForeignPod(pod)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(allocReq); err != nil {
log.Log(log.ShimContext).Error("failed to add foreign allocation to the core",
zap.Error(err))
}
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for assigned orphaned pod",
Expand All @@ -394,8 +398,12 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means pod is terminated
// we need sub the occupied resource and re-sync with the scheduler-core
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource)
ctx.schedulerCache.RemovePod(pod)
releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil {
log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core",
zap.Error(err))
}
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for terminated orphaned pod",
Expand Down Expand Up @@ -441,40 +449,17 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
}

func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod == nil {
// if pod is not in scheduler cache, no node updates are needed
log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource updated needed",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name))
return
releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil {
log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core",
zap.Error(err))
}

// conditions for release:
// 1. pod is already assigned to a node
// 2. pod was not in a terminal state before
// 3. pod references a known node
if !utils.IsPodTerminated(oldPod) {
if !ctx.schedulerCache.IsPodOrphaned(string(oldPod.UID)) {
log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", string(oldPod.Status.Phase)),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means pod is terminated
// we need sub the occupied resource and re-sync with the scheduler-core
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource)
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for removed orphaned pod",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("nodeName", pod.Spec.NodeName))
}
ctx.schedulerCache.RemovePod(pod)
}
log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
}

//nolint:unused
func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace string, podName string, resource *si.Resource, opt schedulercache.UpdateType) {
if common.IsZero(resource) {
return
Expand Down Expand Up @@ -1560,7 +1545,7 @@ func (ctx *Context) decommissionNode(node *v1.Node) error {
}

func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource) error {
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied)
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, nil)
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}

Expand Down
Loading

0 comments on commit 3c92e97

Please sign in to comment.