Skip to content

Commit

Permalink
Add network-topology-aware plugin and hyperNode score callback
Browse files Browse the repository at this point in the history
Signed-off-by: ecosysbin <[email protected]>

combine PodGroupStatus and PodGroupAnnotations

Signed-off-by: ecosysbin <[email protected]>

rebase with pr:8704

Signed-off-by: ecosysbin <[email protected]>

rebase with pr:8704

Signed-off-by: ecosysbin <[email protected]>

update networktopologyaware score

Signed-off-by: ecosysbin <[email protected]>
  • Loading branch information
ecosysbin committed Jan 2, 2025
1 parent 2658a13 commit 1847753
Show file tree
Hide file tree
Showing 14 changed files with 1,216 additions and 75 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,4 @@ replace (
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.31.1
k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.31.1
k8s.io/sample-controller => k8s.io/sample-controller v0.31.1
volcano.sh/apis => github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg=
github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE=
github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7 h1:Fb5bCm+ZygQ4Rrqt9x+uTG60kVlm41prlOpWwuWyHA4=
github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7/go.mod h1:XHIjTlHDMZTLRg2Y2JAkj85iP0iiet2tv+HfPQZrsHs=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/agiledragon/gomonkey/v2 v2.11.0 h1:5oxSgA+tC1xuGsrIorR+sYiziYltmJyEZ9qA25b6l5U=
Expand Down Expand Up @@ -512,3 +510,5 @@ sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe h1:iHd1Xt36a7S47IFksuF0h9W9J4LKzhBEz0C9XbkBvB8=
volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe/go.mod h1:XHIjTlHDMZTLRg2Y2JAkj85iP0iiet2tv+HfPQZrsHs=
58 changes: 26 additions & 32 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue)
ssn := alloc.session
selectedTier := 0
jobNewHyperNodeMap := map[string]string{}
jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode]

// Find a suitable hyperNode in one tier from down to top everytime to ensure that the selected hyperNode spans the least tier.
for index, tier := range alloc.hyperNodesTiers {
Expand All @@ -245,6 +247,15 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
break
}
for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] {
// job is scheduled for the first time
jobNewHyperNodeMap[hyperNodeName] = hyperNodeName
if jobHyperNode != "" {
jobNewHyperNode, _ := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, nil)
// to do.
// check whether the hyperNode meets the requirements of the topology hard tier.
jobNewHyperNodeMap[hyperNodeName] = jobNewHyperNode
}

nodes, ok := ssn.HyperNodes[hyperNodeName]
if !ok {
klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier)
Expand Down Expand Up @@ -286,6 +297,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes)
}
stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job)
jobNewHyperNode := jobNewHyperNodeMap[hyperNode]
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode
return stmt, hyperNodesWithLeftTasks[hyperNode]
}

Expand Down Expand Up @@ -390,6 +403,18 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
continue
}

// recored hyperNode of the job
if hyperNode == "" {
jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode]
hyperNodeOfNode := util.FindHyperNodeOfNode(bestNode.Name, ssn.HyperNodes)
newJobHyperNode := hyperNodeOfNode
if jobHyperNode != "" {
// job is not scheduled for the first time
newJobHyperNode, _ = util.FindLCAHyperNode(hyperNodeOfNode, jobHyperNode, ssn.HyperNodeTree)
}
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = newJobHyperNode
}

alloc.sumNodeScoresInHyperNode(string(job.UID), hyperNode, highestScore)
alloc.allocateResourcesForTask(stmt, task, bestNode, job)

Expand Down Expand Up @@ -480,6 +505,7 @@ func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *a
if task.InitResreq.LessEqual(node.Idle, api.Zero) {
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := stmt.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, node.Name, alloc.session.UID, err)
if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil {
klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.",
Expand All @@ -492,38 +518,6 @@ func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *a
return
}

klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)

// Allocate releasing resource to the task if any.
if task.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
if err := stmt.Pipeline(task, node.Name, false); err != nil {
klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
task.UID, node.Name, alloc.session.UID, err)
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
=======
if ssn.JobReady(job) {
klog.V(3).InfoS("Job ready, return statement", "jobName", job.UID)
return stmt
} else {
if !ssn.JobPipelined(job) {
// Allocate idle resource to the task.
if task.InitResreq.LessEqual(node.Idle, api.Zero) {
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := stmt.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, node.Name, alloc.session.UID, err)
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
}
return
}

klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)

Expand Down
10 changes: 10 additions & 0 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"

"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare"
Expand Down Expand Up @@ -90,6 +91,15 @@ type NodeInfo struct {
ImageStates map[string]*k8sframework.ImageStateSummary
}

// Recored podgroup old state
type PodGroupOldState struct {
// podGroupStatus cache podgroup status during schedule
// This should not be mutated after initiated
Status map[JobID]scheduling.PodGroupStatus
// recored old annotations for podgroup, used to detect changes
Annotations map[JobID]map[string]string
}

// FutureIdle returns resources that will be idle in the future:
//
// That is current idle resources plus released resources minus pipelined resources.
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/well_known_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ const (

// topologyDecisionAnnotation is the key of topology decision about pod request resource
topologyDecisionAnnotation = "volcano.sh/topology-decision"

// TopologyAllocateLCAHyperNode is the key to the lowest common ancestor of the network topology to which the tasks assigned to a job belong.
TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"
)
8 changes: 8 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/features"
"volcano.sh/volcano/pkg/scheduler/api"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
"volcano.sh/volcano/pkg/scheduler/metrics"
Expand Down Expand Up @@ -1486,6 +1487,13 @@ func (sc *SchedulerCache) String() string {

// RecordJobStatusEvent records related events according to job status.
func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updatePG bool) {
if updatePG {
// update cache for job's podgroup annotations
sc.Mutex.Lock()
sc.Jobs[job.UID].PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
sc.Mutex.Unlock()
}

pgUnschedulable := job.PodGroup != nil &&
(job.PodGroup.Status.Phase == scheduling.PodGroupUnknown ||
job.PodGroup.Status.Phase == scheduling.PodGroupPending ||
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type PluginOption struct {
EnabledOverused *bool `yaml:"enabledOverused"`
// EnabledAllocatable defines whether allocatable is enabled
EnabledAllocatable *bool `yaml:"enabledAllocatable"`
// EnabledNetworkTopology defines whether network topology is enabled
EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"`
// Arguments defines the different arguments that can be given to different plugins
Arguments map[string]interface{} `yaml:"arguments"`
}
6 changes: 4 additions & 2 deletions pkg/scheduler/framework/job_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ func isPodGroupStatusUpdated(newStatus, oldStatus scheduling.PodGroupStatus) boo
func (ju *jobUpdater) updateJob(index int) {
job := ju.jobQueue[index]
ssn := ju.ssn
oldHyperNode := ssn.PodGroupOldState.Annotations[job.UID][api.TopologyAllocateLCAHyperNode]

job.PodGroup.Status = jobStatus(ssn, job)
oldStatus, found := ssn.podGroupStatus[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus)
oldStatus, found := ssn.PodGroupOldState.Status[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) || oldHyperNode != job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]

if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil {
klog.Errorf("Failed to update job <%s/%s>: %v",
job.Namespace, job.Name, err)
Expand Down
16 changes: 11 additions & 5 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ type Session struct {

TotalResource *api.Resource
TotalGuarantee *api.Resource
// podGroupStatus cache podgroup status during schedule

// PodGroupOldState contains podgroup status and annotations during schedule
// This should not be mutated after initiated
podGroupStatus map[api.JobID]scheduling.PodGroupStatus
api.PodGroupOldState

Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
Expand All @@ -81,6 +82,8 @@ type Session struct {
HyperNodesListByTier map[int][]string
// HyperNodes maps hyperNode Name -> nodes under the hyperNode.
HyperNodes map[string][]*api.NodeInfo
// HyperNodeTree is the hypernode tree of all hypernodes in the cluster.
HyperNodeTree []map[string][]string

plugins map[string]Plugin
eventHandlers []*EventHandler
Expand Down Expand Up @@ -127,8 +130,10 @@ func openSession(cache cache.Cache) *Session {

TotalResource: api.EmptyResource(),
TotalGuarantee: api.EmptyResource(),
podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{},

PodGroupOldState: api.PodGroupOldState{
Status: map[api.JobID]scheduling.PodGroupStatus{},
Annotations: map[api.JobID]map[string]string{},
},
Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
CSINodesStatus: map[string]*api.CSINodeStatusInfo{},
Expand Down Expand Up @@ -170,7 +175,8 @@ func openSession(cache cache.Cache) *Session {
ssn.Jobs = snapshot.Jobs
for _, job := range ssn.Jobs {
if job.PodGroup != nil {
ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()
ssn.PodGroupOldState.Status[job.UID] = *job.PodGroup.Status.DeepCopy()
ssn.PodGroupOldState.Annotations[job.UID] = job.PodGroup.GetAnnotations()
}

if vjr := ssn.JobValid(job); vjr != nil {
Expand Down
Loading

0 comments on commit 1847753

Please sign in to comment.