[YUNIKORN-2100] Merge functionality of node coordinator into context #716

Closed · wants to merge 1 commit into from
170 changes: 126 additions & 44 deletions pkg/cache/context.go
@@ -108,18 +108,9 @@

 	ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
 		Type:     client.PodInformerHandlers,
-		FilterFn: ctx.filterPods,
-		AddFn:    ctx.addPodToCache,
-		UpdateFn: ctx.updatePodInCache,
-		DeleteFn: ctx.removePodFromCache,
-	})
-
-	nodeCoordinator := newNodeResourceCoordinator(ctx.nodes)
-	ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
-		Type:     client.PodInformerHandlers,
-		FilterFn: nodeCoordinator.filterPods,
-		UpdateFn: nodeCoordinator.updatePod,
-		DeleteFn: nodeCoordinator.deletePod,
+		AddFn:    ctx.addPod,
+		UpdateFn: ctx.updatePod,
+		DeleteFn: ctx.deletePod,
 	})

[Codecov / codecov/patch: added lines pkg/cache/context.go#L111 - L113 were not covered by tests]

ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
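
The Codecov annotation above flags the three merged registrations (AddFn, UpdateFn, DeleteFn) as untested. A regression test along the following lines could exercise them. This is a minimal sketch, not code from the PR: initContextForTest is a hypothetical helper that builds a *Context backed by a fake API provider, and the applicationId label is an assumption about how utils.GetApplicationIDFromPod identifies YuniKorn pods.

package cache

import (
	"testing"

	"gotest.tools/assert"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Sketch of a regression test for the merged pod handlers.
// initContextForTest is a hypothetical helper, not necessarily the
// repo's actual test utility.
func TestMergedPodHandlers(t *testing.T) {
	ctx := initContextForTest()

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "pod-0001",
			UID:    "uid-0001",
			Labels: map[string]string{"applicationId": "app-0001"},
		},
		Status: v1.PodStatus{Phase: v1.PodRunning},
	}

	// addPod should land a running YuniKorn pod in the scheduler cache
	ctx.addPod(pod)
	_, ok := ctx.schedulerCache.GetPod("uid-0001")
	assert.Assert(t, ok, "pod missing from cache after addPod")

	// updatePod treats a terminated pod as a removal
	pod.Status.Phase = v1.PodSucceeded
	ctx.updatePod(nil, pod)
	_, ok = ctx.schedulerCache.GetPod("uid-0001")
	assert.Assert(t, !ok, "terminated pod should be removed from cache")
}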
@@ -147,6 +138,9 @@
 }

 func (ctx *Context) addNode(obj interface{}) {
+	ctx.lock.Lock()
+	defer ctx.lock.Unlock()
+
 	node, err := convertToNode(obj)
 	if err != nil {
 		log.Log(log.ShimContext).Error("node conversion failed", zap.Error(err))
@@ -166,6 +160,9 @@
 }

 func (ctx *Context) updateNode(oldObj, newObj interface{}) {
+	ctx.lock.Lock()
+	defer ctx.lock.Unlock()
+
 	// we only trigger update when resource changes
 	oldNode, err := convertToNode(oldObj)
 	if err != nil {
@@ -189,6 +186,9 @@
 }

 func (ctx *Context) deleteNode(obj interface{}) {
+	ctx.lock.Lock()
+	defer ctx.lock.Unlock()
+
 	var node *v1.Node
 	switch t := obj.(type) {
 	case *v1.Node:
Expand Down Expand Up @@ -217,25 +217,90 @@
fmt.Sprintf("node %s is deleted from the scheduler", node.Name))
}

func (ctx *Context) addPodToCache(obj interface{}) {
func (ctx *Context) addPod(obj interface{}) {
pod, err := utils.Convert2Pod(obj)
if err != nil {
log.Log(log.ShimContext).Error("failed to add pod to cache", zap.Error(err))
log.Log(log.ShimContext).Error("failed to add pod", zap.Error(err))
return
}
if utils.GetApplicationIDFromPod(pod) == "" {
ctx.updateForeignPod(pod)
} else {
ctx.updateYuniKornPod(pod)
}
}

func (ctx *Context) updatePod(_, newObj interface{}) {
pod, err := utils.Convert2Pod(newObj)
if err != nil {
log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
return
}
if utils.GetApplicationIDFromPod(pod) == "" {
ctx.updateForeignPod(pod)
} else {
ctx.updateYuniKornPod(pod)
}
}

// treat a terminated pod like a removal
func (ctx *Context) updateYuniKornPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

// treat terminated pods like a remove
if utils.IsPodTerminated(pod) {
log.Log(log.ShimContext).Debug("Request to add terminated pod, removing from cache", zap.String("podName", pod.Name))
log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
return
}
ctx.schedulerCache.UpdatePod(pod)
}

log.Log(log.ShimContext).Debug("adding pod to cache", zap.String("podName", pod.Name))
ctx.schedulerCache.AddPod(pod)
func (ctx *Context) updateForeignPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

podStatusBefore := ""
oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
if ok {
podStatusBefore = string(oldPod.Status.Phase)
}

// conditions for allocate:
// 1. pod was previously assigned
// 2. pod is now assigned
// 3. pod is not in terminated state
if oldPod == nil && utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) {
log.Log(log.ShimContext).Debug("pod is assigned to a node, trigger occupied resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
podResource := common.GetPodResource(pod)
ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, AddOccupiedResource)
ctx.schedulerCache.AddPod(pod)
return
}

// conditions for release:
// 1. pod was previously assigned
// 2. pod is now in a terminated state
if oldPod != nil && utils.IsPodTerminated(pod) {
log.Log(log.ShimContext).Debug("pod terminated, trigger occupied resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means pod is terminated
// we need sub the occupied resource and re-sync with the scheduler-core
podResource := common.GetPodResource(pod)
ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, SubOccupiedResource)
ctx.schedulerCache.RemovePod(pod)
return
}
}

func (ctx *Context) removePodFromCache(obj interface{}) {
func (ctx *Context) deletePod(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
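
updateForeignPod above now carries the occupied-resource accounting that previously lived in the node coordinator: pods without an applicationId are not scheduled by YuniKorn, but still consume capacity on their node. As an illustration only (not code from this PR), the two guarded branches reduce to a small decision table; here is a self-contained sketch of that logic.

package main

import "fmt"

type foreignAction int

const (
	noOp        foreignAction = iota
	addOccupied               // AddOccupiedResource + schedulerCache.AddPod
	subOccupied               // SubOccupiedResource + schedulerCache.RemovePod
)

// decide mirrors the two guarded branches in updateForeignPod:
// a newly seen, assigned, non-terminated pod occupies node resources;
// a previously known pod that terminated releases them.
func decide(cached, assigned, terminated bool) foreignAction {
	switch {
	case !cached && assigned && !terminated:
		return addOccupied
	case cached && terminated:
		return subOccupied
	default:
		return noOp
	}
}

func main() {
	fmt.Println(decide(false, true, false)) // 1: occupy on first sighting
	fmt.Println(decide(true, false, true))  // 2: release on termination
	fmt.Println(decide(false, false, true)) // 0: terminated before we ever cached it
}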
@@ -252,39 +317,47 @@
 		return
 	}

+	if utils.GetApplicationIDFromPod(pod) == "" {
+		ctx.deleteForeignPod(pod)
+	} else {
+		ctx.deleteYuniKornPod(pod)
+	}
+}
+
+func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
+	ctx.lock.Lock()
+	defer ctx.lock.Unlock()
+
 	log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name))
 	ctx.schedulerCache.RemovePod(pod)
 }

-func (ctx *Context) updatePodInCache(oldObj, newObj interface{}) {
-	_, err := utils.Convert2Pod(oldObj)
-	if err != nil {
-		log.Log(log.ShimContext).Error("failed to update pod in cache", zap.Error(err))
-		return
-	}
-	newPod, err := utils.Convert2Pod(newObj)
-	if err != nil {
-		log.Log(log.ShimContext).Error("failed to update pod in cache", zap.Error(err))
-		return
-	}
+func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
+	ctx.lock.Lock()
+	defer ctx.lock.Unlock()

-	// treat terminated pods like a remove
-	if utils.IsPodTerminated(newPod) {
-		log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", newPod.Name))
-		ctx.schedulerCache.RemovePod(newPod)
+	oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
+	if !ok {
+		// if the pod is not in the scheduler cache, no node updates are needed
+		log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource update needed",
+			zap.String("namespace", pod.Namespace),
+			zap.String("podName", pod.Name))
 		return
 	}

-	ctx.schedulerCache.UpdatePod(newPod)
-}
-
-// filter pods by scheduler name and state
-func (ctx *Context) filterPods(obj interface{}) bool {
-	switch obj := obj.(type) {
-	case *v1.Pod:
-		return utils.GetApplicationIDFromPod(obj) != ""
-	default:
-		return false
+	// conditions for release:
+	//   1. pod was already assigned to a node
+	if oldPod != nil {
+		log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update",
+			zap.String("namespace", pod.Namespace),
+			zap.String("podName", pod.Name),
+			zap.String("podStatusBefore", string(oldPod.Status.Phase)),
+			zap.String("podStatusCurrent", string(pod.Status.Phase)))
+		// the pod is gone: subtract the occupied resource and re-sync with the scheduler core
+		podResource := common.GetPodResource(pod)
+		ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, SubOccupiedResource)
+		ctx.schedulerCache.RemovePod(pod)
 	}
 }
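
Both foreign-pod paths hinge on the utils.IsAssignedPod and utils.IsPodTerminated predicates. Their implementations sit outside this diff; a plausible reading, judged only from how the handlers use them, is sketched below.

package utils

import v1 "k8s.io/api/core/v1"

// Assumed semantics only: sketches of the predicates used by the pod
// handlers, not necessarily the actual yunikorn-k8shim implementations.

// IsAssignedPod: some scheduler has already bound the pod to a node.
func IsAssignedPod(pod *v1.Pod) bool {
	return pod.Spec.NodeName != ""
}

// IsPodTerminated: the pod reached a final phase, so it no longer
// consumes resources on its node.
func IsPodTerminated(pod *v1.Pod) bool {
	return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
}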

@@ -367,6 +440,9 @@
 }

 func (ctx *Context) addPriorityClass(obj interface{}) {
+	ctx.lock.Lock()
+	defer ctx.lock.Unlock()
+
 	log.Log(log.ShimContext).Debug("priority class added")
 	priorityClass := utils.Convert2PriorityClass(obj)
 	if priorityClass != nil {
@@ -375,6 +451,9 @@
 }

 func (ctx *Context) updatePriorityClass(_, newObj interface{}) {
+	ctx.lock.Lock()
+	defer ctx.lock.Unlock()
+
 	log.Log(log.ShimContext).Debug("priority class updated")
 	priorityClass := utils.Convert2PriorityClass(newObj)
 	if priorityClass != nil {
@@ -383,6 +462,9 @@
 }

 func (ctx *Context) deletePriorityClass(obj interface{}) {
+	ctx.lock.Lock()
+	defer ctx.lock.Unlock()
+
 	log.Log(log.ShimContext).Debug("priorityClass deleted")
 	var priorityClass *schedulingv1.PriorityClass
 	switch t := obj.(type) {
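
The type switch opening here in deletePriorityClass, like the ones in deleteNode and deletePod above, follows the standard client-go delete-handler pattern: when a delete is observed after a watch gap, the informer hands over a cache.DeletedFinalStateUnknown tombstone rather than the object itself. A generic sketch of the unwrap, using the pod variant as the example:

package cache

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// Sketch of the client-go tombstone-unwrap pattern used by the delete
// handlers in this file; obj comes straight from the informer callback.
func unwrapPod(obj interface{}) (*v1.Pod, bool) {
	switch t := obj.(type) {
	case *v1.Pod:
		return t, true
	case cache.DeletedFinalStateUnknown:
		// the object was deleted while the watch was disconnected;
		// the tombstone carries the last known state
		if pod, ok := t.Obj.(*v1.Pod); ok {
			return pod, true
		}
	}
	return nil, false
}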