From ecb651e665f6830eea6df35905b09a9cf00b0fdf Mon Sep 17 00:00:00 2001
From: qzhu
Date: Fri, 2 Aug 2024 11:09:08 +0800
Subject: [PATCH 1/4] [YUNIKORN-1697] [shim] Make namespace annotation support
 max applications update. (#885)

fix lint

Address comments

Fix golint

Closes: #885

Signed-off-by: qzhu
---
 pkg/cache/context.go              |  8 ++++++
 pkg/cache/context_test.go         |  7 +++++
 pkg/common/constants/constants.go |  3 ++
 pkg/common/utils/utils.go         | 21 ++++++++++++++
 pkg/common/utils/utils_test.go    | 47 +++++++++++++++++++++++++++++++
 5 files changed, 86 insertions(+)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 244b78563..c608b98e7 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -934,6 +934,8 @@ func (ctx *Context) notifyTaskComplete(appID, taskID string) {
 // adds the following tags to the request based on annotations (if exist):
 // - namespace.resourcequota
 // - namespace.parentqueue
+// - namespace.resourceguaranteed
+// - namespace.resourcemaxapps
 func (ctx *Context) updateApplicationTags(request *AddApplicationRequest, namespace string) {
 	namespaceObj := ctx.getNamespaceObject(namespace)
 	if namespaceObj == nil {
@@ -955,6 +957,12 @@ func (ctx *Context) updateApplicationTags(request *AddApplicationRequest, namesp
 		}
 	}
 
+	// add maxApps resource info as an app tag
+	maxApps := utils.GetNamespaceMaxAppsFromAnnotation(namespaceObj)
+	if maxApps != "" {
+		request.Metadata.Tags[siCommon.AppTagNamespaceResourceMaxApps] = maxApps
+	}
+
 	// add parent queue info as an app tag
 	parentQueue := utils.GetNameSpaceAnnotationValue(namespaceObj, constants.AnnotationParentQueue)
 	if parentQueue != "" {
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index afc447013..c4802d4a3 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -1522,6 +1522,7 @@ func TestAddApplicationsWithTags(t *testing.T) {
 				constants.NamespaceQuota: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
 				constants.DomainYuniKorn + "parentqueue": "root.test",
 				constants.NamespaceGuaranteed: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
+				constants.NamespaceMaxApps: "1000",
 			},
 		},
 	}
@@ -1610,6 +1611,12 @@ func TestAddApplicationsWithTags(t *testing.T) {
 		t.Fatalf("resource parsing failed")
 	}
 
+	maxApps, ok := request.Metadata.Tags[siCommon.AppTagNamespaceResourceMaxApps]
+	if !ok {
+		t.Fatalf("max apps tag is not updated from the namespace")
+	}
+	assert.Equal(t, maxApps, "1000")
+
 	parentQueue, ok := request.Metadata.Tags[constants.AppTagNamespaceParentQueue]
 	if !ok {
 		t.Fatalf("parent queue tag is not updated from the namespace")
diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go
index a6968e582..0e6f00bab 100644
--- a/pkg/common/constants/constants.go
+++ b/pkg/common/constants/constants.go
@@ -97,6 +97,9 @@ const NamespaceQuota = DomainYuniKorn + "namespace.quota"
 // NamespaceGuaranteed Namespace Guaranteed
 const NamespaceGuaranteed = DomainYuniKorn + "namespace.guaranteed"
 
+// NamespaceMaxApps Namespace Max Apps
+const NamespaceMaxApps = DomainYuniKorn + "namespace.maxApps"
+
 // AnnotationAllowPreemption set on PriorityClass, opt out of preemption for pods with this priority class
 const AnnotationAllowPreemption = DomainYuniKorn + "allow-preemption"
 
diff --git a/pkg/common/utils/utils.go b/pkg/common/utils/utils.go
index eb9f71208..281189972 100644
--- a/pkg/common/utils/utils.go
+++ b/pkg/common/utils/utils.go
@@ -214,6 +214,27 @@ func GetNamespaceGuaranteedFromAnnotation(namespaceObj *v1.Namespace) *si.Resource {
 	return nil
 }
 
+// GetNamespaceMaxAppsFromAnnotation returns the namespace.maxApps annotation value, or "" if it is missing or invalid
+func GetNamespaceMaxAppsFromAnnotation(namespaceObj *v1.Namespace) string {
+	if maxApps := GetNameSpaceAnnotationValue(namespaceObj, constants.NamespaceMaxApps); maxApps != "" {
+		numMaxApp, err := strconv.Atoi(maxApps)
+		if err != nil {
+			log.Log(log.ShimUtils).Warn("Unable to process namespace.maxApps annotation",
+				zap.String("namespace", namespaceObj.Name),
+				zap.String("namespace.maxApps", maxApps))
+			return ""
+		}
+		if numMaxApp < 0 {
+			log.Log(log.ShimUtils).Warn("Invalid value for namespace.maxApps annotation",
+				zap.String("namespace", namespaceObj.Name),
+				zap.String("namespace.maxApps", maxApps))
+			return ""
+		}
+		return maxApps
+	}
+	return ""
+}
+
 func GetNamespaceQuotaFromAnnotation(namespaceObj *v1.Namespace) *si.Resource {
 	// retrieve resource quota info from annotations
 	cpuQuota := GetNameSpaceAnnotationValue(namespaceObj, constants.CPUQuota)
diff --git a/pkg/common/utils/utils_test.go b/pkg/common/utils/utils_test.go
index adb072142..0b20f1a95 100644
--- a/pkg/common/utils/utils_test.go
+++ b/pkg/common/utils/utils_test.go
@@ -331,6 +331,53 @@ func TestGetNamespaceGuaranteedFromAnnotation(t *testing.T) {
 	}
 }
 
+func TestGetNamespaceMaxAppsFromAnnotation(t *testing.T) {
+	testCases := []struct {
+		namespace      *v1.Namespace
+		expectedMaxApp string
+	}{
+		{&v1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test",
+				Namespace: "test",
+			},
+		}, ""},
+		{&v1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test",
+				Namespace: "test",
+				Annotations: map[string]string{
+					constants.NamespaceMaxApps: "5",
+				},
+			},
+		}, "5"},
+		{&v1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test",
+				Namespace: "test",
+				Annotations: map[string]string{
+					constants.NamespaceMaxApps: "-5",
+				},
+			},
+		}, ""},
+		{&v1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test",
+				Namespace: "test",
+				Annotations: map[string]string{
+					constants.NamespaceMaxApps: "error",
+				},
+			},
+		}, ""},
+	}
+	for _, tc := range testCases {
+		t.Run(fmt.Sprintf("namespace: %v", tc.namespace), func(t *testing.T) {
+			maxApp := GetNamespaceMaxAppsFromAnnotation(tc.namespace)
+			assert.Equal(t, maxApp, tc.expectedMaxApp)
+		})
+	}
+}
+
 func TestGetNamespaceQuotaFromAnnotationUsingNewAndOldAnnotations(t *testing.T) {
 	testCases := []struct {
 		namespace *v1.Namespace
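Note on the patch above: the new constants.NamespaceMaxApps annotation rides the
same path as namespace.quota and namespace.guaranteed. The shim reads it from
the namespace object and forwards it to the scheduler core as the application
tag siCommon.AppTagNamespaceResourceMaxApps. A minimal sketch of the helper's
contract follows; the namespace name "dev" and the limit "100" are illustrative
values, not taken from the patch:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
    )

    func main() {
        // hypothetical namespace capped at 100 applications
        ns := &v1.Namespace{
            ObjectMeta: metav1.ObjectMeta{
                Name: "dev",
                Annotations: map[string]string{
                    // constants.NamespaceMaxApps resolves to this key
                    "yunikorn.apache.org/namespace.maxApps": "100",
                },
            },
        }
        // prints "100"; a missing, non-numeric or negative value yields ""
        fmt.Println(utils.GetNamespaceMaxAppsFromAnnotation(ns))
    }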
From acbc9668baab890b51f8c3f40cb1947dcd6aebab Mon Sep 17 00:00:00 2001
From: Peter Bacsko
Date: Fri, 2 Aug 2024 15:21:58 +0200
Subject: [PATCH 2/4] [YUNIKORN-2629] Adding a node can result in a deadlock
 (#859)

Closes: #859

Signed-off-by: Peter Bacsko
---
 pkg/cache/context.go                 | 65 +++++-----------------------
 pkg/cache/context_test.go            |  6 +--
 pkg/cache/scheduler_callback_test.go |  1 -
 3 files changed, 14 insertions(+), 58 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index c608b98e7..0c0dd30ab 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -166,8 +166,6 @@ func (ctx *Context) addNode(obj interface{}) {
 }
 
 func (ctx *Context) updateNode(_, obj interface{}) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	node, err := convertToNode(obj)
 	if err != nil {
 		log.Log(log.ShimContext).Error("node conversion failed", zap.Error(err))
@@ -229,8 +227,6 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
 }
 
 func (ctx *Context) deleteNode(obj interface{}) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	var node *v1.Node
 	switch t := obj.(type) {
 	case *v1.Node:
@@ -250,9 +246,6 @@
 }
 
 func (ctx *Context) addNodesWithoutRegistering(nodes []*v1.Node) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
-
 	for _, node := range nodes {
 		ctx.updateNodeInternal(node, false)
 	}
@@ -288,9 +281,6 @@ func (ctx *Context) AddPod(obj interface{}) {
 }
 
 func (ctx *Context) UpdatePod(_, newObj interface{}) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
-
 	pod, err := utils.Convert2Pod(newObj)
 	if err != nil {
 		log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
@@ -338,9 +328,9 @@ func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) {
 	}
 
 	// add app if it doesn't already exist
-	app := ctx.getApplication(appMeta.ApplicationID)
+	app := ctx.GetApplication(appMeta.ApplicationID)
 	if app == nil {
-		app = ctx.addApplication(&AddApplicationRequest{
+		app = ctx.AddApplication(&AddApplicationRequest{
 			Metadata: appMeta,
 		})
 	}
@@ -444,10 +434,8 @@ func (ctx *Context) DeletePod(obj interface{}) {
 }
 
 func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	if taskMeta, ok := getTaskMetadata(pod); ok {
-		if app := ctx.getApplication(taskMeta.ApplicationID); app != nil {
+		if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil {
 			ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
 		}
 	}
@@ -457,9 +445,6 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
 }
 
 func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
-
 	oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
 	if oldPod == nil {
 		// if pod is not in scheduler cache, no node updates are needed
@@ -590,8 +575,6 @@ func (ctx *Context) addPriorityClass(obj interface{}) {
 }
 
 func (ctx *Context) updatePriorityClass(_, newObj interface{}) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	if priorityClass := utils.Convert2PriorityClass(newObj); priorityClass != nil {
 		ctx.updatePriorityClassInternal(priorityClass)
 	}
@@ -602,9 +585,6 @@ func (ctx *Context) updatePriorityClassInternal(priorityClass *schedulingv1.Prio
 }
 
 func (ctx *Context) deletePriorityClass(obj interface{}) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
-
 	log.Log(log.ShimContext).Debug("priorityClass deleted")
 	var priorityClass *schedulingv1.PriorityClass
 	switch t := obj.(type) {
@@ -670,8 +650,6 @@ func (ctx *Context) EventsToRegister(queueingHintFn framework.QueueingHintFn) []
 
 // IsPodFitNode evaluates given predicates based on current context
 func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	pod := ctx.schedulerCache.GetPod(name)
 	if pod == nil {
 		return ErrorPodNotFound
@@ -692,8 +670,6 @@ func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
 }
 
 func (ctx *Context) IsPodFitNodeViaPreemption(name, node string, allocations []string, startIndex int) (int, bool) {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	if pod := ctx.schedulerCache.GetPod(name); pod != nil {
 		// if pod exists in cache, try to run predicates
 		if targetNode := ctx.schedulerCache.GetNode(node); targetNode != nil {
@@ -802,8 +778,6 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
 // this way, the core can make allocation decisions with consideration of
 // other assumed pods before they are actually bound to the node (bound is slow).
 func (ctx *Context) AssumePod(name, node string) error {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	if pod := ctx.schedulerCache.GetPod(name); pod != nil {
 		// when add assumed pod, we make a copy of the pod to avoid
 		// modifying its original reference. otherwise, it may have
@@ -863,9 +837,6 @@ func (ctx *Context) AssumePod(name, node string) error {
 // forget pod must be called when a pod is assumed to be running on a node,
 // but then for some reason it is failed to bind or released.
 func (ctx *Context) ForgetPod(name string) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
-
 	if pod := ctx.schedulerCache.GetPod(name); pod != nil {
 		log.Log(log.ShimContext).Debug("forget pod", zap.String("pod", pod.Name))
 		ctx.schedulerCache.ForgetPod(pod)
@@ -908,17 +879,11 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool {
 	return ctx.schedulerCache.StartPodAllocation(podKey, nodeID)
 }
 
-func (ctx *Context) NotifyTaskComplete(appID, taskID string) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
-	ctx.notifyTaskComplete(appID, taskID)
-}
-
 func (ctx *Context) notifyTaskComplete(appID, taskID string) {
 	log.Log(log.ShimContext).Debug("NotifyTaskComplete",
 		zap.String("appID", appID),
 		zap.String("taskID", taskID))
-	if app := ctx.getApplication(appID); app != nil {
+	if app := ctx.GetApplication(appID); app != nil {
 		log.Log(log.ShimContext).Debug("release allocation",
 			zap.String("appID", appID),
 			zap.String("taskID", taskID))
@@ -994,10 +959,6 @@ func (ctx *Context) AddApplication(request *AddApplicationRequest) *Application
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 
-	return ctx.addApplication(request)
-}
-
-func (ctx *Context) addApplication(request *AddApplicationRequest) *Application {
 	log.Log(log.ShimContext).Debug("AddApplication", zap.Any("Request", request))
 	if app := ctx.getApplication(request.Metadata.ApplicationID); app != nil {
 		return app
@@ -1065,25 +1026,28 @@ func (ctx *Context) getApplication(appID string) *Application {
 
 func (ctx *Context) RemoveApplication(appID string) error {
 	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	if app, exist := ctx.applications[appID]; exist {
 		// get the non-terminated task alias
 		nonTerminatedTaskAlias := app.getNonTerminatedTaskAlias()
 		// check there are any non-terminated task or not
 		if len(nonTerminatedTaskAlias) > 0 {
-			return fmt.Errorf("failed to remove application %s because it still has task in non-terminated task, tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ","))
+			ctx.lock.Unlock()
+			return fmt.Errorf("failed to remove application %s because it still has task in non-terminated tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ","))
 		}
+		delete(ctx.applications, appID)
+		ctx.lock.Unlock()
 		// send the update request to scheduler core
 		rr := common.CreateUpdateRequestForRemoveApplication(app.applicationID, app.partition)
 		if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateApplication(rr); err != nil {
 			log.Log(log.ShimContext).Error("failed to send remove application request to core", zap.Error(err))
 		}
-		delete(ctx.applications, appID)
+
 		log.Log(log.ShimContext).Info("app removed",
 			zap.String("appID", appID))
 
 		return nil
 	}
+	ctx.lock.Unlock()
 	return fmt.Errorf("application %s is not found in the context", appID)
 }
 
@@ -1099,8 +1063,6 @@ func (ctx *Context) RemoveApplicationInternal(appID string) {
 
 // this implements ApplicationManagementProtocol
 func (ctx *Context) AddTask(request *AddTaskRequest) *Task {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	return ctx.addTask(request)
 }
 
@@ -1160,9 +1122,7 @@ func (ctx *Context) RemoveTask(appID, taskID string) {
 }
 
 func (ctx *Context) getTask(appID string, taskID string) *Task {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
-	app := ctx.getApplication(appID)
+	app := ctx.GetApplication(appID)
 	if app == nil {
 		log.Log(log.ShimContext).Debug("application is not found in the context",
 			zap.String("appID", appID))
@@ -1684,9 +1644,6 @@ func (ctx *Context) finalizeNodes(existingNodes []*v1.Node) error {
 		nodeMap[node.Name] = node
 	}
 
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
-
 	// find any existing nodes that no longer exist
 	for _, node := range existingNodes {
 		if _, ok := nodeMap[node.Name]; !ok {
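Note on the context.go changes above: the RemoveApplication rewrite shows the
shape of the deadlock fix. State is mutated while the lock is held, the lock is
released, and only then does the shim call out to the scheduler core; holding a
context-wide lock across an outbound call can deadlock as soon as the callee,
or a callback it fires, needs the same lock. A generic sketch of the pattern,
illustrative only and not YuniKorn code:

    package main

    import (
        "fmt"
        "sync"
    )

    type registry struct {
        mu    sync.Mutex
        items map[string]struct{}
    }

    // remove updates local state under the lock, but releases it BEFORE the
    // outbound call, so a callee that re-enters the registry cannot deadlock.
    func (r *registry) remove(id string, notify func(string)) {
        r.mu.Lock()
        _, ok := r.items[id]
        if ok {
            delete(r.items, id)
        }
        r.mu.Unlock() // release before calling out
        if ok {
            notify(id)
        }
    }

    func main() {
        r := &registry{items: map[string]struct{}{"app-1": {}}}
        r.remove("app-1", func(id string) { fmt.Println("removed", id) })
    }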
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index c4802d4a3..717c4eb0c 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -368,7 +368,7 @@ func TestRemoveApplication(t *testing.T) {
 	assert.Equal(t, len(context.applications), 3)
 	err := context.RemoveApplication(appID1)
 	assert.Assert(t, err != nil)
-	assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated task, tasks: /remove-test-00001")
+	assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated tasks: /remove-test-00001")
 
 	app := context.GetApplication(appID1)
 	assert.Assert(t, app != nil)
@@ -1103,7 +1103,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) {
 	assert.Equal(t, len(app.GetBoundTasks()), 2)
 
 	// release one of the tasks
-	context.NotifyTaskComplete(appID, pod2UID)
+	context.notifyTaskComplete(appID, pod2UID)
 
 	// wait for release
 	err = utils.WaitForCondition(func() bool {
@@ -2144,7 +2144,7 @@ func TestTaskRemoveOnCompletion(t *testing.T) {
 	assert.NilError(t, err)
 
 	// mark completion
-	context.NotifyTaskComplete(appID, taskUID1)
+	context.notifyTaskComplete(appID, taskUID1)
 	err = utils.WaitForCondition(func() bool {
 		return task.GetTaskState() == TaskStates().Completed
 	}, 100*time.Millisecond, time.Second)
diff --git a/pkg/cache/scheduler_callback_test.go b/pkg/cache/scheduler_callback_test.go
index 47d1b146e..59bd875e4 100644
--- a/pkg/cache/scheduler_callback_test.go
+++ b/pkg/cache/scheduler_callback_test.go
@@ -85,7 +85,6 @@ func TestUpdateAllocation_NewTask_TaskNotFound(t *testing.T) {
 }
 
 func TestUpdateAllocation_NewTask_AssumePodFails(t *testing.T) {
-	t.Skip("disabled until YUNIKORN-2629 is resolved") // test can randomly trigger a deadlock, resulting in a failed build
 	callback, context := initCallbackTest(t, false, false)
 	defer dispatcher.UnregisterAllEventHandlers()
 	defer dispatcher.Stop()
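Note on the test changes above: the exported NotifyTaskComplete wrapper is gone
(tests call the package-private notifyTaskComplete directly) and the previously
skipped AssumePod test is re-enabled. The informer handlers could drop ctx.lock
entirely because the structures they touch are expected to synchronize
themselves. A minimal illustration of that delegation, again assumed and not
YuniKorn code:

    package main

    import "sync"

    // innerCache guards its own state, so callers need no outer lock.
    type innerCache struct {
        mu    sync.RWMutex
        nodes map[string]string
    }

    func (c *innerCache) update(name, state string) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.nodes[name] = state
    }

    func (c *innerCache) get(name string) string {
        c.mu.RLock()
        defer c.mu.RUnlock()
        return c.nodes[name]
    }

    // event handler: delegates synchronization instead of double-locking
    type handler struct{ cache *innerCache }

    func (h *handler) updateNode(name, state string) { h.cache.update(name, state) }

    func main() {
        h := &handler{cache: &innerCache{nodes: map[string]string{}}}
        h.updateNode("node-1", "ready")
        _ = h.cache.get("node-1")
    }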
From 7f9901f228e5c9083b63d80b1dd67ae765bf2ed7 Mon Sep 17 00:00:00 2001
From: Craig Condit
Date: Fri, 2 Aug 2024 16:53:33 -0500
Subject: [PATCH 3/4] [YUNIKORN-2780] Remove references to SI
 ExistingAllocations (#886)

Now that node registration has been simplified, we no longer need to
send ExistingAllocations via the SI. Remove references so these can be
removed from the interface.

Closes: #886
---
 go.mod                          |  4 ++--
 go.sum                          |  8 ++++----
 pkg/cache/context.go            |  1 -
 pkg/common/si_helper.go         | 33 ---------------------------------
 pkg/common/si_helper_test.go    | 28 ----------------------------
 pkg/shim/scheduler_mock_test.go | 32 +++++++++++++++++++++++++++++++-
 6 files changed, 37 insertions(+), 69 deletions(-)

diff --git a/go.mod b/go.mod
index 9240134aa..349820d38 100644
--- a/go.mod
+++ b/go.mod
@@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
 go 1.21
 
 require (
-	github.com/apache/yunikorn-core v0.0.0-20240711165824-d96cd583305b
-	github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1
+	github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9
+	github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586
 	github.com/google/go-cmp v0.6.0
 	github.com/google/uuid v1.6.0
 	github.com/looplab/fsm v1.0.1
diff --git a/go.sum b/go.sum
index 0354b1fb5..7408b97bd 100644
--- a/go.sum
+++ b/go.sum
@@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
 github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
 github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
 github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
-github.com/apache/yunikorn-core v0.0.0-20240711165824-d96cd583305b h1:GDizY3dcE+hkfik/+NY3Zdw71A/V4dWGp9Pl6k5Ii2M=
-github.com/apache/yunikorn-core v0.0.0-20240711165824-d96cd583305b/go.mod h1:pSi7AFBRiGCGQ7RwQffpD4m6dvA5lc1HuCrg7LpJJqs=
-github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1 h1:v4J9L3MlW8BQfYnbq6FV2l3uyay3SqMS2Ffpo+SFat4=
-github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
+github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9 h1:s1Co/K+cR9Q/GW0e974dToW9eyLQZxYoCp0TCoEuEj0=
+github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9/go.mod h1:S9yGBGA2i2hAajtEc2t4lmiPJDZz3Ek8eVxz5KhJqGI=
+github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586 h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0=
+github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 0c0dd30ab..1751ec89f 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -1530,7 +1530,6 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
 			},
 			SchedulableResource: common.GetNodeResource(&nodeStatus),
 			OccupiedResource:    common.NewResourceBuilder().Build(),
-			ExistingAllocations: make([]*si.Allocation, 0),
 		})
 		pendingNodes[node.Name] = node
 	}
diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go
index 9c4520a07..c3c1a7796 100644
--- a/pkg/common/si_helper.go
+++ b/pkg/common/si_helper.go
@@ -153,39 +153,6 @@ func CreateReleaseRequestForTask(appID, taskID, allocationKey, partition, termin
 	}
 }
 
-// CreateUpdateRequestForNewNode builds a NodeRequest for new node addition and restoring existing node
-func CreateUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string, capacity *si.Resource, occupied *si.Resource,
-	existingAllocations []*si.Allocation) *si.NodeRequest {
-	// Use node's name as the NodeID, this is because when bind pod to node,
-	// name of node is required but uid is optional.
-	nodeInfo := &si.NodeInfo{
-		NodeID:              nodeID,
-		SchedulableResource: capacity,
-		OccupiedResource:    occupied,
-		Attributes: map[string]string{
-			constants.DefaultNodeAttributeHostNameKey: nodeID,
-			constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,
-		},
-		ExistingAllocations: existingAllocations,
-		Action:              si.NodeInfo_CREATE,
-	}
-
-	// Add nodeLabels key value to Attributes map
-	for k, v := range nodeLabels {
-		nodeInfo.Attributes[k] = v
-	}
-
-	// Add instanceType to Attributes map
-	nodeInfo.Attributes[common.InstanceType] = nodeLabels[conf.GetSchedulerConf().InstanceTypeNodeLabelKey]
-
-	nodes := make([]*si.NodeInfo, 1)
-	nodes[0] = nodeInfo
-	return &si.NodeRequest{
-		Nodes: nodes,
-		RmID:  conf.GetSchedulerConf().ClusterID,
-	}
-}
-
 // CreateUpdateRequestForUpdatedNode builds a NodeRequest for capacity and occupied resource updates
 func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource, occupied *si.Resource) *si.NodeRequest {
 	nodeInfo := &si.NodeInfo{
diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go
index 63c603f7d..92f5e2407 100644
--- a/pkg/common/si_helper_test.go
+++ b/pkg/common/si_helper_test.go
@@ -24,7 +24,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 	apis "k8s.io/apimachinery/pkg/apis/meta/v1"
 
-	"github.com/apache/yunikorn-k8shim/pkg/common/constants"
 	"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
@@ -230,33 +229,6 @@ func TestCreateTagsForTask(t *testing.T) {
 	assert.Equal(t, len(result4), 4)
 }
 
-func TestCreateUpdateRequestForNewNode(t *testing.T) {
-	capacity := NewResourceBuilder().AddResource(common.Memory, 200).AddResource(common.CPU, 2).Build()
-	occupied := NewResourceBuilder().AddResource(common.Memory, 50).AddResource(common.CPU, 1).Build()
-	var existingAllocations []*si.Allocation
-	nodeLabels := map[string]string{
-		"label1": "key1",
-		"label2": "key2",
-		"node.kubernetes.io/instance-type": "HighMem",
-	}
-	request := CreateUpdateRequestForNewNode(nodeID, nodeLabels, capacity, occupied, existingAllocations)
-	assert.Equal(t, len(request.Nodes), 1)
-	assert.Equal(t, request.Nodes[0].NodeID, nodeID)
-	assert.Equal(t, request.Nodes[0].SchedulableResource, capacity)
-	assert.Equal(t, request.Nodes[0].OccupiedResource, occupied)
-	assert.Equal(t, len(request.Nodes[0].Attributes), 6)
-	assert.Equal(t, request.Nodes[0].Attributes[constants.DefaultNodeAttributeHostNameKey], nodeID)
-	assert.Equal(t, request.Nodes[0].Attributes[constants.DefaultNodeAttributeRackNameKey], constants.DefaultRackName)
-
-	// Make sure include nodeLabel
-	assert.Equal(t, request.Nodes[0].Attributes["label1"], "key1")
-	assert.Equal(t, request.Nodes[0].Attributes["label2"], "key2")
-	assert.Equal(t, request.Nodes[0].Attributes["node.kubernetes.io/instance-type"], "HighMem")
-
-	// Make sure include the instanceType
-	assert.Equal(t, request.Nodes[0].Attributes[common.InstanceType], "HighMem")
-}
-
 func TestCreateUpdateRequestForUpdatedNode(t *testing.T) {
 	capacity := NewResourceBuilder().AddResource(common.Memory, 200).AddResource(common.CPU, 2).Build()
 	occupied := NewResourceBuilder().AddResource(common.Memory, 50).AddResource(common.CPU, 1).Build()
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 1e8f19106..b67746d72 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/apache/yunikorn-k8shim/pkg/cache"
 	"github.com/apache/yunikorn-k8shim/pkg/client"
 	"github.com/apache/yunikorn-k8shim/pkg/common"
+	"github.com/apache/yunikorn-k8shim/pkg/common/constants"
 	"github.com/apache/yunikorn-k8shim/pkg/common/events"
 	"github.com/apache/yunikorn-k8shim/pkg/common/utils"
 	"github.com/apache/yunikorn-k8shim/pkg/conf"
@@ -124,7 +125,7 @@ func (fc *MockScheduler) addNode(nodeName string, nodeLabels map[string]string,
 		AddResource(siCommon.CPU, cpu).
 		AddResource("pods", pods).
 		Build()
-	request := common.CreateUpdateRequestForNewNode(nodeName, nodeLabels, nodeResource, nil, nil)
+	request := createUpdateRequestForNewNode(nodeName, nodeLabels, nodeResource, nil)
 	fmt.Printf("report new nodes to scheduler, request: %s", request.String())
 	return fc.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
 }
@@ -335,3 +336,32 @@ func (fc *MockScheduler) ensureStarted() {
 		panic("mock scheduler is not started - call start() first")
 	}
 }
+
+func createUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string, capacity *si.Resource, occupied *si.Resource) *si.NodeRequest {
+	// Use node's name as the NodeID, this is because when bind pod to node,
+	// name of node is required but uid is optional.
+	nodeInfo := &si.NodeInfo{
+		NodeID:              nodeID,
+		SchedulableResource: capacity,
+		OccupiedResource:    occupied,
+		Attributes: map[string]string{
+			constants.DefaultNodeAttributeHostNameKey: nodeID,
+			constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,
+		},
+		Action: si.NodeInfo_CREATE,
+	}
+
+	// Add nodeLabels key value to Attributes map
+	for k, v := range nodeLabels {
+		nodeInfo.Attributes[k] = v
+	}
+
+	// Add instanceType to Attributes map
+	nodeInfo.Attributes[siCommon.InstanceType] = nodeLabels[conf.GetSchedulerConf().InstanceTypeNodeLabelKey]
+
+	nodes := make([]*si.NodeInfo, 1)
+	nodes[0] = nodeInfo
+	return &si.NodeRequest{
+		Nodes: nodes,
+		RmID:  conf.GetSchedulerConf().ClusterID,
+	}
+}
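Note on the patch above: with ExistingAllocations removed, a new-node
registration carries only identity, attributes, resources and the CREATE
action. The sketch below condenses the test helper from the diff into the
resulting request shape; the function name newNodeRequest is hypothetical, the
inputs are assumed parameters, and the same imports as si_helper.go are
assumed:

    // Sketch: node registration after YUNIKORN-2780; note there is no
    // ExistingAllocations field left to populate.
    func newNodeRequest(nodeID string, capacity, occupied *si.Resource) *si.NodeRequest {
        return &si.NodeRequest{
            Nodes: []*si.NodeInfo{{
                NodeID:              nodeID,
                SchedulableResource: capacity,
                OccupiedResource:    occupied,
                Attributes: map[string]string{
                    constants.DefaultNodeAttributeHostNameKey: nodeID,
                    constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,
                },
                Action: si.NodeInfo_CREATE,
            }},
            RmID: conf.GetSchedulerConf().ClusterID,
        }
    }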
From ba192d4f7ae782424a8df6661784dad425966b7a Mon Sep 17 00:00:00 2001
From: Tzu-Hua Lan
Date: Wed, 7 Aug 2024 16:46:52 +0800
Subject: [PATCH 4/4] [YUNIKORN-2782] Cleanup dead code in cache/context (#888)

Closes: #888

Signed-off-by: Chia-Ping Tsai
---
 pkg/cache/context.go            | 35 +--------
 pkg/cache/context_test.go       | 75 ++-------------------------------
 pkg/cache/scheduler_callback.go |  2 +-
 pkg/shim/scheduler.go           |  2 +-
 4 files changed, 6 insertions(+), 108 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 1751ec89f..211bf0e9b 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -845,12 +845,6 @@ func (ctx *Context) ForgetPod(name string) {
 	log.Log(log.ShimContext).Debug("unable to forget pod: not found in cache", zap.String("pod", name))
 }
 
-func (ctx *Context) UpdateApplication(app *Application) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
-	ctx.applications[app.applicationID] = app
-}
-
 // IsTaskMaybeSchedulable returns true if a task might be currently able to be scheduled. This uses a bloom filter
 // cached from a set of taskIDs to perform efficient negative lookups.
 func (ctx *Context) IsTaskMaybeSchedulable(taskID string) bool {
@@ -1024,34 +1018,7 @@ func (ctx *Context) getApplication(appID string) *Application {
 	return nil
 }
 
-func (ctx *Context) RemoveApplication(appID string) error {
-	ctx.lock.Lock()
-	if app, exist := ctx.applications[appID]; exist {
-		// get the non-terminated task alias
-		nonTerminatedTaskAlias := app.getNonTerminatedTaskAlias()
-		// check there are any non-terminated task or not
-		if len(nonTerminatedTaskAlias) > 0 {
-			ctx.lock.Unlock()
-			return fmt.Errorf("failed to remove application %s because it still has task in non-terminated tasks: %s", appID, strings.Join(nonTerminatedTaskAlias, ","))
-		}
-		delete(ctx.applications, appID)
-		ctx.lock.Unlock()
-		// send the update request to scheduler core
-		rr := common.CreateUpdateRequestForRemoveApplication(app.applicationID, app.partition)
-		if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateApplication(rr); err != nil {
-			log.Log(log.ShimContext).Error("failed to send remove application request to core", zap.Error(err))
-		}
-
-		log.Log(log.ShimContext).Info("app removed",
-			zap.String("appID", appID))
-
-		return nil
-	}
-	ctx.lock.Unlock()
-	return fmt.Errorf("application %s is not found in the context", appID)
-}
-
-func (ctx *Context) RemoveApplicationInternal(appID string) {
+func (ctx *Context) RemoveApplication(appID string) {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
 	if _, exist := ctx.applications[appID]; !exist {
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 717c4eb0c..ba661c39b 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -324,75 +324,6 @@ func TestGetApplication(t *testing.T) {
 }
 
 func TestRemoveApplication(t *testing.T) {
-	// add 3 applications
-	context := initContextForTest()
-	app1 := NewApplication(appID1, queueNameA, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
-	app2 := NewApplication(appID2, queueNameB, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
-	app3 := NewApplication(appID3, queueNameC, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
-	context.applications[appID1] = app1
-	context.applications[appID2] = app2
-	context.applications[appID3] = app3
-	pod1 := &v1.Pod{
-		TypeMeta: apis.TypeMeta{
-			Kind:       "Pod",
-			APIVersion: "v1",
-		},
-		ObjectMeta: apis.ObjectMeta{
-			Name: "remove-test-00001",
-			UID:  uid1,
-		},
-	}
-	pod2 := &v1.Pod{
-		TypeMeta: apis.TypeMeta{
-			Kind:       "Pod",
-			APIVersion: "v1",
-		},
-		ObjectMeta: apis.ObjectMeta{
-			Name: "remove-test-00002",
-			UID:  uid2,
-		},
-	}
-	// New task to application 1
-	// set task state in Pending (non-terminated)
-	task1 := NewTask(taskUID1, app1, context, pod1)
-	app1.taskMap[taskUID1] = task1
-	task1.sm.SetState(TaskStates().Pending)
-	// New task to application 2
-	// set task state in Failed (terminated)
-	task2 := NewTask(taskUID2, app2, context, pod2)
-	app2.taskMap[taskUID2] = task2
-	task2.sm.SetState(TaskStates().Failed)
-
-	// remove application 1 which have non-terminated task
-	// this should fail
-	assert.Equal(t, len(context.applications), 3)
-	err := context.RemoveApplication(appID1)
-	assert.Assert(t, err != nil)
-	assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated tasks: /remove-test-00001")
-
-	app := context.GetApplication(appID1)
-	assert.Assert(t, app != nil)
-
-	// remove application 2 which have terminated task
-	// this should be successful
-	err = context.RemoveApplication(appID2)
-	assert.Assert(t, err == nil)
-
-	app = context.GetApplication(appID2)
-	assert.Assert(t, app == nil)
-
-	// try remove again
-	// this should fail
-	err = context.RemoveApplication(appID2)
-	assert.Assert(t, err != nil)
-	assert.ErrorContains(t, err, "application app00002 is not found in the context")
-
-	// make sure the other app is not affected
-	app = context.GetApplication(appID3)
-	assert.Assert(t, app != nil)
-}
-
-func TestRemoveApplicationInternal(t *testing.T) {
 	context := initContextForTest()
 	app1 := NewApplication(appID1, queueNameA, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
 	app2 := NewApplication(appID2, queueNameB, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
@@ -401,17 +332,17 @@ func TestRemoveApplicationInternal(t *testing.T) {
 	assert.Equal(t, len(context.applications), 2)
 
 	// remove non-exist app
-	context.RemoveApplicationInternal(appID3)
+	context.RemoveApplication(appID3)
 	assert.Equal(t, len(context.applications), 2)
 
 	// remove app1
-	context.RemoveApplicationInternal(appID1)
+	context.RemoveApplication(appID1)
 	assert.Equal(t, len(context.applications), 1)
 	_, ok := context.applications[appID1]
 	assert.Equal(t, ok, false)
 
 	// remove app2
-	context.RemoveApplicationInternal(appID2)
+	context.RemoveApplication(appID2)
 	assert.Equal(t, len(context.applications), 0)
 	_, ok = context.applications[appID2]
 	assert.Equal(t, ok, false)
diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go
index 8ed487c40..728212bfa 100644
--- a/pkg/cache/scheduler_callback.go
+++ b/pkg/cache/scheduler_callback.go
@@ -158,7 +158,7 @@ func (callback *AsyncRMCallback) UpdateApplication(response *si.ApplicationRespo
 			zap.String("new status", updated.State))
 		switch updated.State {
 		case ApplicationStates().Completed:
-			callback.context.RemoveApplicationInternal(updated.ApplicationID)
+			callback.context.RemoveApplication(updated.ApplicationID)
 		case ApplicationStates().Resuming:
 			app := callback.context.GetApplication(updated.ApplicationID)
 			if app != nil && app.GetApplicationState() == ApplicationStates().Reserving {
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index 3e78d7ffd..ebf3fb118 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -160,7 +160,7 @@ func (ss *KubernetesShim) schedule() {
 	for _, app := range apps {
 		if app.GetApplicationState() == cache.ApplicationStates().Failed {
 			if app.AreAllTasksTerminated() {
-				ss.context.RemoveApplicationInternal(app.GetApplicationID())
+				ss.context.RemoveApplication(app.GetApplicationID())
 			}
 			continue
 		}
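Note on the series: after this cleanup a single, idempotent RemoveApplication
remains. Removing an unknown appID is a no-op, as the "remove non-exist app"
test above exercises, so call sites reduce to the fire-and-forget pattern in
scheduler.go. A sketch of the calling convention, assuming ctx is a
*cache.Context and app an application handle:

    // No error to check any more; removing an unknown app is simply
    // ignored inside the context, which keeps shutdown paths linear.
    if app.GetApplicationState() == cache.ApplicationStates().Failed && app.AreAllTasksTerminated() {
        ctx.RemoveApplication(app.GetApplicationID())
    }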