Skip to content

Commit

Permalink
[YUNIKORN-2832] [core] Add non-Yunikorn allocation tracking logic
Browse files Browse the repository at this point in the history
  • Loading branch information
pbacsko committed Sep 27, 2024
1 parent fe067b7 commit b836716
Show file tree
Hide file tree
Showing 17 changed files with 750 additions and 61 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.21

require (
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a h1:3WRXGTvhunGBZj8AVZDxx7Bs/AXiH9mvf2jYcuDyklA=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ func (cc *ClusterContext) processAllocations(request *si.AllocationRequest) {
continue
}
// at some point, we may need to handle new requests as well
if newAlloc {
if newAlloc && !alloc.IsForeign() {
cc.notifyRMNewAllocation(request.RmID, alloc)
}
}
Expand Down
100 changes: 96 additions & 4 deletions pkg/scheduler/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ import (
const pName = "default"

type mockEventHandler struct {
eventHandled bool
rejectedNodes []*si.RejectedNode
acceptedNodes []*si.AcceptedNode
eventHandled bool
rejectedNodes []*si.RejectedNode
acceptedNodes []*si.AcceptedNode
newAllocHandler func(*rmevent.RMNewAllocationsEvent)
}

func newMockEventHandler() *mockEventHandler {
Expand All @@ -56,6 +57,10 @@ func (m *mockEventHandler) HandleEvent(ev interface{}) {
m.rejectedNodes = append(m.rejectedNodes, nodeEvent.RejectedNodes...)
m.acceptedNodes = append(m.acceptedNodes, nodeEvent.AcceptedNodes...)
}

if allocEvent, ok := ev.(*rmevent.RMNewAllocationsEvent); ok && m.newAllocHandler != nil {
m.newAllocHandler(allocEvent)
}
}

func createTestContext(t *testing.T, partitionName string) *ClusterContext {
Expand All @@ -71,7 +76,13 @@ func createTestContext(t *testing.T, partitionName string) *ClusterContext {
Name: "root",
Parent: true,
SubmitACL: "*",
Queues: nil,
Queues: []configs.QueueConfig{
{
Name: "default",
Parent: false,
SubmitACL: "*",
},
},
},
},
}
Expand Down Expand Up @@ -296,6 +307,87 @@ func TestContextDrainingNodeBackToSchedulableMetrics(t *testing.T) {
verifyMetrics(t, 0, "draining")
}

func TestContext_OnAllocationNotification(t *testing.T) {
context := createTestContext(t, pName)
eventHandler := context.rmEventHandler.(*mockEventHandler) //nolint:errcheck
var lastAllocEvent *rmevent.RMNewAllocationsEvent
eventHandler.newAllocHandler = func(event *rmevent.RMNewAllocationsEvent) {
lastAllocEvent = event
go func() {
event.Channel <- &rmevent.Result{Succeeded: true}
}()
}

n := getNodeInfoForAddingNode()
err := context.addNode(n, true)
assert.NilError(t, err, "unexpected error returned from addNode")
partition := context.GetPartition(pName)
assert.Assert(t, partition != nil)
assert.Equal(t, 1, len(partition.GetNodes()), "expected node not found on partition")

// register application
appReq := &si.ApplicationRequest{
New: []*si.AddApplicationRequest{
{
QueueName: defQueue,
PartitionName: pName,
Ugi: &si.UserGroupInformation{
User: "testuser",
Groups: []string{"testgroup"},
},
ApplicationID: appID1,
},
},
RmID: "rm:123",
}
context.handleRMUpdateApplicationEvent(&rmevent.RMUpdateApplicationEvent{Request: appReq})

// add a Yunikorn allocation
allocReq := &si.AllocationRequest{
Allocations: []*si.Allocation{
{
AllocationKey: allocKey,
ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"first": {Value: 1},
},
},
ApplicationID: appID1,
NodeID: "test-1",
PartitionName: pName,
},
},
RmID: "rm:123",
}
context.handleRMUpdateAllocationEvent(&rmevent.RMUpdateAllocationEvent{Request: allocReq})
assert.Assert(t, lastAllocEvent != nil)
assert.Equal(t, lastAllocEvent.Allocations[0].AllocationKey, allocKey)

// add a non-Yunikorn allocation
lastAllocEvent = nil
nonYkAllocReq := &si.AllocationRequest{
Allocations: []*si.Allocation{
{
AllocationKey: "foreign-alloc-1",
ResourcePerAlloc: &si.Resource{
Resources: map[string]*si.Quantity{
"first": {Value: 1},
},
},
AllocationTags: map[string]string{
siCommon.Foreign: siCommon.AllocTypeDefault,
},
NodeID: "test-1",
PartitionName: pName,
},
},
RmID: "rm:123",
}

context.handleRMUpdateAllocationEvent(&rmevent.RMUpdateAllocationEvent{Request: nonYkAllocReq})
assert.Assert(t, lastAllocEvent == nil, "unexpected allocation event")
}

func getNodeInfoForAddingNode() *si.NodeInfo {
n := &si.NodeInfo{
NodeID: "test-1",
Expand Down
34 changes: 34 additions & 0 deletions pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Allocation struct {
originator bool
tags map[string]string
resKeyWithoutNode string // the reservation key without node
foreign bool
preemptable bool

// Mutable fields which need protection
allocated bool
Expand Down Expand Up @@ -106,6 +108,20 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
createTime = time.Unix(siCreationTime, 0)
}

foreign := false
preemptable := true
if foreignType, ok := alloc.AllocationTags[siCommon.Foreign]; ok {
foreign = true
switch foreignType {
case siCommon.AllocTypeStatic:
preemptable = false
case siCommon.AllocTypeDefault:
default:
log.Log(log.SchedAllocation).Warn("Foreign tag has illegal value, using default",
zap.String("value", foreignType))
}
}

var allocated bool
var nodeID string
var bindTime time.Time
Expand Down Expand Up @@ -135,9 +151,19 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
allocated: allocated,
nodeID: nodeID,
bindTime: bindTime,
foreign: foreign,
preemptable: preemptable,
}
}

// NewAllocationFromSIAllocated creates an Allocation where the "allocated" flag is always true,
// regardless whehether the NodeID if empty or not. Used for testing.
func NewAllocationFromSIAllocated(siAlloc *si.Allocation) *Allocation {
alloc := NewAllocationFromSI(siAlloc)
alloc.allocated = true
return alloc

Check warning on line 164 in pkg/scheduler/objects/allocation.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/objects/allocation.go#L161-L164

Added lines #L161 - L164 were not covered by tests
}

// NewSIFromAllocation converts the Allocation into a SI object. This is a limited set of values that gets copied into
// the SI. This is only used to communicate *back* to the RM. All other fields are considered incoming fields from
// the RM into the core. The limited set of fields link the Allocation to an Application and Node.
Expand Down Expand Up @@ -573,3 +599,11 @@ func (a *Allocation) setUserQuotaCheckPassed() {
a.askEvents.SendRequestFitsInUserQuota(a.allocationKey, a.applicationID, a.allocatedResource)
}
}

func (a *Allocation) IsForeign() bool {
return a.foreign
}

func (a *Allocation) IsPreemptable() bool {
return a.preemptable
}
33 changes: 33 additions & 0 deletions pkg/scheduler/objects/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,36 @@ func TestNewAllocFromSI(t *testing.T) {
assert.Assert(t, !alloc.IsAllowPreemptSelf(), "alloc should not have allow-preempt-self set")
assert.Assert(t, !alloc.IsAllowPreemptOther(), "alloc should not have allow-preempt-other set")
}

func TestNewForeignAllocFromSI(t *testing.T) {
res := resources.NewResourceFromMap(map[string]resources.Quantity{
"first": 1,
})
siAlloc := &si.Allocation{
AllocationKey: "foreign-1",
NodeID: "node-1",
ResourcePerAlloc: res.ToProto(),
TaskGroupName: "",
AllocationTags: map[string]string{
siCommon.Foreign: siCommon.AllocTypeDefault,
},
}

// default
alloc := NewAllocationFromSI(siAlloc)
assert.Assert(t, alloc.IsPreemptable())
assert.Assert(t, alloc.IsForeign())
assert.Equal(t, "foreign-1", alloc.GetAllocationKey())
assert.Equal(t, "node-1", alloc.GetNodeID())
assert.Assert(t, resources.Equals(res, alloc.GetAllocatedResource()))

// static
siAlloc.AllocationTags[siCommon.Foreign] = siCommon.AllocTypeStatic
alloc = NewAllocationFromSI(siAlloc)
assert.Assert(t, !alloc.IsPreemptable())

// illegal value for foreign type
siAlloc.AllocationTags[siCommon.Foreign] = "xyz"
alloc = NewAllocationFromSI(siAlloc)
assert.Assert(t, alloc.IsPreemptable())
}
51 changes: 45 additions & 6 deletions pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,31 @@ func (sn *Node) GetAllAllocations() []*Allocation {
return arr
}

// GetYunikornAllocations returns a copy of Yunikorn allocations on this node
func (sn *Node) GetYunikornAllocations() []*Allocation {
sn.RLock()
defer sn.RUnlock()
return sn.getAllocations(false)
}

// GetForeignAllocations returns a copy of non-Yunikorn allocations on this node
func (sn *Node) GetForeignAllocations() []*Allocation {
sn.RLock()
defer sn.RUnlock()
return sn.getAllocations(true)
}

func (sn *Node) getAllocations(foreign bool) []*Allocation {
arr := make([]*Allocation, 0)
for _, v := range sn.allocations {
if (v.IsForeign() && foreign) || (!v.IsForeign() && !foreign) {
arr = append(arr, v)
}
}

return arr
}

// Set the node to unschedulable.
// This will cause the node to be skipped during the scheduling cycle.
// Visible for testing only
Expand Down Expand Up @@ -312,15 +337,24 @@ func (sn *Node) FitInNode(resRequest *resources.Resource) bool {
// is found the Allocation removed is returned. Used resources will decrease available
// will increase as per the allocation removed.
func (sn *Node) RemoveAllocation(allocationKey string) *Allocation {
defer sn.notifyListeners()
var alloc *Allocation
defer func() {
if alloc != nil && !alloc.IsForeign() {
sn.notifyListeners()
}
}()
sn.Lock()
defer sn.Unlock()

alloc := sn.allocations[allocationKey]
alloc = sn.allocations[allocationKey]
if alloc != nil {
delete(sn.allocations, allocationKey)
sn.allocatedResource.SubFrom(alloc.GetAllocatedResource())
sn.allocatedResource.Prune()
if alloc.IsForeign() {
sn.occupiedResource = resources.Sub(sn.occupiedResource, alloc.GetAllocatedResource())
} else {
sn.allocatedResource.SubFrom(alloc.GetAllocatedResource())
sn.allocatedResource.Prune()
}
sn.availableResource.AddTo(alloc.GetAllocatedResource())
sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID, alloc.allocationKey, alloc.GetAllocatedResource())
return alloc
Expand Down Expand Up @@ -348,9 +382,10 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
return false
}
result := false
foreign := alloc.IsForeign()
defer func() {
// check result to ensure we don't notify listeners unnecessarily
if result {
if result && !foreign {
sn.notifyListeners()
}
}()
Expand All @@ -361,7 +396,11 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
res := alloc.GetAllocatedResource()
if force || sn.availableResource.FitIn(res) {
sn.allocations[alloc.GetAllocationKey()] = alloc
sn.allocatedResource.AddTo(res)
if foreign {
sn.occupiedResource = resources.Add(sn.occupiedResource, alloc.GetAllocatedResource())
} else {
sn.allocatedResource.AddTo(res)
}
sn.availableResource.SubFrom(res)
sn.availableResource.Prune()
sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID, alloc.allocationKey, res)
Expand Down
Loading

0 comments on commit b836716

Please sign in to comment.