[YUNIKORN-2780] Remove unnecessary node ExistingAllocations handling (apache#924)

With node recovery simplified, the shim never sends ExistingAllocations
on node registration. Remove the code that handled them, since it can
never be executed.

Closes: apache#924
craigcondit authored and manirajv06 committed Aug 8, 2024
1 parent c094fdc commit 6761207
Showing 10 changed files with 187 additions and 210 deletions.
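
For orientation before the per-file changes: a node is now registered on its own, and allocations reach the partition separately through the allocation request path. Below is a minimal, self-contained Go sketch of that two-step flow. The Partition, Node, and Allocation types here are simplified stand-ins, not the real yunikorn-core objects; only the shapes of AddNode(node) and AddAllocation(alloc) mirror the diff that follows.

package main

import (
	"errors"
	"fmt"
)

// Node and Allocation are stand-ins for the scheduler objects.
type Node struct{ NodeID string }

type Allocation struct {
	AllocationKey string
	NodeID        string
}

// Partition is a stand-in for PartitionContext.
type Partition struct {
	nodes  map[string]*Node
	allocs map[string]*Allocation
}

// AddNode registers the node only; it no longer accepts a slice of existing allocations.
func (p *Partition) AddNode(node *Node) error {
	if node == nil {
		return errors.New("cannot add nil node to partition")
	}
	p.nodes[node.NodeID] = node
	return nil
}

// AddAllocation binds an allocation reported by the RM for an already registered node.
func (p *Partition) AddAllocation(alloc *Allocation) error {
	if alloc == nil {
		return nil // nil allocations are ignored, mirroring the tolerant behaviour in the diff
	}
	if _, ok := p.nodes[alloc.NodeID]; !ok {
		return fmt.Errorf("node %s is not registered", alloc.NodeID)
	}
	p.allocs[alloc.AllocationKey] = alloc
	return nil
}

func main() {
	p := &Partition{nodes: map[string]*Node{}, allocs: map[string]*Allocation{}}
	// Step 1: register the node; no existing allocations are passed any more.
	if err := p.AddNode(&Node{NodeID: "node-1"}); err != nil {
		panic(err)
	}
	// Step 2: allocations arrive separately through the allocation request path.
	if err := p.AddAllocation(&Allocation{AllocationKey: "alloc-1", NodeID: "node-1"}); err != nil {
		panic(err)
	}
	fmt.Println("registered", len(p.nodes), "node and", len(p.allocs), "allocation")
}

Because nodes no longer carry existing allocations at registration time, there is no partially-added state for AddNode to roll back, which is why the reversal logic removed from partition.go below is no longer needed.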
go.mod (2 changes: 1 addition & 1 deletion)
@@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.21

require (
- github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1
+ github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
go.sum (4 changes: 2 additions & 2 deletions)
@@ -1,5 +1,5 @@
- github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1 h1:v4J9L3MlW8BQfYnbq6FV2l3uyay3SqMS2Ffpo+SFat4=
- github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
+ github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586 h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0=
+ github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
pkg/scheduler/context.go (15 changes: 2 additions & 13 deletions)
@@ -607,8 +607,7 @@ func (cc *ClusterContext) addNode(nodeInfo *si.NodeInfo, schedulable bool) error
return err
}

- existingAllocations := cc.convertAllocations(nodeInfo.ExistingAllocations)
- err := partition.AddNode(sn, existingAllocations)
+ err := partition.AddNode(sn)
sn.SendNodeAddedEvent()
if err != nil {
wrapped := errors.Join(errors.New("failure while adding new node, node rejected with error: "), err)
@@ -751,7 +750,7 @@ func (cc *ClusterContext) processAllocations(request *si.AllocationRequest) {
}

alloc := objects.NewAllocationFromSI(siAlloc)
- if err := partition.addAllocation(alloc); err != nil {
+ if err := partition.AddAllocation(alloc); err != nil {
rejectedAllocs = append(rejectedAllocs, &si.RejectedAllocation{
AllocationKey: siAlloc.AllocationKey,
ApplicationID: siAlloc.ApplicationID,
@@ -855,16 +854,6 @@ func (cc *ClusterContext) processAllocationReleases(releases []*si.AllocationRel
}
}

- // Convert the si allocation to a proposal to add to the node
- func (cc *ClusterContext) convertAllocations(allocations []*si.Allocation) []*objects.Allocation {
- convert := make([]*objects.Allocation, len(allocations))
- for current, allocation := range allocations {
- convert[current] = objects.NewAllocationFromSI(allocation)
- }
-
- return convert
- }

// Create a RM update event to notify RM of new allocations
// Lock free call, all updates occur via events.
func (cc *ClusterContext) notifyRMNewAllocation(rmID string, alloc *objects.Allocation) {
pkg/scheduler/health_checker_test.go (2 changes: 1 addition & 1 deletion)
@@ -198,7 +198,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{"memory": {Value: -10}},
},
- }), []*objects.Allocation{})
+ }))
assert.NilError(t, err, "Unexpected error while adding a new node")
healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext)
assert.Assert(t, !healthInfo.Healthy, "Scheduler should not be healthy")
pkg/scheduler/partition.go (30 changes: 5 additions & 25 deletions)
@@ -545,9 +545,9 @@ func (pc *PartitionContext) GetNode(nodeID string) *objects.Node {
return pc.nodes.GetNode(nodeID)
}

- // Add the node to the partition and process the allocations that are reported by the node.
+ // Add the node to the partition.
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
- func (pc *PartitionContext) AddNode(node *objects.Node, existingAllocations []*objects.Allocation) error {
+ func (pc *PartitionContext) AddNode(node *objects.Node) error {
if node == nil {
return fmt.Errorf("cannot add 'nil' node to partition %s", pc.Name)
}
@@ -560,26 +560,6 @@ func (pc *PartitionContext) AddNode(node *objects.Node, existingAllocations []*o
if err := pc.addNodeToList(node); err != nil {
return err
}

- // Add allocations that exist on the node when added
- if len(existingAllocations) > 0 {
- for current, alloc := range existingAllocations {
- if err := pc.addAllocation(alloc); err != nil {
- // not expecting any inflight replacements on node recovery
- released, _ := pc.removeNode(node.NodeID)
- log.Log(log.SchedPartition).Info("Failed to add existing allocations, changes reversed",
- zap.String("nodeID", node.NodeID),
- zap.Int("existingAllocations", len(existingAllocations)),
- zap.Int("releasedAllocations", len(released)),
- zap.Int("processingAlloc", current),
- zap.Stringer("allocation", alloc),
- zap.Error(err))
- // update failed metric, active metrics are tracked in add/remove from list
- metrics.GetSchedulerMetrics().IncFailedNodes()
- return err
- }
- }
- }
return nil
}

@@ -1143,11 +1123,11 @@ func (pc *PartitionContext) GetNodes() []*objects.Node {
return pc.nodes.GetNodes()
}

- // Add an allocation to the partition/node/application/queue during node registration.
- // Queue max allocation is not checked as the allocation is part of a new node addition.
+ // Add an already bound allocation to the partition/node/application/queue.
+ // Queue max allocation is not checked as the allocation came from the RM.
//
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
- func (pc *PartitionContext) addAllocation(alloc *objects.Allocation) error {
+ func (pc *PartitionContext) AddAllocation(alloc *objects.Allocation) error {
// cannot do anything with a nil alloc, should only happen if the shim broke things badly
if alloc == nil {
return nil
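
The NOTE retained on AddAllocation ("this is a lock free call. It must NOT be called holding the PartitionContext lock") is a rule for callers: the method, or something it calls, acquires the partition lock itself, so invoking it while already holding that lock would deadlock. The following hypothetical sketch (a stand-in partition type, not the real PartitionContext) illustrates that convention.

package main

import (
	"fmt"
	"sync"
)

// partition is a hypothetical stand-in used only to illustrate the locking rule.
type partition struct {
	sync.RWMutex
	allocations map[string]string // allocationKey -> nodeID
}

// AddAllocation takes the write lock internally, so it must be called
// without the partition lock already held; otherwise the RWMutex would deadlock.
func (p *partition) AddAllocation(key, nodeID string) error {
	p.Lock()
	defer p.Unlock()
	p.allocations[key] = nodeID
	return nil
}

func main() {
	p := &partition{allocations: map[string]string{}}
	// Correct usage: the caller does not hold p's lock when calling AddAllocation.
	if err := p.AddAllocation("alloc-1", "node-1"); err != nil {
		fmt.Println("add failed:", err)
		return
	}
	fmt.Println("allocations:", p.allocations)
}

The same NOTE already applied to AddNode before this change; only the signatures changed, not the locking rule.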