From 57ab4d59d43ab357e4a677b8973c70d5b151c7e4 Mon Sep 17 00:00:00 2001
From: Craig Condit
Date: Tue, 2 Apr 2024 18:00:10 -0500
Subject: [PATCH] [YUNIKORN-2530] Remove node readiness logic from shim (#812)

Closes: #812

Signed-off-by: Craig Condit
---
 go.mod                          |  4 ++--
 go.sum                          |  8 ++++----
 pkg/cache/context.go            | 39 ++++++---------------------
 pkg/cache/context_test.go       | 17 +++++----------
 pkg/common/si_helper.go         | 14 +++++-------
 pkg/common/si_helper_test.go    | 13 ++++-------
 pkg/shim/scheduler_mock_test.go |  2 +-
 7 files changed, 28 insertions(+), 69 deletions(-)

diff --git a/go.mod b/go.mod
index 040e248e2..9498a2484 100644
--- a/go.mod
+++ b/go.mod
@@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
 go 1.21
 
 require (
-	github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c
-	github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191
+	github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c
+	github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd
 	github.com/google/go-cmp v0.6.0
 	github.com/google/uuid v1.6.0
 	github.com/looplab/fsm v1.0.1
diff --git a/go.sum b/go.sum
index ae34d7e7b..6c2622fb8 100644
--- a/go.sum
+++ b/go.sum
@@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
 github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
 github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
 github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
-github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c h1:xabRyfVmPle5gAnppiY3jeoS5t9+lVgbfoOlS24E5e4=
-github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c/go.mod h1:eLQ4wT62D3L05Fu+0OHh7hXMga6EXU1aMb4aOBwzEEM=
-github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191 h1:EfDQhLaxdM6LxPK6BTKG+fAzj67sLMi576DWnDjnNgU=
-github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191/go.mod h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
+github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c h1:WoO71GKblZEKBOuWviJMD5f1W6tdbJp5Pv/utd4zYqw=
+github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c/go.mod h1:RZCBSMe6UZ04b45ZzwvuhhkY2f7f8ZW7ERvVMUM6dy4=
+github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd h1:uNOijHkCotZLUZ+A85NSftEJGfP50Opf7ms6Daj6pco=
+github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd/go.mod h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 46a230016..1ad4adf37 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -189,27 +189,16 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
 		// existing node
 		prevCapacity := common.GetNodeResource(&prevNode.Status)
 		newCapacity := common.GetNodeResource(&node.Status)
-		prevReady := hasReadyCondition(prevNode)
-		newReady := hasReadyCondition(node)
 
 		if !common.Equals(prevCapacity, newCapacity) {
 			// update capacity
 			if capacity, occupied, ok := ctx.schedulerCache.UpdateCapacity(node.Name, newCapacity); ok {
-				if err := ctx.updateNodeResources(node, capacity, occupied, newReady); err != nil {
+				if err := ctx.updateNodeResources(node, capacity, occupied); err != nil {
 					log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err))
 				}
 			} else {
 				log.Log(log.ShimContext).Warn("Failed to update cached node capacity", zap.String("nodeName", node.Name))
 			}
-		} else if newReady != prevReady {
-			// update readiness
-			if capacity, occupied, ok := ctx.schedulerCache.SnapshotResources(node.Name); ok {
-				if err := ctx.updateNodeResources(node, capacity, occupied, newReady); err != nil {
-					log.Log(log.ShimContext).Warn("Failed to update node readiness", zap.Error(err))
-				}
-			} else {
-				log.Log(log.ShimContext).Warn("Failed to snapshot cached node capacity", zap.String("nodeName", node.Name))
-			}
 		}
 	}
 }
@@ -481,7 +470,7 @@ func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace strin
 		return
 	}
 	if node, capacity, occupied, ok := ctx.schedulerCache.UpdateOccupiedResource(nodeName, namespace, podName, resource, opt); ok {
-		if err := ctx.updateNodeResources(node, capacity, occupied, hasReadyCondition(node)); err != nil {
+		if err := ctx.updateNodeResources(node, capacity, occupied); err != nil {
 			log.Log(log.ShimContext).Warn("scheduler rejected update to node occupied resources", zap.Error(err))
 		}
 	} else {
@@ -1527,7 +1516,6 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
 			Attributes: map[string]string{
 				constants.DefaultNodeAttributeHostNameKey: node.Name,
 				constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,
-				siCommon.NodeReadyAttribute:               strconv.FormatBool(hasReadyCondition(node)),
 			},
 			SchedulableResource: common.GetNodeResource(&nodeStatus),
 			OccupiedResource:    common.NewResourceBuilder().Build(),
@@ -1598,8 +1586,8 @@ func (ctx *Context) decommissionNode(node *v1.Node) error {
 	return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
 }
 
-func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource, ready bool) error {
-	request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied, ready)
+func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource) error {
+	request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied)
 	return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
 }
 
@@ -1614,11 +1602,9 @@ func (ctx *Context) enableNodes(nodes []*v1.Node) error {
 	for _, node := range nodes {
 		log.Log(log.ShimContext).Info("Enabling node", zap.String("name", node.Name))
 		nodesToEnable = append(nodesToEnable, &si.NodeInfo{
-			NodeID: node.Name,
-			Action: si.NodeInfo_DRAIN_TO_SCHEDULABLE,
-			Attributes: map[string]string{
-				siCommon.NodeReadyAttribute: strconv.FormatBool(hasReadyCondition(node)),
-			},
+			NodeID:     node.Name,
+			Action:     si.NodeInfo_DRAIN_TO_SCHEDULABLE,
+			Attributes: map[string]string{},
 		})
 	}
 
@@ -1757,14 +1743,3 @@ func convertToNode(obj interface{}) (*v1.Node, error) {
 	}
 	return nil, fmt.Errorf("cannot convert to *v1.Node: %v", obj)
 }
-
-func hasReadyCondition(node *v1.Node) bool {
-	if node != nil {
-		for _, condition := range node.Status.Conditions {
-			if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
-				return true
-			}
-		}
-	}
-	return false
-}
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 4c279e36e..1a081765b 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -1325,7 +1325,7 @@ func TestFilteredEventsNotPublished(t *testing.T) {
 	err := waitForNodeAcceptedEvent(recorder)
 	assert.NilError(t, err, "node accepted event was not sent")
 
-	eventRecords := make([]*si.EventRecord, 7)
+	eventRecords := make([]*si.EventRecord, 6)
 	eventRecords[0] = &si.EventRecord{
 		Type:              si.EventRecord_NODE,
 		EventChangeType:   si.EventRecord_SET,
@@ -1334,41 +1334,34 @@
 		Message:           "",
 	}
 	eventRecords[1] = &si.EventRecord{
-		Type:              si.EventRecord_NODE,
-		EventChangeType:   si.EventRecord_SET,
-		EventChangeDetail: si.EventRecord_NODE_READY,
-		ObjectID:          "host0001",
-		Message:           "",
-	}
-	eventRecords[2] = &si.EventRecord{
 		Type:              si.EventRecord_NODE,
 		EventChangeType:   si.EventRecord_SET,
 		EventChangeDetail: si.EventRecord_NODE_OCCUPIED,
 		ObjectID:          "host0001",
 		Message:           "",
 	}
-	eventRecords[3] = &si.EventRecord{
+	eventRecords[2] = &si.EventRecord{
 		Type:              si.EventRecord_NODE,
 		EventChangeType:   si.EventRecord_SET,
 		EventChangeDetail: si.EventRecord_NODE_CAPACITY,
 		ObjectID:          "host0001",
 		Message:           "",
 	}
-	eventRecords[4] = &si.EventRecord{
+	eventRecords[3] = &si.EventRecord{
 		Type:              si.EventRecord_NODE,
 		EventChangeType:   si.EventRecord_ADD,
 		EventChangeDetail: si.EventRecord_NODE_ALLOC,
 		ObjectID:          "host0001",
 		Message:           "",
 	}
-	eventRecords[5] = &si.EventRecord{
+	eventRecords[4] = &si.EventRecord{
 		Type:              si.EventRecord_APP,
 		EventChangeType:   si.EventRecord_ADD,
 		EventChangeDetail: si.EventRecord_APP_RUNNING,
 		ObjectID:          "app-1",
 		Message:           "",
 	}
-	eventRecords[6] = &si.EventRecord{
+	eventRecords[5] = &si.EventRecord{
 		Type:              si.EventRecord_QUEUE,
 		EventChangeType:   si.EventRecord_ADD,
 		EventChangeDetail: si.EventRecord_DETAILS_NONE,
diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go
index 4434a82bd..336002dfe 100644
--- a/pkg/common/si_helper.go
+++ b/pkg/common/si_helper.go
@@ -173,7 +173,7 @@ func CreateReleaseAllocationRequestForTask(appID, taskID, allocationID, partitio
 // CreateUpdateRequestForNewNode builds a NodeRequest for new node addition and restoring existing node
 func CreateUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string,
 	capacity *si.Resource, occupied *si.Resource,
-	existingAllocations []*si.Allocation, ready bool) *si.NodeRequest {
+	existingAllocations []*si.Allocation) *si.NodeRequest {
 	// Use node's name as the NodeID, this is because when bind pod to node,
 	// name of node is required but uid is optional.
 	nodeInfo := &si.NodeInfo{
@@ -183,7 +183,6 @@ func CreateUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string,
 		Attributes: map[string]string{
 			constants.DefaultNodeAttributeHostNameKey: nodeID,
 			constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,
-			common.NodeReadyAttribute:                 strconv.FormatBool(ready),
 		},
 		ExistingAllocations: existingAllocations,
 		Action:              si.NodeInfo_CREATE,
@@ -205,14 +204,11 @@
 	}
 }
 
-// CreateUpdateRequestForUpdatedNode builds a NodeRequest for any node updates like capacity,
-// ready status flag etc
-func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource, occupied *si.Resource, ready bool) *si.NodeRequest {
+// CreateUpdateRequestForUpdatedNode builds a NodeRequest for capacity and occupied resource updates
+func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource, occupied *si.Resource) *si.NodeRequest {
 	nodeInfo := &si.NodeInfo{
-		NodeID: nodeID,
-		Attributes: map[string]string{
-			common.NodeReadyAttribute: strconv.FormatBool(ready),
-		},
+		NodeID:              nodeID,
+		Attributes:          map[string]string{},
 		SchedulableResource: capacity,
 		OccupiedResource:    occupied,
 		Action:              si.NodeInfo_UPDATE,
diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go
index f3a5fcf64..1f63373aa 100644
--- a/pkg/common/si_helper_test.go
+++ b/pkg/common/si_helper_test.go
@@ -18,7 +18,6 @@ limitations under the License.
 package common
 
 import (
-	"strconv"
 	"testing"
 
 	"gotest.tools/v3/assert"
@@ -219,21 +218,19 @@ func TestCreateUpdateRequestForNewNode(t *testing.T) {
 	capacity := NewResourceBuilder().AddResource(common.Memory, 200).AddResource(common.CPU, 2).Build()
 	occupied := NewResourceBuilder().AddResource(common.Memory, 50).AddResource(common.CPU, 1).Build()
 	var existingAllocations []*si.Allocation
-	ready := true
 	nodeLabels := map[string]string{
 		"label1": "key1",
 		"label2": "key2",
 		"node.kubernetes.io/instance-type": "HighMem",
 	}
-	request := CreateUpdateRequestForNewNode(nodeID, nodeLabels, capacity, occupied, existingAllocations, ready)
+	request := CreateUpdateRequestForNewNode(nodeID, nodeLabels, capacity, occupied, existingAllocations)
 	assert.Equal(t, len(request.Nodes), 1)
 	assert.Equal(t, request.Nodes[0].NodeID, nodeID)
 	assert.Equal(t, request.Nodes[0].SchedulableResource, capacity)
 	assert.Equal(t, request.Nodes[0].OccupiedResource, occupied)
-	assert.Equal(t, len(request.Nodes[0].Attributes), 7)
+	assert.Equal(t, len(request.Nodes[0].Attributes), 6)
 	assert.Equal(t, request.Nodes[0].Attributes[constants.DefaultNodeAttributeHostNameKey], nodeID)
 	assert.Equal(t, request.Nodes[0].Attributes[constants.DefaultNodeAttributeRackNameKey], constants.DefaultRackName)
-	assert.Equal(t, request.Nodes[0].Attributes[common.NodeReadyAttribute], strconv.FormatBool(ready))
 
 	// Make sure include nodeLabel
 	assert.Equal(t, request.Nodes[0].Attributes["label1"], "key1")
@@ -247,14 +244,12 @@
 func TestCreateUpdateRequestForUpdatedNode(t *testing.T) {
 	capacity := NewResourceBuilder().AddResource(common.Memory, 200).AddResource(common.CPU, 2).Build()
 	occupied := NewResourceBuilder().AddResource(common.Memory, 50).AddResource(common.CPU, 1).Build()
-	ready := true
-	request := CreateUpdateRequestForUpdatedNode(nodeID, capacity, occupied, ready)
+	request := CreateUpdateRequestForUpdatedNode(nodeID, capacity, occupied)
 	assert.Equal(t, len(request.Nodes), 1)
 	assert.Equal(t, request.Nodes[0].NodeID, nodeID)
 	assert.Equal(t, request.Nodes[0].SchedulableResource, capacity)
 	assert.Equal(t, request.Nodes[0].OccupiedResource, occupied)
-	assert.Equal(t, len(request.Nodes[0].Attributes), 1)
-	assert.Equal(t, request.Nodes[0].Attributes[common.NodeReadyAttribute], strconv.FormatBool(ready))
+	assert.Equal(t, len(request.Nodes[0].Attributes), 0)
 }
 
 func TestCreateUpdateRequestForDeleteNode(t *testing.T) {
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 9b13ee7c1..6b6b9af64 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -124,7 +124,7 @@ func (fc *MockScheduler) addNode(nodeName string, nodeLabels map[string]string,
 		AddResource(siCommon.CPU, cpu).
 		AddResource("pods", pods).
 		Build()
-	request := common.CreateUpdateRequestForNewNode(nodeName, nodeLabels, nodeResource, nil, nil, true)
+	request := common.CreateUpdateRequestForNewNode(nodeName, nodeLabels, nodeResource, nil, nil)
 	fmt.Printf("report new nodes to scheduler, request: %s", request.String())
 	return fc.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
 }