Skip to content

Commit

Permalink
[YUNIKORN-2100] Merge functionality of node coordinator into context
Browse files Browse the repository at this point in the history
Merge the node_coordinator.go pod event handler logic into the Context's pod
event handlers. Also, rename addPodToCache() / updatePodInCache() /
removePodFromCache() to addPod() / updatePod() / deletePod() for consistency.
  • Loading branch information
craigcondit committed Nov 1, 2023
1 parent 67d3933 commit 0c69dc2
Show file tree
Hide file tree
Showing 4 changed files with 418 additions and 645 deletions.
170 changes: 126 additions & 44 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,9 @@ func (ctx *Context) AddSchedulingEventHandlers() {

ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.PodInformerHandlers,
FilterFn: ctx.filterPods,
AddFn: ctx.addPodToCache,
UpdateFn: ctx.updatePodInCache,
DeleteFn: ctx.removePodFromCache,
})

nodeCoordinator := newNodeResourceCoordinator(ctx.nodes)
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.PodInformerHandlers,
FilterFn: nodeCoordinator.filterPods,
UpdateFn: nodeCoordinator.updatePod,
DeleteFn: nodeCoordinator.deletePod,
AddFn: ctx.addPod,
UpdateFn: ctx.updatePod,
DeleteFn: ctx.deletePod,

Check warning on line 113 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L111-L113

Added lines #L111 - L113 were not covered by tests
})

ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Expand Down Expand Up @@ -147,6 +138,9 @@ func (ctx *Context) SetPluginMode(pluginMode bool) {
}

func (ctx *Context) addNode(obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

node, err := convertToNode(obj)
if err != nil {
log.Log(log.ShimContext).Error("node conversion failed", zap.Error(err))
Expand All @@ -166,6 +160,9 @@ func (ctx *Context) addNode(obj interface{}) {
}

func (ctx *Context) updateNode(oldObj, newObj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

// we only trigger update when resource changes
oldNode, err := convertToNode(oldObj)
if err != nil {
Expand All @@ -189,6 +186,9 @@ func (ctx *Context) updateNode(oldObj, newObj interface{}) {
}

func (ctx *Context) deleteNode(obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

var node *v1.Node
switch t := obj.(type) {
case *v1.Node:
Expand Down Expand Up @@ -217,25 +217,90 @@ func (ctx *Context) deleteNode(obj interface{}) {
fmt.Sprintf("node %s is deleted from the scheduler", node.Name))
}

func (ctx *Context) addPodToCache(obj interface{}) {
func (ctx *Context) addPod(obj interface{}) {
pod, err := utils.Convert2Pod(obj)
if err != nil {
log.Log(log.ShimContext).Error("failed to add pod to cache", zap.Error(err))
log.Log(log.ShimContext).Error("failed to add pod", zap.Error(err))
return
}
if utils.GetApplicationIDFromPod(pod) == "" {
ctx.updateForeignPod(pod)
} else {
ctx.updateYuniKornPod(pod)
}
}

func (ctx *Context) updatePod(_, newObj interface{}) {
pod, err := utils.Convert2Pod(newObj)
if err != nil {
log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
return
}
if utils.GetApplicationIDFromPod(pod) == "" {
ctx.updateForeignPod(pod)
} else {
ctx.updateYuniKornPod(pod)
}
}

// treat a terminated pod like a removal
func (ctx *Context) updateYuniKornPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

// treat terminated pods like a remove
if utils.IsPodTerminated(pod) {
log.Log(log.ShimContext).Debug("Request to add terminated pod, removing from cache", zap.String("podName", pod.Name))
log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
return
}
ctx.schedulerCache.UpdatePod(pod)
}

log.Log(log.ShimContext).Debug("adding pod to cache", zap.String("podName", pod.Name))
ctx.schedulerCache.AddPod(pod)
func (ctx *Context) updateForeignPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

podStatusBefore := ""
oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
if ok {
podStatusBefore = string(oldPod.Status.Phase)
}

// conditions for allocate:
// 1. pod was previously assigned
// 2. pod is now assigned
// 3. pod is not in terminated state
if oldPod == nil && utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) {
log.Log(log.ShimContext).Debug("pod is assigned to a node, trigger occupied resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
podResource := common.GetPodResource(pod)
ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, AddOccupiedResource)
ctx.schedulerCache.AddPod(pod)
return
}

// conditions for release:
// 1. pod was previously assigned
// 2. pod is now in a terminated state
if oldPod != nil && utils.IsPodTerminated(pod) {
log.Log(log.ShimContext).Debug("pod terminated, trigger occupied resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means pod is terminated
// we need sub the occupied resource and re-sync with the scheduler-core
podResource := common.GetPodResource(pod)
ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, SubOccupiedResource)
ctx.schedulerCache.RemovePod(pod)
return
}
}

func (ctx *Context) removePodFromCache(obj interface{}) {
func (ctx *Context) deletePod(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
Expand All @@ -252,39 +317,47 @@ func (ctx *Context) removePodFromCache(obj interface{}) {
return
}

if utils.GetApplicationIDFromPod(pod) == "" {
ctx.deleteForeignPod(pod)
} else {
ctx.deleteYuniKornPod(pod)
}
}

func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
}

func (ctx *Context) updatePodInCache(oldObj, newObj interface{}) {
_, err := utils.Convert2Pod(oldObj)
if err != nil {
log.Log(log.ShimContext).Error("failed to update pod in cache", zap.Error(err))
return
}
newPod, err := utils.Convert2Pod(newObj)
if err != nil {
log.Log(log.ShimContext).Error("failed to update pod in cache", zap.Error(err))
return
}
func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

// treat terminated pods like a remove
if utils.IsPodTerminated(newPod) {
log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", newPod.Name))
ctx.schedulerCache.RemovePod(newPod)
oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
if !ok {
// if pod is not in scheduler cache, no node updates are needed
log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource updated needed",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name))
return
}

ctx.schedulerCache.UpdatePod(newPod)
}

// filter pods by scheduler name and state
func (ctx *Context) filterPods(obj interface{}) bool {
switch obj := obj.(type) {
case *v1.Pod:
return utils.GetApplicationIDFromPod(obj) != ""
default:
return false
// conditions for release:
// 1. pod is already assigned to a node
if oldPod != nil {
log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", string(oldPod.Status.Phase)),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means pod is terminated
// we need sub the occupied resource and re-sync with the scheduler-core
podResource := common.GetPodResource(pod)
ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, SubOccupiedResource)
ctx.schedulerCache.RemovePod(pod)
}
}

Expand Down Expand Up @@ -367,6 +440,9 @@ func (ctx *Context) filterPriorityClasses(obj interface{}) bool {
}

func (ctx *Context) addPriorityClass(obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

log.Log(log.ShimContext).Debug("priority class added")
priorityClass := utils.Convert2PriorityClass(obj)
if priorityClass != nil {
Expand All @@ -375,6 +451,9 @@ func (ctx *Context) addPriorityClass(obj interface{}) {
}

func (ctx *Context) updatePriorityClass(_, newObj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

log.Log(log.ShimContext).Debug("priority class updated")
priorityClass := utils.Convert2PriorityClass(newObj)
if priorityClass != nil {
Expand All @@ -383,6 +462,9 @@ func (ctx *Context) updatePriorityClass(_, newObj interface{}) {
}

func (ctx *Context) deletePriorityClass(obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

log.Log(log.ShimContext).Debug("priorityClass deleted")
var priorityClass *schedulingv1.PriorityClass
switch t := obj.(type) {
Expand Down
Loading

0 comments on commit 0c69dc2

Please sign in to comment.