Skip to content

Commit

Permalink
Improve performance of selectNodeForJobWithFairPreemption (#3679)
Browse files Browse the repository at this point in the history
* Improve performance of selectNodeForJobWithFairPreemption

Testing with the worst case, this makes the function 2-4 times faster
 - Combined with #3678 it is about 3-5 times faster

Even the average case should be improved, as we now do less work

The main "trick" here is to defer node.UnsafeCopy() and nodeDb.unbindJobFromNodeInPlace until they are actually needed (i.e. once we know which node has been selected)
 - These functions are expensive, particularly UnsafeCopy

Instead, just keep track of each node's evicted jobs + available resource locally until you've found a node that matches.

There are more enhancements that could be made here:
 - Work out when it is better to check static requirements first (possibly even save these and use them for all jobs with the same scheduling key)
 - Improve nodeDb speed to look up a node (use a map?)
 - Improve nodeDb speed to iterate evicted jobs (use a slice? / effectively we just need a sorted hashmap, not a fully fledged radix tree)

Signed-off-by: JamesMurkin <[email protected]>

* Lint

Signed-off-by: JamesMurkin <[email protected]>

* Move struct definition inside function

Signed-off-by: JamesMurkin <[email protected]>

---------

Signed-off-by: JamesMurkin <[email protected]>
  • Loading branch information
JamesMurkin authored Jun 21, 2024
1 parent 21bbd08 commit 5daee4a
Showing 1 changed file with 52 additions and 31 deletions.
83 changes: 52 additions & 31 deletions internal/scheduler/nodedb/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,11 +808,17 @@ func (nodeDb *NodeDb) selectNodeForPodWithItAtPriority(
// It does this by considering all evicted jobs in the reverse order they would be scheduled in and preventing
// the jobs that would be scheduled last from being re-scheduled.
func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *schedulercontext.JobSchedulingContext) (*internaltypes.Node, error) {
type consideredNode struct {
node *internaltypes.Node
availableResource internaltypes.ResourceList
evictedJobs []*EvictedJobSchedulingContext
staticRequirementsNotMet bool
}

pctx := jctx.PodSchedulingContext

var selectedNode *internaltypes.Node
nodesById := make(map[string]*internaltypes.Node)
evictedJobSchedulingContextsByNodeId := make(map[string][]*EvictedJobSchedulingContext)
nodesById := make(map[string]*consideredNode)
it, err := txn.ReverseLowerBound("evictedJobs", "index", math.MaxInt)
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -827,53 +833,68 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *s
}
node, ok := nodesById[nodeId]
if !ok {
node, err = nodeDb.GetNodeWithTxn(txn, nodeId)
nodeFromDb, err := nodeDb.GetNodeWithTxn(txn, nodeId)
if err != nil {
return nil, errors.WithStack(err)
}
node = node.UnsafeCopy()
node = &consideredNode{
node: nodeFromDb,
availableResource: nodeFromDb.AllocatableByPriority[evictedPriority],
staticRequirementsNotMet: false,
evictedJobs: []*EvictedJobSchedulingContext{},
}

nodesById[nodeId] = node
}

err = nodeDb.unbindJobFromNodeInPlace(nodeDb.priorityClasses, evictedJctx.Job, node)
if err != nil {
return nil, err
if node.staticRequirementsNotMet {
continue
}
evictedJobSchedulingContextsByNodeId[nodeId] = append(evictedJobSchedulingContextsByNodeId[nodeId], evictedJobSchedulingContext)

priority, ok := nodeDb.GetScheduledAtPriority(evictedJctx.JobId)
if !ok {
priority = evictedJctx.PodRequirements.Priority
}
if priority > maxPriority {
maxPriority = priority
// Evict job, update available resource
node.availableResource = node.availableResource.Add(evictedJctx.ResourceRequirements)
node.evictedJobs = append(node.evictedJobs, evictedJobSchedulingContext)

dynamicRequirementsMet, _ := DynamicJobRequirementsMet(node.availableResource, jctx)
if !dynamicRequirementsMet {
continue
}
matches, reason, err := JobRequirementsMet(
node,
// At this point, we've unbound the jobs running on the node.
// Hence, we should check if the job is schedulable at evictedPriority,
// since that indicates the job can be scheduled without causing further preemptions.
evictedPriority,
jctx,
)

staticRequirementsMet, reason, err := StaticJobRequirementsMet(node.node, jctx)
if err != nil {
return nil, err
}
if matches {
selectedNode = node
} else {
if !staticRequirementsMet {
node.staticRequirementsNotMet = true
s := nodeDb.stringFromPodRequirementsNotMetReason(reason)
pctx.NumExcludedNodesByReason[s] += 1
continue
}
}
if selectedNode != nil {
pctx.NodeId = selectedNode.GetId()
pctx.PreemptedAtPriority = maxPriority
for _, evictedJobSchedulingContext := range evictedJobSchedulingContextsByNodeId[selectedNode.GetId()] {
if err := txn.Delete("evictedJobs", evictedJobSchedulingContext); err != nil {

nodeCopy := node.node.UnsafeCopy()
for _, job := range node.evictedJobs {
// Remove preempted job from node
err = nodeDb.unbindJobFromNodeInPlace(nodeDb.priorityClasses, job.JobSchedulingContext.Job, nodeCopy)
if err != nil {
return nil, err
}
// Remove preempted job from list of evicted jobs
if err := txn.Delete("evictedJobs", job); err != nil {
return nil, errors.WithStack(err)
}

priority, ok := nodeDb.GetScheduledAtPriority(evictedJctx.JobId)
if !ok {
priority = evictedJctx.PodRequirements.Priority
}
if priority > maxPriority {
maxPriority = priority
}
}

selectedNode = nodeCopy
pctx.NodeId = selectedNode.GetId()
pctx.PreemptedAtPriority = maxPriority
}
return selectedNode, nil
}
Expand Down

0 comments on commit 5daee4a

Please sign in to comment.