Add network-topology-aware plugin and hyperNode score callback
Signed-off-by: ecosysbin <[email protected]>
ecosysbin committed Jan 14, 2025
1 parent 3df5ce8 commit 0aaad96
Showing 14 changed files with 1,255 additions and 30 deletions.
37 changes: 17 additions & 20 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -17,7 +17,6 @@
package allocate

import (
"sort"
"time"

"k8s.io/klog/v2"
@@ -34,7 +33,6 @@ type Action struct {
session *framework.Session
// configured flag for error cache
enablePredicateErrorCache bool
hyperNodesTiers []int

// hyperNodeScoresByJob stores job total score for all available hyperNodes, this is used to accumulate
// all nodes' scores in each available hyperNode only when job has hard network topology constraints
@@ -45,7 +43,6 @@
func New() *Action {
return &Action{
enablePredicateErrorCache: true, // default to enable it
hyperNodesTiers: []int{},
hyperNodeScoresByJob: make(map[string]map[string]float64),
}
}
@@ -61,26 +58,11 @@ func (alloc *Action) parseArguments(ssn *framework.Session) {
arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
}

func (alloc *Action) parseHyperNodesTiers(ssn *framework.Session) {
if ssn.HyperNodesSetByTier == nil || len(ssn.HyperNodesSetByTier) == 0 {
return
}

// sort to guarantee the traverse order is from down to top.
var tiers []int
for tier := range ssn.HyperNodesSetByTier {
tiers = append(tiers, tier)
}
sort.Ints(tiers)
alloc.hyperNodesTiers = tiers
}

func (alloc *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Allocate ...")
defer klog.V(5).Infof("Leaving Allocate ...")

alloc.parseArguments(ssn)
alloc.parseHyperNodesTiers(ssn)

// the allocation for pod may have many stages
// 1. pick a queue named Q (using ssn.QueueOrderFn)
@@ -237,9 +219,11 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue)
ssn := alloc.session
selectedTier := 0
LCAHyperNodeMap := map[string]string{}
jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode]

// Find a suitable hyperNode within one tier, searching from the bottom tier to the top each time, to ensure that the selected hyperNode spans the fewest tiers.
for index, tier := range alloc.hyperNodesTiers {
for index, tier := range ssn.HyperNodesTiers {
if index+1 > highestAllowedTier {
klog.V(4).ErrorS(nil, "Skip search for higher tier cause highest allowed tier reached", "jobName", job.UID, "highestAllowedTier", highestAllowedTier, "tier", tier)
break
@@ -254,7 +238,16 @@
klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier)
continue
}

LCAHyperNodeMap[hyperNodeName] = hyperNodeName
// The job still has remaining tasks to be scheduled; check whether the least common ancestor hyperNode still meets the requirement of the highest allowed tier.
if jobHyperNode != "" {
LCAHyperNode, index := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, ssn.HyperNodesTiers, ssn.HyperNodesSetByTier, ssn.HyperNodesDescendants)
if index+1 > highestAllowedTier {
klog.V(4).InfoS("Skip search for higher tier cause highest allowed tier reached", "jobName", job.UID, "highestAllowedTier", highestAllowedTier, "tier", index+1)
continue
}
LCAHyperNodeMap[hyperNodeName] = LCAHyperNode
}
// Clone the tasks queue and reset the job's fit errors to ensure a clean cache each time a hyperNode is filtered, so that hyperNodes do not affect each other.
tasksQueue := tasks.Clone()
job.ResetFitErr()
@@ -290,6 +283,10 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes)
}
stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job)
jobNewHyperNode := LCAHyperNodeMap[hyperNode]
if jobNewHyperNode != jobHyperNode {
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode
}
return stmt, hyperNodesWithLeftTasks[hyperNode]
}

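
The loop above searches tiers bottom-up and, once a job already has an LCA hyperNode recorded in its PodGroup annotation, rejects candidates whose least common ancestor with that hyperNode would sit above the highest allowed tier. A minimal, self-contained sketch of that selection rule, using plain maps and callbacks instead of the scheduler's Session, util.FindLCAHyperNode and fit logic; all names below are illustrative:

package main

import "fmt"

// pickHyperNode illustrates the tier-by-tier search described above.
// tiers is sorted ascending, membersByTier lists hyperNodes per tier, and
// lcaTier reports the zero-based tier index of the LCA of two hyperNodes.
func pickHyperNode(
	tiers []int,
	membersByTier map[int][]string,
	jobHyperNode string, // LCA recorded for the job so far; "" if none
	highestAllowedTier int,
	lcaTier func(a, b string) (lca string, tierIndex int),
	fits func(hyperNode string) bool,
) (string, bool) {
	for index, tier := range tiers {
		if index+1 > highestAllowedTier {
			break // tiers above the allowed level are never searched
		}
		for _, hn := range membersByTier[tier] {
			if jobHyperNode != "" {
				// Reject candidates whose common ancestor with the job's
				// current hyperNode would exceed the allowed tier.
				if _, idx := lcaTier(hn, jobHyperNode); idx+1 > highestAllowedTier {
					continue
				}
			}
			if fits(hn) {
				return hn, true
			}
		}
	}
	return "", false
}

func main() {
	tiers := []int{1, 2}
	members := map[int][]string{1: {"s0", "s1"}, 2: {"s2"}}
	lca := func(a, b string) (string, int) {
		if a == b {
			return a, 0
		}
		return "s2", 1 // in this toy topology everything meets under s2 (tier index 1)
	}
	hn, ok := pickHyperNode(tiers, members, "s0", 1, lca, func(string) bool { return true })
	fmt.Println(hn, ok) // s0 true: only s0 keeps the job's LCA within tier 1
}
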
1 change: 1 addition & 0 deletions pkg/scheduler/api/cluster_info.go
@@ -27,6 +27,7 @@ type ClusterInfo struct {
Jobs map[JobID]*JobInfo
Nodes map[string]*NodeInfo
HyperNodesSetByTier map[int]sets.Set[string]
HyperNodesDescendants map[string]sets.Set[string]
RealNodesSet map[string]sets.Set[string]
HyperNodesReadyToSchedule bool
Queues map[QueueID]*QueueInfo
9 changes: 9 additions & 0 deletions pkg/scheduler/api/hyper_node_info.go
@@ -294,6 +294,15 @@ func (hni *HyperNodesInfo) GetAncestors(name string) sets.Set[string] {
return ancestors
}

// GetAllDescendants returns all descendants of all HyperNodes.
func (hni *HyperNodesInfo) GetAllDescendants() map[string]sets.Set[string] {
descendants := make(map[string]sets.Set[string])
for name := range hni.hyperNodes {
descendants[name] = hni.GetDescendants(name)
}
return descendants
}

// GetDescendants returns all descendants of a given HyperNode.
func (hni *HyperNodesInfo) GetDescendants(hyperNodeName string) sets.Set[string] {
queue := []string{hyperNodeName}
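
GetDescendants performs a breadth-first walk downward from a hyperNode, and GetAllDescendants runs that walk for every hyperNode so the snapshot can hand the result to the session. A simplified sketch of the same traversal over a plain name -> children map (hypothetical helper names, not the HyperNodesInfo methods):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// descendants returns the given hyperNode plus everything reachable below it,
// using a breadth-first traversal over a plain name -> children map.
func descendants(children map[string][]string, root string) sets.Set[string] {
	result := sets.New[string]()
	queue := []string{root}
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		if result.Has(cur) {
			continue // guard against cycles or duplicate membership
		}
		result.Insert(cur)
		queue = append(queue, children[cur]...)
	}
	return result
}

// allDescendants mirrors the idea of GetAllDescendants: one walk per hyperNode.
func allDescendants(children map[string][]string) map[string]sets.Set[string] {
	out := make(map[string]sets.Set[string])
	for name := range children {
		out[name] = descendants(children, name)
	}
	return out
}

func main() {
	children := map[string][]string{
		"s2": {"s0", "s1"},
		"s0": {},
		"s1": {},
	}
	fmt.Println(allDescendants(children)["s2"].UnsortedList()) // s2, s0, s1 in some order
}
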
10 changes: 10 additions & 0 deletions pkg/scheduler/api/node_info.go
@@ -25,6 +25,7 @@ import (
"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"

"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare"
@@ -90,6 +91,15 @@ type NodeInfo struct {
ImageStates map[string]*k8sframework.ImageStateSummary
}

// PodGroupOldState records the podgroup's old state.
type PodGroupOldState struct {
// Status caches the podgroup status during scheduling
// This should not be mutated after initiated
Status map[JobID]scheduling.PodGroupStatus
// Annotations records the old annotations of the podgroup, used to detect changes
Annotations map[JobID]map[string]string
}

// FutureIdle returns resources that will be idle in the future:
//
// That is current idle resources plus released resources minus pipelined resources.
2 changes: 2 additions & 0 deletions pkg/scheduler/api/well_known_labels.go
@@ -42,4 +42,6 @@ const (

// topologyDecisionAnnotation is the key of topology decision about pod request resource
topologyDecisionAnnotation = "volcano.sh/topology-decision"
// TopologyAllocateLCAHyperNode is the annotation key for the lowest common ancestor hyperNode, in the network topology, of the tasks already allocated to a job.
TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"
)
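
The annotation rides on the PodGroup's metadata, so later scheduling cycles can pick up where the previous allocation left off. A tiny hedged sketch of reading and updating that key on an annotation map (the helper below is illustrative, not part of the scheduler):

package main

import "fmt"

// TopologyAllocateLCAHyperNode mirrors the constant added above.
const TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"

// recordLCA is an illustrative helper: write the job's new LCA hyperNode into
// the PodGroup's annotations and report whether an update is needed.
func recordLCA(annotations map[string]string, newLCA string) bool {
	if annotations[TopologyAllocateLCAHyperNode] == newLCA {
		return false // unchanged, no PodGroup update needed
	}
	annotations[TopologyAllocateLCAHyperNode] = newLCA
	return true
}

func main() {
	ann := map[string]string{}
	fmt.Println(recordLCA(ann, "s0")) // true: annotation was added
	fmt.Println(recordLCA(ann, "s0")) // false: unchanged
}
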
6 changes: 6 additions & 0 deletions pkg/scheduler/cache/cache.go
@@ -65,6 +65,7 @@
topologyinformerv1alpha1 "volcano.sh/apis/pkg/client/informers/externalversions/topology/v1alpha1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/features"
"volcano.sh/volcano/pkg/scheduler/api"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
"volcano.sh/volcano/pkg/scheduler/metrics"
@@ -1402,6 +1403,7 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
// Snapshot hyperNodes.
sc.HyperNodesInfo.Lock()
snapshot.HyperNodesSetByTier = sc.HyperNodesInfo.HyperNodesSetByTier()
snapshot.HyperNodesDescendants = sc.HyperNodesInfo.GetAllDescendants()
snapshot.RealNodesSet = sc.HyperNodesInfo.RealNodesSet()
snapshot.HyperNodesReadyToSchedule = sc.HyperNodesInfo.Ready()
sc.HyperNodesInfo.Unlock()
@@ -1562,6 +1564,10 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b
if err != nil {
return nil, err
}
sc.Mutex.Lock()
sc.Jobs[job.UID].PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
sc.Mutex.Unlock()

job.PodGroup = pg
}

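
UpdateJobStatus also writes the session job's LCA hyperNode annotation back into the cached job, taking sc.Mutex around the write because the cache is shared with the informer handlers. A generic sketch of that guarded write-back; the types and fields below are illustrative, not the SchedulerCache API:

package main

import (
	"fmt"
	"sync"
)

// cachedJob is a stand-in for a cached job entry holding PodGroup annotations.
type cachedJob struct {
	annotations map[string]string
}

type miniCache struct {
	sync.Mutex
	jobs map[string]*cachedJob
}

// syncLCAAnnotation copies the annotation decided during the session back into
// the shared cache under the lock, mirroring the write-back added above.
func (c *miniCache) syncLCAAnnotation(jobUID, key, value string) {
	c.Lock()
	defer c.Unlock()
	c.jobs[jobUID].annotations[key] = value
}

func main() {
	c := &miniCache{jobs: map[string]*cachedJob{
		"job-1": {annotations: map[string]string{}},
	}}
	c.syncLCAAnnotation("job-1", "volcano.sh/allocate-lca-hypernode", "s0")
	fmt.Println(c.jobs["job-1"].annotations)
}
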
9 changes: 9 additions & 0 deletions pkg/scheduler/cache/event_handlers.go
@@ -820,6 +820,11 @@ func (sc *SchedulerCache) AddPodGroupV1beta1(obj interface{}) {
return
}

if podgroup.GetAnnotations() == nil {
klog.V(3).Infof("Add PodGroup(%s) into cache, spec(%#v)", ss.Name, ss.Spec)
podgroup.SetAnnotations(map[string]string{})
}

pg := &schedulingapi.PodGroup{PodGroup: podgroup, Version: schedulingapi.PodGroupVersionV1Beta1}
klog.V(4).Infof("Add PodGroup(%s) into cache, spec(%#v)", ss.Name, ss.Spec)

@@ -854,6 +859,10 @@ func (sc *SchedulerCache) UpdatePodGroupV1beta1(oldObj, newObj interface{}) {
klog.Errorf("Failed to convert podgroup from %T to %T", newSS, podgroup)
return
}
if podgroup.GetAnnotations() == nil {
klog.V(3).Infof("Update PodGroup(%s) into cache, spec(%#v)", newSS.Name, newSS.Spec)
podgroup.SetAnnotations(map[string]string{})
}

pg := &schedulingapi.PodGroup{PodGroup: podgroup, Version: schedulingapi.PodGroupVersionV1Beta1}

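
Both handlers now swap a nil annotation map for an empty one before the PodGroup enters the cache; without that guard, later writes such as the allocate action's LCA update would panic, since assigning into a nil map is a runtime error in Go. A minimal illustration:

package main

import "fmt"

func main() {
	var annotations map[string]string // nil, as on a PodGroup created without annotations

	// annotations["volcano.sh/allocate-lca-hypernode"] = "s0" // would panic: assignment to entry in nil map

	if annotations == nil {
		annotations = map[string]string{} // the guard added in the event handlers
	}
	annotations["volcano.sh/allocate-lca-hypernode"] = "s0" // safe after the guard
	fmt.Println(annotations)
}
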
2 changes: 2 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
@@ -87,6 +87,8 @@ type PluginOption struct {
EnabledOverused *bool `yaml:"enabledOverused"`
// EnabledAllocatable defines whether allocatable is enabled
EnabledAllocatable *bool `yaml:"enabledAllocatable"`
// EnabledNetworkTopology defines whether network topology is enabled
EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"`
// Arguments defines the different arguments that can be given to different plugins
Arguments map[string]interface{} `yaml:"arguments"`
}
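
Like the other plugin switches, EnabledNetworkTopology is a *bool, so an option omitted from the YAML stays nil and a default can be applied, while an explicit false is preserved. A small sketch of that distinction with a trimmed-down stand-in for PluginOption (the struct, plugin name, and yaml.v3 usage here are illustrative):

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// pluginOption is a trimmed-down stand-in for conf.PluginOption.
type pluginOption struct {
	Name                   string `yaml:"name"`
	EnabledNetworkTopology *bool  `yaml:"enabledNetworkTopology"`
}

func main() {
	var withFlag, withoutFlag pluginOption
	// Errors ignored for brevity in this sketch.
	_ = yaml.Unmarshal([]byte("name: example-plugin\nenabledNetworkTopology: true"), &withFlag)
	_ = yaml.Unmarshal([]byte("name: example-plugin"), &withoutFlag)

	fmt.Println(withFlag.EnabledNetworkTopology != nil && *withFlag.EnabledNetworkTopology) // true: explicitly enabled
	fmt.Println(withoutFlag.EnabledNetworkTopology == nil)                                  // true: unset, so a default applies
}
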
6 changes: 4 additions & 2 deletions pkg/scheduler/framework/job_updater.go
@@ -91,10 +91,12 @@ func isPodGroupStatusUpdated(newStatus, oldStatus scheduling.PodGroupStatus) boo
func (ju *jobUpdater) updateJob(index int) {
job := ju.jobQueue[index]
ssn := ju.ssn
oldHyperNode := ssn.PodGroupOldState.Annotations[job.UID][api.TopologyAllocateLCAHyperNode]

job.PodGroup.Status = jobStatus(ssn, job)
oldStatus, found := ssn.podGroupStatus[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus)
oldStatus, found := ssn.PodGroupOldState.Status[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) || oldHyperNode != job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]

if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil {
klog.Errorf("Failed to update job <%s/%s>: %v",
job.Namespace, job.Name, err)
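
updateJob now pushes a PodGroup update not only when the status changed but also when the recorded LCA hyperNode annotation differs from the value captured when the session opened. A compact sketch of that decision, assuming the old-state snapshot holds the annotation values from session open (helper names are illustrative):

package main

import "fmt"

const topologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"

// needsPodGroupUpdate is an illustrative reduction of the updateJob logic:
// update when no old status was captured, when the status changed, or when
// the LCA hyperNode annotation moved during this scheduling session.
func needsPodGroupUpdate(statusFound, statusChanged bool, oldAnnotations, newAnnotations map[string]string) bool {
	oldHyperNode := oldAnnotations[topologyAllocateLCAHyperNode]
	newHyperNode := newAnnotations[topologyAllocateLCAHyperNode]
	return !statusFound || statusChanged || oldHyperNode != newHyperNode
}

func main() {
	old := map[string]string{topologyAllocateLCAHyperNode: "s2"}
	updated := map[string]string{topologyAllocateLCAHyperNode: "s0"}
	fmt.Println(needsPodGroupUpdate(true, false, old, updated)) // true: the LCA moved from s2 to s0
}
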
36 changes: 28 additions & 8 deletions pkg/scheduler/framework/session.go
@@ -19,6 +19,7 @@ package framework
import (
"context"
"fmt"
"sort"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -57,9 +58,9 @@ type Session struct {

TotalResource *api.Resource
TotalGuarantee *api.Resource
// podGroupStatus cache podgroup status during schedule
// PodGroupOldState contains podgroup status and annotations during schedule
// This should not be mutated after initiated
podGroupStatus map[api.JobID]scheduling.PodGroupStatus
api.PodGroupOldState

Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
@@ -81,7 +82,10 @@ type Session struct {
// hyperNode can gain a better performance, the lower the tier of hyperNode, the better performance.
HyperNodesSetByTier map[int]sets.Set[string]
// RealNodesList maps hyperNode Name -> nodes under the hyperNode.
RealNodesList map[string][]*api.NodeInfo
RealNodesList map[string][]*api.NodeInfo
HyperNodesTiers []int
// HyperNodesDescendants maps a hyperNode name to all descendants of that hyperNode.
HyperNodesDescendants map[string]sets.Set[string]
HyperNodesReadyToSchedule bool

plugins map[string]Plugin
@@ -129,8 +133,10 @@ func openSession(cache cache.Cache) *Session {

TotalResource: api.EmptyResource(),
TotalGuarantee: api.EmptyResource(),
podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{},

PodGroupOldState: api.PodGroupOldState{
Status: map[api.JobID]scheduling.PodGroupStatus{},
Annotations: map[api.JobID]map[string]string{},
},
Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
CSINodesStatus: map[string]*api.CSINodeStatusInfo{},
@@ -172,7 +178,8 @@ func openSession(cache cache.Cache) *Session {
ssn.Jobs = snapshot.Jobs
for _, job := range ssn.Jobs {
if job.PodGroup != nil {
ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()
ssn.PodGroupOldState.Status[job.UID] = *job.PodGroup.Status.DeepCopy()
ssn.PodGroupOldState.Annotations[job.UID] = job.PodGroup.GetAnnotations()
}

if vjr := ssn.JobValid(job); vjr != nil {
@@ -197,12 +204,11 @@ func openSession(cache cache.Cache) *Session {
ssn.NodeList = util.GetNodeList(snapshot.Nodes, snapshot.NodeList)
ssn.HyperNodesSetByTier = snapshot.HyperNodesSetByTier
ssn.RealNodesList = util.GetRealNodesListByHyperNode(snapshot.RealNodesSet, snapshot.Nodes)
ssn.HyperNodesReadyToSchedule = snapshot.HyperNodesReadyToSchedule
parseHyperNodesTiers(ssn)
ssn.Nodes = snapshot.Nodes
ssn.CSINodesStatus = snapshot.CSINodesStatus
ssn.RevocableNodes = snapshot.RevocableNodes
ssn.Queues = snapshot.Queues
ssn.NamespaceInfo = snapshot.NamespaceInfo
// calculate all nodes' resources only once in each schedule cycle; other plugins can clone it when needed
for _, n := range ssn.Nodes {
ssn.TotalResource.Add(n.Allocatable)
Expand All @@ -214,6 +220,20 @@ func openSession(cache cache.Cache) *Session {
return ssn
}

func parseHyperNodesTiers(ssn *Session) {
if ssn.HyperNodesSetByTier == nil || len(ssn.HyperNodesSetByTier) == 0 {
return
}

// sort to guarantee the traverse order is from down to top.
var tiers []int
for tier := range ssn.HyperNodesSetByTier {
tiers = append(tiers, tier)
}
sort.Ints(tiers)
ssn.HyperNodesTiers = tiers
}

// updateQueueStatus updates allocated field in queue status on session close.
func updateQueueStatus(ssn *Session) {
rootQueue := api.QueueID("root")