Skip to content

Commit

Permalink
[YUNIKORN-2530] Remove node readiness logic from shim (#812)
Browse files Browse the repository at this point in the history
Closes: #812

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
craigcondit committed Apr 2, 2024
1 parent aa20c7b commit 57ab4d5
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 69 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
go 1.21

require (
github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191
github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c h1:xabRyfVmPle5gAnppiY3jeoS5t9+lVgbfoOlS24E5e4=
github.com/apache/yunikorn-core v0.0.0-20240320142832-09e5d741b67c/go.mod h1:eLQ4wT62D3L05Fu+0OHh7hXMga6EXU1aMb4aOBwzEEM=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191 h1:EfDQhLaxdM6LxPK6BTKG+fAzj67sLMi576DWnDjnNgU=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240325175743-cc087bb5a191/go.mod h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c h1:WoO71GKblZEKBOuWviJMD5f1W6tdbJp5Pv/utd4zYqw=
github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c/go.mod h1:RZCBSMe6UZ04b45ZzwvuhhkY2f7f8ZW7ERvVMUM6dy4=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd h1:uNOijHkCotZLUZ+A85NSftEJGfP50Opf7ms6Daj6pco=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240402211642-e7421a4261fd/go.mod h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
Expand Down
39 changes: 7 additions & 32 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,27 +189,16 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
// existing node
prevCapacity := common.GetNodeResource(&prevNode.Status)
newCapacity := common.GetNodeResource(&node.Status)
prevReady := hasReadyCondition(prevNode)
newReady := hasReadyCondition(node)

if !common.Equals(prevCapacity, newCapacity) {
// update capacity
if capacity, occupied, ok := ctx.schedulerCache.UpdateCapacity(node.Name, newCapacity); ok {
if err := ctx.updateNodeResources(node, capacity, occupied, newReady); err != nil {
if err := ctx.updateNodeResources(node, capacity, occupied); err != nil {
log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err))
}
} else {
log.Log(log.ShimContext).Warn("Failed to update cached node capacity", zap.String("nodeName", node.Name))
}
} else if newReady != prevReady {
// update readiness
if capacity, occupied, ok := ctx.schedulerCache.SnapshotResources(node.Name); ok {
if err := ctx.updateNodeResources(node, capacity, occupied, newReady); err != nil {
log.Log(log.ShimContext).Warn("Failed to update node readiness", zap.Error(err))
}
} else {
log.Log(log.ShimContext).Warn("Failed to snapshot cached node capacity", zap.String("nodeName", node.Name))
}
}
}
}
Expand Down Expand Up @@ -481,7 +470,7 @@ func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace strin
return
}
if node, capacity, occupied, ok := ctx.schedulerCache.UpdateOccupiedResource(nodeName, namespace, podName, resource, opt); ok {
if err := ctx.updateNodeResources(node, capacity, occupied, hasReadyCondition(node)); err != nil {
if err := ctx.updateNodeResources(node, capacity, occupied); err != nil {
log.Log(log.ShimContext).Warn("scheduler rejected update to node occupied resources", zap.Error(err))
}
} else {
Expand Down Expand Up @@ -1527,7 +1516,6 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
Attributes: map[string]string{
constants.DefaultNodeAttributeHostNameKey: node.Name,
constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,
siCommon.NodeReadyAttribute: strconv.FormatBool(hasReadyCondition(node)),
},
SchedulableResource: common.GetNodeResource(&nodeStatus),
OccupiedResource: common.NewResourceBuilder().Build(),
Expand Down Expand Up @@ -1598,8 +1586,8 @@ func (ctx *Context) decommissionNode(node *v1.Node) error {
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}

func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource, ready bool) error {
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied, ready)
func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource) error {
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied)
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}

Expand All @@ -1614,11 +1602,9 @@ func (ctx *Context) enableNodes(nodes []*v1.Node) error {
for _, node := range nodes {
log.Log(log.ShimContext).Info("Enabling node", zap.String("name", node.Name))
nodesToEnable = append(nodesToEnable, &si.NodeInfo{
NodeID: node.Name,
Action: si.NodeInfo_DRAIN_TO_SCHEDULABLE,
Attributes: map[string]string{
siCommon.NodeReadyAttribute: strconv.FormatBool(hasReadyCondition(node)),
},
NodeID: node.Name,
Action: si.NodeInfo_DRAIN_TO_SCHEDULABLE,
Attributes: map[string]string{},
})
}

Expand Down Expand Up @@ -1757,14 +1743,3 @@ func convertToNode(obj interface{}) (*v1.Node, error) {
}
return nil, fmt.Errorf("cannot convert to *v1.Node: %v", obj)
}

// hasReadyCondition reports whether the given node carries a NodeReady
// condition whose status is ConditionTrue. A nil node, or a node without
// such a condition, is reported as not ready.
func hasReadyCondition(node *v1.Node) bool {
	if node == nil {
		return false
	}
	for i := range node.Status.Conditions {
		cond := &node.Status.Conditions[i]
		if cond.Type != v1.NodeReady {
			// Only the NodeReady condition type is relevant here.
			continue
		}
		if cond.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}
17 changes: 5 additions & 12 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ func TestFilteredEventsNotPublished(t *testing.T) {
err := waitForNodeAcceptedEvent(recorder)
assert.NilError(t, err, "node accepted event was not sent")

eventRecords := make([]*si.EventRecord, 7)
eventRecords := make([]*si.EventRecord, 6)
eventRecords[0] = &si.EventRecord{
Type: si.EventRecord_NODE,
EventChangeType: si.EventRecord_SET,
Expand All @@ -1334,41 +1334,34 @@ func TestFilteredEventsNotPublished(t *testing.T) {
Message: "",
}
eventRecords[1] = &si.EventRecord{
Type: si.EventRecord_NODE,
EventChangeType: si.EventRecord_SET,
EventChangeDetail: si.EventRecord_NODE_READY,
ObjectID: "host0001",
Message: "",
}
eventRecords[2] = &si.EventRecord{
Type: si.EventRecord_NODE,
EventChangeType: si.EventRecord_SET,
EventChangeDetail: si.EventRecord_NODE_OCCUPIED,
ObjectID: "host0001",
Message: "",
}
eventRecords[3] = &si.EventRecord{
eventRecords[2] = &si.EventRecord{
Type: si.EventRecord_NODE,
EventChangeType: si.EventRecord_SET,
EventChangeDetail: si.EventRecord_NODE_CAPACITY,
ObjectID: "host0001",
Message: "",
}
eventRecords[4] = &si.EventRecord{
eventRecords[3] = &si.EventRecord{
Type: si.EventRecord_NODE,
EventChangeType: si.EventRecord_ADD,
EventChangeDetail: si.EventRecord_NODE_ALLOC,
ObjectID: "host0001",
Message: "",
}
eventRecords[5] = &si.EventRecord{
eventRecords[4] = &si.EventRecord{
Type: si.EventRecord_APP,
EventChangeType: si.EventRecord_ADD,
EventChangeDetail: si.EventRecord_APP_RUNNING,
ObjectID: "app-1",
Message: "",
}
eventRecords[6] = &si.EventRecord{
eventRecords[5] = &si.EventRecord{
Type: si.EventRecord_QUEUE,
EventChangeType: si.EventRecord_ADD,
EventChangeDetail: si.EventRecord_DETAILS_NONE,
Expand Down
14 changes: 5 additions & 9 deletions pkg/common/si_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func CreateReleaseAllocationRequestForTask(appID, taskID, allocationID, partitio

// CreateUpdateRequestForNewNode builds a NodeRequest for adding a new node or restoring an existing node
func CreateUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string, capacity *si.Resource, occupied *si.Resource,
existingAllocations []*si.Allocation, ready bool) *si.NodeRequest {
existingAllocations []*si.Allocation) *si.NodeRequest {
// Use the node's name as the NodeID, because when binding a pod to a node
// the node name is required but the UID is optional.
nodeInfo := &si.NodeInfo{
Expand All @@ -183,7 +183,6 @@ func CreateUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string,
Attributes: map[string]string{
constants.DefaultNodeAttributeHostNameKey: nodeID,
constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName,
common.NodeReadyAttribute: strconv.FormatBool(ready),
},
ExistingAllocations: existingAllocations,
Action: si.NodeInfo_CREATE,
Expand All @@ -205,14 +204,11 @@ func CreateUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string,
}
}

// CreateUpdateRequestForUpdatedNode builds a NodeRequest for any node updates like capacity,
// ready status flag etc
func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource, occupied *si.Resource, ready bool) *si.NodeRequest {
// CreateUpdateRequestForUpdatedNode builds a NodeRequest for capacity and occupied resource updates
func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource, occupied *si.Resource) *si.NodeRequest {
nodeInfo := &si.NodeInfo{
NodeID: nodeID,
Attributes: map[string]string{
common.NodeReadyAttribute: strconv.FormatBool(ready),
},
NodeID: nodeID,
Attributes: map[string]string{},
SchedulableResource: capacity,
OccupiedResource: occupied,
Action: si.NodeInfo_UPDATE,
Expand Down
13 changes: 4 additions & 9 deletions pkg/common/si_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ limitations under the License.
package common

import (
"strconv"
"testing"

"gotest.tools/v3/assert"
Expand Down Expand Up @@ -219,21 +218,19 @@ func TestCreateUpdateRequestForNewNode(t *testing.T) {
capacity := NewResourceBuilder().AddResource(common.Memory, 200).AddResource(common.CPU, 2).Build()
occupied := NewResourceBuilder().AddResource(common.Memory, 50).AddResource(common.CPU, 1).Build()
var existingAllocations []*si.Allocation
ready := true
nodeLabels := map[string]string{
"label1": "key1",
"label2": "key2",
"node.kubernetes.io/instance-type": "HighMem",
}
request := CreateUpdateRequestForNewNode(nodeID, nodeLabels, capacity, occupied, existingAllocations, ready)
request := CreateUpdateRequestForNewNode(nodeID, nodeLabels, capacity, occupied, existingAllocations)
assert.Equal(t, len(request.Nodes), 1)
assert.Equal(t, request.Nodes[0].NodeID, nodeID)
assert.Equal(t, request.Nodes[0].SchedulableResource, capacity)
assert.Equal(t, request.Nodes[0].OccupiedResource, occupied)
assert.Equal(t, len(request.Nodes[0].Attributes), 7)
assert.Equal(t, len(request.Nodes[0].Attributes), 6)
assert.Equal(t, request.Nodes[0].Attributes[constants.DefaultNodeAttributeHostNameKey], nodeID)
assert.Equal(t, request.Nodes[0].Attributes[constants.DefaultNodeAttributeRackNameKey], constants.DefaultRackName)
assert.Equal(t, request.Nodes[0].Attributes[common.NodeReadyAttribute], strconv.FormatBool(ready))

// Make sure include nodeLabel
assert.Equal(t, request.Nodes[0].Attributes["label1"], "key1")
Expand All @@ -247,14 +244,12 @@ func TestCreateUpdateRequestForNewNode(t *testing.T) {
func TestCreateUpdateRequestForUpdatedNode(t *testing.T) {
capacity := NewResourceBuilder().AddResource(common.Memory, 200).AddResource(common.CPU, 2).Build()
occupied := NewResourceBuilder().AddResource(common.Memory, 50).AddResource(common.CPU, 1).Build()
ready := true
request := CreateUpdateRequestForUpdatedNode(nodeID, capacity, occupied, ready)
request := CreateUpdateRequestForUpdatedNode(nodeID, capacity, occupied)
assert.Equal(t, len(request.Nodes), 1)
assert.Equal(t, request.Nodes[0].NodeID, nodeID)
assert.Equal(t, request.Nodes[0].SchedulableResource, capacity)
assert.Equal(t, request.Nodes[0].OccupiedResource, occupied)
assert.Equal(t, len(request.Nodes[0].Attributes), 1)
assert.Equal(t, request.Nodes[0].Attributes[common.NodeReadyAttribute], strconv.FormatBool(ready))
assert.Equal(t, len(request.Nodes[0].Attributes), 0)
}

func TestCreateUpdateRequestForDeleteNode(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/shim/scheduler_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (fc *MockScheduler) addNode(nodeName string, nodeLabels map[string]string,
AddResource(siCommon.CPU, cpu).
AddResource("pods", pods).
Build()
request := common.CreateUpdateRequestForNewNode(nodeName, nodeLabels, nodeResource, nil, nil, true)
request := common.CreateUpdateRequestForNewNode(nodeName, nodeLabels, nodeResource, nil, nil)
fmt.Printf("report new nodes to scheduler, request: %s", request.String())
return fc.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}
Expand Down

0 comments on commit 57ab4d5

Please sign in to comment.