Skip to content

Commit

Permalink
Merge branch 'master' into YUNIKORN-2504-V2
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyulin0719 committed Aug 8, 2024
2 parents 7f8b865 + ba192d4 commit 3a5c300
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 239 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
go 1.21

require (
github.com/apache/yunikorn-core v0.0.0-20240711165824-d96cd583305b
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/apache/yunikorn-core v0.0.0-20240711165824-d96cd583305b h1:GDizY3dcE+hkfik/+NY3Zdw71A/V4dWGp9Pl6k5Ii2M=
github.com/apache/yunikorn-core v0.0.0-20240711165824-d96cd583305b/go.mod h1:pSi7AFBRiGCGQ7RwQffpD4m6dvA5lc1HuCrg7LpJJqs=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1 h1:v4J9L3MlW8BQfYnbq6FV2l3uyay3SqMS2Ffpo+SFat4=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9 h1:s1Co/K+cR9Q/GW0e974dToW9eyLQZxYoCp0TCoEuEj0=
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9/go.mod h1:S9yGBGA2i2hAajtEc2t4lmiPJDZz3Ek8eVxz5KhJqGI=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586 h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
Expand Down
121 changes: 28 additions & 93 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ func (ctx *Context) addNode(obj interface{}) {
}

func (ctx *Context) updateNode(_, obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
node, err := convertToNode(obj)
if err != nil {
log.Log(log.ShimContext).Error("node conversion failed", zap.Error(err))
Expand Down Expand Up @@ -202,7 +200,7 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
if applicationID == "" {
ctx.updateForeignPod(pod)
} else {
ctx.updateYuniKornPod(pod)
ctx.updateYuniKornPod(applicationID, pod)
}
}

Expand All @@ -229,8 +227,6 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
}

func (ctx *Context) deleteNode(obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
var node *v1.Node
switch t := obj.(type) {
case *v1.Node:
Expand All @@ -250,9 +246,6 @@ func (ctx *Context) deleteNode(obj interface{}) {
}

func (ctx *Context) addNodesWithoutRegistering(nodes []*v1.Node) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

for _, node := range nodes {
ctx.updateNodeInternal(node, false)
}
Expand Down Expand Up @@ -288,30 +281,31 @@ func (ctx *Context) AddPod(obj interface{}) {
}

func (ctx *Context) UpdatePod(_, newObj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

pod, err := utils.Convert2Pod(newObj)
if err != nil {
log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
return
}
if utils.GetApplicationIDFromPod(pod) == "" {
applicationID := utils.GetApplicationIDFromPod(pod)
if applicationID == "" {
ctx.updateForeignPod(pod)
} else {
ctx.updateYuniKornPod(pod)
ctx.updateYuniKornPod(applicationID, pod)
}
}

func (ctx *Context) updateYuniKornPod(pod *v1.Pod) {
// treat terminated pods like a remove
if utils.IsPodTerminated(pod) {
if taskMeta, ok := getTaskMetadata(pod); ok {
if app := ctx.getApplication(taskMeta.ApplicationID); app != nil {
ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
}
func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
var app *Application
taskID := string(pod.UID)
if app = ctx.getApplication(appID); app != nil {
if task := app.GetTask(taskID); task != nil {
task.setTaskPod(pod)
}
}

// treat terminated pods like a remove
if utils.IsPodTerminated(pod) {
ctx.notifyTaskComplete(appID, taskID)
log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
return
Expand All @@ -334,9 +328,9 @@ func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) {
}

// add app if it doesn't already exist
app := ctx.getApplication(appMeta.ApplicationID)
app := ctx.GetApplication(appMeta.ApplicationID)
if app == nil {
app = ctx.addApplication(&AddApplicationRequest{
app = ctx.AddApplication(&AddApplicationRequest{
Metadata: appMeta,
})
}
Expand Down Expand Up @@ -440,10 +434,8 @@ func (ctx *Context) DeletePod(obj interface{}) {
}

func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if taskMeta, ok := getTaskMetadata(pod); ok {
if app := ctx.getApplication(taskMeta.ApplicationID); app != nil {
if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil {
ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
}
}
Expand All @@ -453,9 +445,6 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
}

func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod == nil {
// if pod is not in scheduler cache, no node updates are needed
Expand Down Expand Up @@ -586,8 +575,6 @@ func (ctx *Context) addPriorityClass(obj interface{}) {
}

func (ctx *Context) updatePriorityClass(_, newObj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if priorityClass := utils.Convert2PriorityClass(newObj); priorityClass != nil {
ctx.updatePriorityClassInternal(priorityClass)
}
Expand All @@ -598,9 +585,6 @@ func (ctx *Context) updatePriorityClassInternal(priorityClass *schedulingv1.Prio
}

func (ctx *Context) deletePriorityClass(obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

log.Log(log.ShimContext).Debug("priorityClass deleted")
var priorityClass *schedulingv1.PriorityClass
switch t := obj.(type) {
Expand Down Expand Up @@ -666,8 +650,6 @@ func (ctx *Context) EventsToRegister(queueingHintFn framework.QueueingHintFn) []

// IsPodFitNode evaluates given predicates based on current context
func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
pod := ctx.schedulerCache.GetPod(name)
if pod == nil {
return ErrorPodNotFound
Expand All @@ -688,8 +670,6 @@ func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
}

func (ctx *Context) IsPodFitNodeViaPreemption(name, node string, allocations []string, startIndex int) (int, bool) {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
if pod := ctx.schedulerCache.GetPod(name); pod != nil {
// if pod exists in cache, try to run predicates
if targetNode := ctx.schedulerCache.GetNode(node); targetNode != nil {
Expand Down Expand Up @@ -798,8 +778,6 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
// this way, the core can make allocation decisions with consideration of
// other assumed pods before they are actually bound to the node (binding is slow).
func (ctx *Context) AssumePod(name, node string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if pod := ctx.schedulerCache.GetPod(name); pod != nil {
// when add assumed pod, we make a copy of the pod to avoid
// modifying its original reference. otherwise, it may have
Expand Down Expand Up @@ -859,9 +837,6 @@ func (ctx *Context) AssumePod(name, node string) error {
// ForgetPod must be called when a pod has been assumed to be running on a node
// but then, for some reason, fails to bind or is released.
func (ctx *Context) ForgetPod(name string) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

if pod := ctx.schedulerCache.GetPod(name); pod != nil {
log.Log(log.ShimContext).Debug("forget pod", zap.String("pod", pod.Name))
ctx.schedulerCache.ForgetPod(pod)
Expand All @@ -870,12 +845,6 @@ func (ctx *Context) ForgetPod(name string) {
log.Log(log.ShimContext).Debug("unable to forget pod: not found in cache", zap.String("pod", name))
}

func (ctx *Context) UpdateApplication(app *Application) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
ctx.applications[app.applicationID] = app
}

// IsTaskMaybeSchedulable returns true if a task might be currently able to be scheduled. This uses a bloom filter
// cached from a set of taskIDs to perform efficient negative lookups.
func (ctx *Context) IsTaskMaybeSchedulable(taskID string) bool {
Expand Down Expand Up @@ -904,17 +873,11 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool {
return ctx.schedulerCache.StartPodAllocation(podKey, nodeID)
}

func (ctx *Context) NotifyTaskComplete(appID, taskID string) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
ctx.notifyTaskComplete(appID, taskID)
}

func (ctx *Context) notifyTaskComplete(appID, taskID string) {
log.Log(log.ShimContext).Debug("NotifyTaskComplete",
zap.String("appID", appID),
zap.String("taskID", taskID))
if app := ctx.getApplication(appID); app != nil {
if app := ctx.GetApplication(appID); app != nil {
log.Log(log.ShimContext).Debug("release allocation",
zap.String("appID", appID),
zap.String("taskID", taskID))
Expand All @@ -930,6 +893,8 @@ func (ctx *Context) notifyTaskComplete(appID, taskID string) {
// adds the following tags to the request based on annotations (if they exist):
// - namespace.resourcequota
// - namespace.parentqueue
// - namespace.resourceguaranteed
// - namespace.resourcemaxapps
func (ctx *Context) updateApplicationTags(request *AddApplicationRequest, namespace string) {
namespaceObj := ctx.getNamespaceObject(namespace)
if namespaceObj == nil {
Expand All @@ -951,6 +916,12 @@ func (ctx *Context) updateApplicationTags(request *AddApplicationRequest, namesp
}
}

// add maxApps resource info as an app tag
maxApps := utils.GetNamespaceMaxAppsFromAnnotation(namespaceObj)
if maxApps != "" {
request.Metadata.Tags[siCommon.AppTagNamespaceResourceMaxApps] = maxApps
}

// add parent queue info as an app tag
parentQueue := utils.GetNameSpaceAnnotationValue(namespaceObj, constants.AnnotationParentQueue)
if parentQueue != "" {
Expand Down Expand Up @@ -982,10 +953,6 @@ func (ctx *Context) AddApplication(request *AddApplicationRequest) *Application
ctx.lock.Lock()
defer ctx.lock.Unlock()

return ctx.addApplication(request)
}

func (ctx *Context) addApplication(request *AddApplicationRequest) *Application {
log.Log(log.ShimContext).Debug("AddApplication", zap.Any("Request", request))
if app := ctx.getApplication(request.Metadata.ApplicationID); app != nil {
return app
Expand Down Expand Up @@ -1051,31 +1018,7 @@ func (ctx *Context) getApplication(appID string) *Application {
return nil
}

func (ctx *Context) RemoveApplication(appID string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if app, exist := ctx.applications[appID]; exist {
// get the non-terminated task alias
nonTerminatedTaskAlias := app.getNonTerminatedTaskAlias()
// check there are any non-terminated task or not
if len(nonTerminatedTaskAlias) > 0 {
return fmt.Errorf("failed to remove application %s because it still has task in non-terminated task, tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ","))
}
// send the update request to scheduler core
rr := common.CreateUpdateRequestForRemoveApplication(app.applicationID, app.partition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateApplication(rr); err != nil {
log.Log(log.ShimContext).Error("failed to send remove application request to core", zap.Error(err))
}
delete(ctx.applications, appID)
log.Log(log.ShimContext).Info("app removed",
zap.String("appID", appID))

return nil
}
return fmt.Errorf("application %s is not found in the context", appID)
}

func (ctx *Context) RemoveApplicationInternal(appID string) {
func (ctx *Context) RemoveApplication(appID string) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if _, exist := ctx.applications[appID]; !exist {
Expand All @@ -1087,8 +1030,6 @@ func (ctx *Context) RemoveApplicationInternal(appID string) {

// this implements ApplicationManagementProtocol
func (ctx *Context) AddTask(request *AddTaskRequest) *Task {
ctx.lock.Lock()
defer ctx.lock.Unlock()
return ctx.addTask(request)
}

Expand Down Expand Up @@ -1148,9 +1089,7 @@ func (ctx *Context) RemoveTask(appID, taskID string) {
}

func (ctx *Context) getTask(appID string, taskID string) *Task {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
app := ctx.getApplication(appID)
app := ctx.GetApplication(appID)
if app == nil {
log.Log(log.ShimContext).Debug("application is not found in the context",
zap.String("appID", appID))
Expand Down Expand Up @@ -1558,7 +1497,6 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
},
SchedulableResource: common.GetNodeResource(&nodeStatus),
OccupiedResource: common.NewResourceBuilder().Build(),
ExistingAllocations: make([]*si.Allocation, 0),
})
pendingNodes[node.Name] = node
}
Expand Down Expand Up @@ -1672,9 +1610,6 @@ func (ctx *Context) finalizeNodes(existingNodes []*v1.Node) error {
nodeMap[node.Name] = node
}

ctx.lock.Lock()
defer ctx.lock.Unlock()

// find any existing nodes that no longer exist
for _, node := range existingNodes {
if _, ok := nodeMap[node.Name]; !ok {
Expand Down
Loading

0 comments on commit 3a5c300

Please sign in to comment.