Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into YUNIKORN-2724
Browse files Browse the repository at this point in the history
  • Loading branch information
ryankert01 committed Aug 8, 2024
2 parents d74d0dc + ba192d4 commit 7b48895
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 216 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
go 1.21

require (
github.com/apache/yunikorn-core v0.0.0-20240711165824-d96cd583305b
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/apache/yunikorn-core v0.0.0-20240711165824-d96cd583305b h1:GDizY3dcE+hkfik/+NY3Zdw71A/V4dWGp9Pl6k5Ii2M=
github.com/apache/yunikorn-core v0.0.0-20240711165824-d96cd583305b/go.mod h1:pSi7AFBRiGCGQ7RwQffpD4m6dvA5lc1HuCrg7LpJJqs=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1 h1:v4J9L3MlW8BQfYnbq6FV2l3uyay3SqMS2Ffpo+SFat4=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9 h1:s1Co/K+cR9Q/GW0e974dToW9eyLQZxYoCp0TCoEuEj0=
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9/go.mod h1:S9yGBGA2i2hAajtEc2t4lmiPJDZz3Ek8eVxz5KhJqGI=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586 h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
Expand Down
94 changes: 21 additions & 73 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ func (ctx *Context) addNode(obj interface{}) {
}

func (ctx *Context) updateNode(_, obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
node, err := convertToNode(obj)
if err != nil {
log.Log(log.ShimContext).Error("node conversion failed", zap.Error(err))
Expand Down Expand Up @@ -229,8 +227,6 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
}

func (ctx *Context) deleteNode(obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
var node *v1.Node
switch t := obj.(type) {
case *v1.Node:
Expand All @@ -250,9 +246,6 @@ func (ctx *Context) deleteNode(obj interface{}) {
}

func (ctx *Context) addNodesWithoutRegistering(nodes []*v1.Node) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

for _, node := range nodes {
ctx.updateNodeInternal(node, false)
}
Expand Down Expand Up @@ -288,9 +281,6 @@ func (ctx *Context) AddPod(obj interface{}) {
}

func (ctx *Context) UpdatePod(_, newObj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

pod, err := utils.Convert2Pod(newObj)
if err != nil {
log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
Expand Down Expand Up @@ -444,11 +434,11 @@ func (ctx *Context) DeletePod(obj interface{}) {
}

func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if taskMeta, ok := getTaskMetadata(pod); ok {
if app := ctx.getApplication(taskMeta.ApplicationID); app != nil {
ctx.notifyTaskComplete(app, taskMeta.TaskID)
if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil {
ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
}
}

Expand All @@ -457,9 +447,6 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
}

func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod == nil {
// if pod is not in scheduler cache, no node updates are needed
Expand Down Expand Up @@ -590,8 +577,6 @@ func (ctx *Context) addPriorityClass(obj interface{}) {
}

func (ctx *Context) updatePriorityClass(_, newObj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if priorityClass := utils.Convert2PriorityClass(newObj); priorityClass != nil {
ctx.updatePriorityClassInternal(priorityClass)
}
Expand All @@ -602,9 +587,6 @@ func (ctx *Context) updatePriorityClassInternal(priorityClass *schedulingv1.Prio
}

func (ctx *Context) deletePriorityClass(obj interface{}) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

log.Log(log.ShimContext).Debug("priorityClass deleted")
var priorityClass *schedulingv1.PriorityClass
switch t := obj.(type) {
Expand Down Expand Up @@ -670,8 +652,6 @@ func (ctx *Context) EventsToRegister(queueingHintFn framework.QueueingHintFn) []

// IsPodFitNode evaluates given predicates based on current context
func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
pod := ctx.schedulerCache.GetPod(name)
if pod == nil {
return ErrorPodNotFound
Expand All @@ -692,8 +672,6 @@ func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error {
}

func (ctx *Context) IsPodFitNodeViaPreemption(name, node string, allocations []string, startIndex int) (int, bool) {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
if pod := ctx.schedulerCache.GetPod(name); pod != nil {
// if pod exists in cache, try to run predicates
if targetNode := ctx.schedulerCache.GetNode(node); targetNode != nil {
Expand Down Expand Up @@ -802,8 +780,6 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
// this way, the core can make allocation decisions with consideration of
// other assumed pods before they are actually bound to the node (binding is slow).
func (ctx *Context) AssumePod(name, node string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if pod := ctx.schedulerCache.GetPod(name); pod != nil {
// when add assumed pod, we make a copy of the pod to avoid
// modifying its original reference. otherwise, it may have
Expand Down Expand Up @@ -863,9 +839,6 @@ func (ctx *Context) AssumePod(name, node string) error {
// ForgetPod must be called when a pod was assumed to be running on a node
// but then failed to bind or was released.
func (ctx *Context) ForgetPod(name string) {
ctx.lock.Lock()
defer ctx.lock.Unlock()

if pod := ctx.schedulerCache.GetPod(name); pod != nil {
log.Log(log.ShimContext).Debug("forget pod", zap.String("pod", pod.Name))
ctx.schedulerCache.ForgetPod(pod)
Expand All @@ -874,12 +847,6 @@ func (ctx *Context) ForgetPod(name string) {
log.Log(log.ShimContext).Debug("unable to forget pod: not found in cache", zap.String("pod", name))
}

// UpdateApplication stores app in the context's application map under its
// application ID, replacing any entry already stored for that ID. The context
// lock is held exclusively for the duration of the write.
func (ctx *Context) UpdateApplication(app *Application) {
	ctx.lock.Lock()
	defer ctx.lock.Unlock()

	appID := app.applicationID
	ctx.applications[appID] = app
}

// IsTaskMaybeSchedulable returns true if a task might be currently able to be scheduled. This uses a bloom filter
// cached from a set of taskIDs to perform efficient negative lookups.
func (ctx *Context) IsTaskMaybeSchedulable(taskID string) bool {
Expand Down Expand Up @@ -922,13 +889,24 @@ func (ctx *Context) notifyTaskComplete(app *Application, taskID string) {
dispatcher.Dispatch(ev)
if app.GetApplicationState() == ApplicationStates().Resuming {
dispatcher.Dispatch(NewSimpleApplicationEvent(app.applicationID, AppTaskCompleted))
if app := ctx.GetApplication(appID); app != nil {
log.Log(log.ShimContext).Debug("release allocation",
zap.String("appID", appID),
zap.String("taskID", taskID))
ev := NewSimpleTaskEvent(appID, taskID, CompleteTask)
dispatcher.Dispatch(ev)
if app.GetApplicationState() == ApplicationStates().Resuming {
dispatcher.Dispatch(NewSimpleApplicationEvent(appID, AppTaskCompleted))
}
}
}

// update application tags in the AddApplicationRequest based on the namespace annotation
// adds the following tags to the request based on annotations (if exist):
// - namespace.resourcequota
// - namespace.parentqueue
// - namespace.resourceguaranteed
// - namespace.resourcemaxapps
func (ctx *Context) updateApplicationTags(request *AddApplicationRequest, namespace string) {
namespaceObj := ctx.getNamespaceObject(namespace)
if namespaceObj == nil {
Expand All @@ -950,6 +928,12 @@ func (ctx *Context) updateApplicationTags(request *AddApplicationRequest, namesp
}
}

// add maxApps resource info as an app tag
maxApps := utils.GetNamespaceMaxAppsFromAnnotation(namespaceObj)
if maxApps != "" {
request.Metadata.Tags[siCommon.AppTagNamespaceResourceMaxApps] = maxApps
}

// add parent queue info as an app tag
parentQueue := utils.GetNameSpaceAnnotationValue(namespaceObj, constants.AnnotationParentQueue)
if parentQueue != "" {
Expand Down Expand Up @@ -981,10 +965,6 @@ func (ctx *Context) AddApplication(request *AddApplicationRequest) *Application
ctx.lock.Lock()
defer ctx.lock.Unlock()

return ctx.addApplication(request)
}

func (ctx *Context) addApplication(request *AddApplicationRequest) *Application {
log.Log(log.ShimContext).Debug("AddApplication", zap.Any("Request", request))
if app := ctx.getApplication(request.Metadata.ApplicationID); app != nil {
return app
Expand Down Expand Up @@ -1050,31 +1030,7 @@ func (ctx *Context) getApplication(appID string) *Application {
return nil
}

// RemoveApplication removes the application identified by appID from the context.
//
// It returns an error when the application is not present in the context, or
// when the application still reports non-terminated tasks (the error lists
// their aliases). Otherwise a remove-application update is sent to the
// scheduler core and the application is deleted from the context.
func (ctx *Context) RemoveApplication(appID string) error {
	ctx.lock.Lock()
	defer ctx.lock.Unlock()

	app, found := ctx.applications[appID]
	if !found {
		return fmt.Errorf("application %s is not found in the context", appID)
	}

	// refuse to remove an application that still has non-terminated tasks
	if pending := app.getNonTerminatedTaskAlias(); len(pending) != 0 {
		return fmt.Errorf("failed to remove application %s because it still has task in non-terminated task, tasks: %s", appID, strings.Join(pending, ","))
	}

	// notify the scheduler core; a failure here is only logged and the
	// application is still dropped from the context below
	removeRequest := common.CreateUpdateRequestForRemoveApplication(app.applicationID, app.partition)
	if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateApplication(removeRequest); err != nil {
		log.Log(log.ShimContext).Error("failed to send remove application request to core", zap.Error(err))
	}

	delete(ctx.applications, appID)
	log.Log(log.ShimContext).Info("app removed",
		zap.String("appID", appID))
	return nil
}

func (ctx *Context) RemoveApplicationInternal(appID string) {
func (ctx *Context) RemoveApplication(appID string) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if _, exist := ctx.applications[appID]; !exist {
Expand All @@ -1086,8 +1042,6 @@ func (ctx *Context) RemoveApplicationInternal(appID string) {

// AddTask implements ApplicationManagementProtocol. It acquires the context
// lock exclusively and delegates to addTask, returning addTask's result.
func (ctx *Context) AddTask(request *AddTaskRequest) *Task {
	ctx.lock.Lock()
	defer ctx.lock.Unlock()

	task := ctx.addTask(request)
	return task
}

Expand Down Expand Up @@ -1147,9 +1101,7 @@ func (ctx *Context) RemoveTask(appID, taskID string) {
}

func (ctx *Context) getTask(appID string, taskID string) *Task {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
app := ctx.getApplication(appID)
app := ctx.GetApplication(appID)
if app == nil {
log.Log(log.ShimContext).Debug("application is not found in the context",
zap.String("appID", appID))
Expand Down Expand Up @@ -1557,7 +1509,6 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
},
SchedulableResource: common.GetNodeResource(&nodeStatus),
OccupiedResource: common.NewResourceBuilder().Build(),
ExistingAllocations: make([]*si.Allocation, 0),
})
pendingNodes[node.Name] = node
}
Expand Down Expand Up @@ -1671,9 +1622,6 @@ func (ctx *Context) finalizeNodes(existingNodes []*v1.Node) error {
nodeMap[node.Name] = node
}

ctx.lock.Lock()
defer ctx.lock.Unlock()

// find any existing nodes that no longer exist
for _, node := range existingNodes {
if _, ok := nodeMap[node.Name]; !ok {
Expand Down
82 changes: 10 additions & 72 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,75 +324,6 @@ func TestGetApplication(t *testing.T) {
}

// TestRemoveApplication exercises the error-returning Context.RemoveApplication:
// removal is expected to fail while an application still has a non-terminated
// task, to succeed once the application's only task is in a terminated state,
// and to fail again for an application that is no longer in the context.
func TestRemoveApplication(t *testing.T) {
// add 3 applications directly to the context's application map
context := initContextForTest()
app1 := NewApplication(appID1, queueNameA, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
app2 := NewApplication(appID2, queueNameB, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
app3 := NewApplication(appID3, queueNameC, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
context.applications[appID1] = app1
context.applications[appID2] = app2
context.applications[appID3] = app3
// pods backing the two tasks below; no namespace is set on either pod
pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "remove-test-00001",
UID: uid1,
},
}
pod2 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "remove-test-00002",
UID: uid2,
},
}
// add a new task to application 1
// and set its state to Pending (non-terminated)
task1 := NewTask(taskUID1, app1, context, pod1)
app1.taskMap[taskUID1] = task1
task1.sm.SetState(TaskStates().Pending)
// add a new task to application 2
// and set its state to Failed (terminated)
task2 := NewTask(taskUID2, app2, context, pod2)
app2.taskMap[taskUID2] = task2
task2.sm.SetState(TaskStates().Failed)

// remove application 1, which has a non-terminated task
// this should fail
assert.Equal(t, len(context.applications), 3)
err := context.RemoveApplication(appID1)
assert.Assert(t, err != nil)
assert.ErrorContains(t, err, "application app00001 because it still has task in non-terminated task, tasks: /remove-test-00001")

// the failed removal must leave application 1 in the context
app := context.GetApplication(appID1)
assert.Assert(t, app != nil)

// remove application 2, which only has a terminated task
// this should be successful
err = context.RemoveApplication(appID2)
assert.Assert(t, err == nil)

app = context.GetApplication(appID2)
assert.Assert(t, app == nil)

// try to remove application 2 again
// this should fail because it is no longer in the context
err = context.RemoveApplication(appID2)
assert.Assert(t, err != nil)
assert.ErrorContains(t, err, "application app00002 is not found in the context")

// make sure the other app is not affected
app = context.GetApplication(appID3)
assert.Assert(t, app != nil)
}

func TestRemoveApplicationInternal(t *testing.T) {
context := initContextForTest()
app1 := NewApplication(appID1, queueNameA, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
app2 := NewApplication(appID2, queueNameB, testUser, testGroups, map[string]string{}, newMockSchedulerAPI())
Expand All @@ -401,17 +332,17 @@ func TestRemoveApplicationInternal(t *testing.T) {
assert.Equal(t, len(context.applications), 2)

// remove non-exist app
context.RemoveApplicationInternal(appID3)
context.RemoveApplication(appID3)
assert.Equal(t, len(context.applications), 2)

// remove app1
context.RemoveApplicationInternal(appID1)
context.RemoveApplication(appID1)
assert.Equal(t, len(context.applications), 1)
_, ok := context.applications[appID1]
assert.Equal(t, ok, false)

// remove app2
context.RemoveApplicationInternal(appID2)
context.RemoveApplication(appID2)
assert.Equal(t, len(context.applications), 0)
_, ok = context.applications[appID2]
assert.Equal(t, ok, false)
Expand Down Expand Up @@ -1522,6 +1453,7 @@ func TestAddApplicationsWithTags(t *testing.T) {
constants.NamespaceQuota: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
constants.DomainYuniKorn + "parentqueue": "root.test",
constants.NamespaceGuaranteed: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
constants.NamespaceMaxApps: "1000",
},
},
}
Expand Down Expand Up @@ -1610,6 +1542,12 @@ func TestAddApplicationsWithTags(t *testing.T) {
t.Fatalf("resource parsing failed")
}

maxApps, ok := request.Metadata.Tags[siCommon.AppTagNamespaceResourceMaxApps]
if !ok {
t.Fatalf("max apps tag is not updated from the namespace")
}
assert.Equal(t, maxApps, "1000")

parentQueue, ok := request.Metadata.Tags[constants.AppTagNamespaceParentQueue]
if !ok {
t.Fatalf("parent queue tag is not updated from the namespace")
Expand Down
Loading

0 comments on commit 7b48895

Please sign in to comment.