From c7a75450d6a099cd8ca84998db73c982abc9f7b6 Mon Sep 17 00:00:00 2001 From: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> Date: Tue, 24 Dec 2024 20:18:29 +0800 Subject: [PATCH] Add network-topology-aware plugin and hyperNode score callback Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> combine PodGroupStatus and PodGroupAnnotations Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> rebase with pr:8704 Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> rebase with pr:8704 Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> update networktopologyaware score Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> --- go.mod | 1 - go.sum | 4 +- pkg/scheduler/actions/allocate/allocate.go | 58 +- pkg/scheduler/api/node_info.go | 10 + pkg/scheduler/api/well_known_labels.go | 3 + pkg/scheduler/cache/cache.go | 5 +- pkg/scheduler/conf/scheduler_conf.go | 2 + pkg/scheduler/framework/job_updater.go | 6 +- pkg/scheduler/framework/session.go | 16 +- .../networktopologyaware.go | 198 ++++++ .../networktopologyaware_test.go | 616 ++++++++++++++++++ pkg/scheduler/uthelper/helper.go | 8 - pkg/scheduler/util/scheduler_helper.go | 100 ++- pkg/scheduler/util/scheduler_helper_test.go | 261 +++++++- 14 files changed, 1212 insertions(+), 76 deletions(-) create mode 100644 pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go create mode 100644 pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go diff --git a/go.mod b/go.mod index 8f6f6ac902..af4d2e14d9 100644 --- a/go.mod +++ b/go.mod @@ -190,5 +190,4 @@ replace ( k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.31.1 k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.31.1 k8s.io/sample-controller => k8s.io/sample-controller v0.31.1 - volcano.sh/apis => github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7 ) diff --git a/go.sum b/go.sum index 42f182df38..7791857a2e 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg= github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= -github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7 h1:Fb5bCm+ZygQ4Rrqt9x+uTG60kVlm41prlOpWwuWyHA4= -github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7/go.mod h1:XHIjTlHDMZTLRg2Y2JAkj85iP0iiet2tv+HfPQZrsHs= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/agiledragon/gomonkey/v2 v2.11.0 h1:5oxSgA+tC1xuGsrIorR+sYiziYltmJyEZ9qA25b6l5U= @@ -512,3 +510,5 @@ sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= +volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe h1:iHd1Xt36a7S47IFksuF0h9W9J4LKzhBEz0C9XbkBvB8= +volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe/go.mod h1:XHIjTlHDMZTLRg2Y2JAkj85iP0iiet2tv+HfPQZrsHs= diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 0c344bc523..a82add0409 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -233,6 +233,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue) ssn := alloc.session selectedTier := 0 + jobNewHyperNodeMap := map[string]string{} + jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode] // Find a suitable hyperNode in one tier from down to top everytime to ensure that the selected hyperNode spans the least tier. for index, tier := range alloc.hyperNodesTiers { @@ -245,6 +247,15 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu break } for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] { + // job is scheduled for the first time + jobNewHyperNodeMap[hyperNodeName] = hyperNodeName + if jobHyperNode != "" { + jobNewHyperNode, _ := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, nil) + // to do. + // check whether the hyperNode meets the requirements of the topology hard tier. + jobNewHyperNodeMap[hyperNodeName] = jobNewHyperNode + } + nodes, ok := ssn.HyperNodes[hyperNodeName] if !ok { klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier) @@ -286,6 +297,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes) } stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job) + jobNewHyperNode := jobNewHyperNodeMap[hyperNode] + job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode return stmt, hyperNodesWithLeftTasks[hyperNode] } @@ -390,6 +403,18 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a continue } + // recored hyperNode of the job + if hyperNode == "" { + jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode] + hyperNodeOfNode := util.FindHyperNodeOfNode(bestNode.Name, ssn.HyperNodes) + newJobHyperNode := hyperNodeOfNode + if jobHyperNode != "" { + // job is not scheduled for the first time + newJobHyperNode, _ = util.FindLCAHyperNode(hyperNodeOfNode, jobHyperNode, ssn.HyperNodeTree) + } + job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = newJobHyperNode + } + alloc.sumNodeScoresInHyperNode(string(job.UID), hyperNode, highestScore) alloc.allocateResourcesForTask(stmt, task, bestNode, job) @@ -480,6 +505,7 @@ func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *a if task.InitResreq.LessEqual(node.Idle, api.Zero) { klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) if err := stmt.Allocate(task, node); err != nil { + klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", task.UID, node.Name, alloc.session.UID, err) if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil { klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.", @@ -492,38 +518,6 @@ func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *a return } - klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources", - task.Namespace, task.Name, node.Name) - - // Allocate releasing resource to the task if any. - if task.InitResreq.LessEqual(node.FutureIdle(), api.Zero) { - klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>", - task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing) - if err := stmt.Pipeline(task, node.Name, false); err != nil { - klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.", - task.UID, node.Name, alloc.session.UID, err) - } else { - metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) - metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) -======= - if ssn.JobReady(job) { - klog.V(3).InfoS("Job ready, return statement", "jobName", job.UID) - return stmt - } else { - if !ssn.JobPipelined(job) { - // Allocate idle resource to the task. - if task.InitResreq.LessEqual(node.Idle, api.Zero) { - klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) - if err := stmt.Allocate(task, node); err != nil { - klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", - task.UID, node.Name, alloc.session.UID, err) - } else { - metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) - metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) - } - return - } - klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources", task.Namespace, task.Name, node.Name) diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index f1fdbdfc32..8d5c8b2cf2 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -25,6 +25,7 @@ import ( "k8s.io/klog/v2" k8sframework "k8s.io/kubernetes/pkg/scheduler/framework" + "volcano.sh/apis/pkg/apis/scheduling" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare" @@ -90,6 +91,15 @@ type NodeInfo struct { ImageStates map[string]*k8sframework.ImageStateSummary } +// Recored podgroup old state +type PodGroupOldState struct { + // podGroupStatus cache podgroup status during schedule + // This should not be mutated after initiated + Status map[JobID]scheduling.PodGroupStatus + // recored old annotations for podgroup, used to detect changes + Annotations map[JobID]map[string]string +} + // FutureIdle returns resources that will be idle in the future: // // That is current idle resources plus released resources minus pipelined resources. diff --git a/pkg/scheduler/api/well_known_labels.go b/pkg/scheduler/api/well_known_labels.go index 2e1db71c74..5801c5b6ce 100644 --- a/pkg/scheduler/api/well_known_labels.go +++ b/pkg/scheduler/api/well_known_labels.go @@ -42,4 +42,7 @@ const ( // topologyDecisionAnnotation is the key of topology decision about pod request resource topologyDecisionAnnotation = "volcano.sh/topology-decision" + + // TopologyAllocateLCAHyperNode is the key to the lowest common ancestor of the network topology to which the tasks assigned to a job belong. + TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode" ) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 6ba27cc3a8..8bd070dc56 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -64,6 +64,7 @@ import ( vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/features" + "volcano.sh/volcano/pkg/scheduler/api" schedulingapi "volcano.sh/volcano/pkg/scheduler/api" volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" "volcano.sh/volcano/pkg/scheduler/metrics" @@ -1536,9 +1537,11 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b if err != nil { return nil, err } + sc.Mutex.Lock() + sc.Jobs[job.UID].PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] + sc.Mutex.Unlock() job.PodGroup = pg } - sc.RecordJobStatusEvent(job, updatePG) return job, nil diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index e911e0b69e..182501ed8f 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -87,6 +87,8 @@ type PluginOption struct { EnabledOverused *bool `yaml:"enabledOverused"` // EnabledAllocatable defines whether allocatable is enabled EnabledAllocatable *bool `yaml:"enabledAllocatable"` + // EnabledNetworkTopology defines whether network topology is enabled + EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"` // Arguments defines the different arguments that can be given to different plugins Arguments map[string]interface{} `yaml:"arguments"` } diff --git a/pkg/scheduler/framework/job_updater.go b/pkg/scheduler/framework/job_updater.go index 80ba52987d..3ee2254a02 100644 --- a/pkg/scheduler/framework/job_updater.go +++ b/pkg/scheduler/framework/job_updater.go @@ -91,10 +91,12 @@ func isPodGroupStatusUpdated(newStatus, oldStatus scheduling.PodGroupStatus) boo func (ju *jobUpdater) updateJob(index int) { job := ju.jobQueue[index] ssn := ju.ssn + oldHyperNode := ssn.PodGroupOldState.Annotations[job.UID][api.TopologyAllocateLCAHyperNode] job.PodGroup.Status = jobStatus(ssn, job) - oldStatus, found := ssn.podGroupStatus[job.UID] - updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) + oldStatus, found := ssn.PodGroupOldState.Status[job.UID] + updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) || oldHyperNode != job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] + if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil { klog.Errorf("Failed to update job <%s/%s>: %v", job.Namespace, job.Name, err) diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index ebacefb696..7752136547 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -56,9 +56,10 @@ type Session struct { TotalResource *api.Resource TotalGuarantee *api.Resource - // podGroupStatus cache podgroup status during schedule + + // PodGroupOldState contains podgroup status and annotations during schedule // This should not be mutated after initiated - podGroupStatus map[api.JobID]scheduling.PodGroupStatus + api.PodGroupOldState Jobs map[api.JobID]*api.JobInfo Nodes map[string]*api.NodeInfo @@ -81,6 +82,8 @@ type Session struct { HyperNodesListByTier map[int][]string // HyperNodes maps hyperNode Name -> nodes under the hyperNode. HyperNodes map[string][]*api.NodeInfo + // HyperNodeTree is the hypernode tree of all hypernodes in the cluster. + HyperNodeTree []map[string][]string plugins map[string]Plugin eventHandlers []*EventHandler @@ -127,8 +130,10 @@ func openSession(cache cache.Cache) *Session { TotalResource: api.EmptyResource(), TotalGuarantee: api.EmptyResource(), - podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{}, - + PodGroupOldState: api.PodGroupOldState{ + Status: map[api.JobID]scheduling.PodGroupStatus{}, + Annotations: map[api.JobID]map[string]string{}, + }, Jobs: map[api.JobID]*api.JobInfo{}, Nodes: map[string]*api.NodeInfo{}, CSINodesStatus: map[string]*api.CSINodeStatusInfo{}, @@ -170,7 +175,8 @@ func openSession(cache cache.Cache) *Session { ssn.Jobs = snapshot.Jobs for _, job := range ssn.Jobs { if job.PodGroup != nil { - ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy() + ssn.PodGroupOldState.Status[job.UID] = *job.PodGroup.Status.DeepCopy() + ssn.PodGroupOldState.Annotations[job.UID] = job.PodGroup.GetAnnotations() } if vjr := ssn.JobValid(job); vjr != nil { diff --git a/pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go new file mode 100644 index 0000000000..737296e146 --- /dev/null +++ b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go @@ -0,0 +1,198 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package networktopologyaware + +import ( + "k8s.io/klog/v2" + + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/util" +) + +const ( + // PluginName indicates name of volcano scheduler plugin. + PluginName = "networktopologyaware" + BaseScore = 100.0 + TaskBaseScore = 10.0 + ZeroScore = 0.0 + NetworkTopologyWeight = "weight" +) + +type networkTopologyAwarePlugin struct { + // Arguments given for the plugin + pluginArguments framework.Arguments +} + +// New function returns prioritizePlugin object +func New(arguments framework.Arguments) framework.Plugin { + return &networkTopologyAwarePlugin{ + pluginArguments: arguments, + } +} + +func (nta *networkTopologyAwarePlugin) Name() string { + return PluginName +} + +func calculateWeight(args framework.Arguments) int { + weight := 1 + args.GetInt(&weight, NetworkTopologyWeight) + return weight +} + +func (nta *networkTopologyAwarePlugin) OnSessionOpen(ssn *framework.Session) { + klog.V(5).Infof("Enter networkTopologyAwarePlugin plugin ...") + defer func() { + klog.V(5).Infof("Leaving networkTopologyAware plugin ...") + }() + + weight := calculateWeight(nta.pluginArguments) + hyperNodeFn := func(job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) (map[string]float64, error) { + hyperNodeScores := make(map[string]float64) + jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] + // job is scheduled for the first time, All hyperNodes have the same score.. + if jobHyperNode == "" { + for hyperNode := range hyperNodes { + hyperNodeScores[hyperNode] = ZeroScore + } + return hyperNodeScores, nil + } + // job is not scheduled for the first time, calculate score based on hypernode tree. + maxScore := ZeroScore + scoreHyperNode := map[float64][]string{} + for hyperNode := range hyperNodes { + score := networkTopologyAwareScore(hyperNode, job, ssn.HyperNodeTree) + score *= float64(weight) + hyperNodeScores[hyperNode] = score + if score >= maxScore { + maxScore = score + scoreHyperNode[score] = append(scoreHyperNode[score], hyperNode) + } + } + // calculate score based on task num if max score hyperNode has more than one. + if len(scoreHyperNode[maxScore]) > 1 { + for hyperNode, score := range hyperNodeScores { + if score == maxScore { + taskNumScore := networkTopologyAwareScoreWithTaskNum(hyperNode, job, ssn.HyperNodes) + taskNumScore *= float64(weight) + hyperNodeScores[hyperNode] += taskNumScore + } + } + } + + klog.V(1).Infof("networkTopologyAware score is: %v", hyperNodeScores) + return hyperNodeScores, nil + } + + nodeFn := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) { + nodeScores := make(map[string]float64) + + taskJob := ssn.Jobs[task.Job] + jobHyperNode := taskJob.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] + // job fist first scheduler, All node have the same score. + if jobHyperNode == "" { + for _, node := range nodes { + nodeScores[node.Name] = ZeroScore + } + return nodeScores, nil + } + // job not first scheduler, calculate score based on hypernode tree. + for _, node := range nodes { + hyperNode := util.FindHyperNodeOfNode(node.Name, ssn.HyperNodes) + score := networkTopologyAwareScore(hyperNode, taskJob, ssn.HyperNodeTree) + score *= float64(weight) + nodeScores[node.Name] = score + } + + maxScore := ZeroScore + scoreNodes := map[float64][]string{} + for _, node := range nodes { + hyperNode := util.FindHyperNodeOfNode(node.Name, ssn.HyperNodes) + score := networkTopologyAwareScore(hyperNode, taskJob, ssn.HyperNodeTree) + score *= float64(weight) + nodeScores[node.Name] = score + if score >= maxScore { + maxScore = score + scoreNodes[score] = append(scoreNodes[score], node.Name) + } + } + // calculate score based on task num if max score hyperNode has more than one. + if len(scoreNodes[maxScore]) > 1 { + for node, score := range nodeScores { + if score == maxScore { + hyperNode := util.FindHyperNodeOfNode(node, ssn.HyperNodes) + taskNumScore := networkTopologyAwareScoreWithTaskNum(hyperNode, taskJob, ssn.HyperNodes) + taskNumScore *= float64(weight) + nodeScores[node] += taskNumScore + } + } + } + + klog.V(1).Infof("networkTopologyAware score is: %v", nodeScores) + return nodeScores, nil + } + + ssn.AddHyperNodeOrederFn(nta.Name(), hyperNodeFn) + ssn.AddBatchNodeOrderFn(nta.Name(), nodeFn) +} + +func (bp *networkTopologyAwarePlugin) OnSessionClose(ssn *framework.Session) { +} + +// networkTopologyAwareScore use the best fit polices during scheduling. + +// Goals: +// - The tier index to which the LCAHyperNode of a job belongs should be as low as possible. +func networkTopologyAwareScore(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) float64 { + jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] + + if jobHyperNode == hyperNodeName { + return BaseScore + } + // Calculate hyperNode tier index score. + _, index := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, hyperNodeTree) + if index <= 0 { + klog.V(4).Infof("find LCAhyperNode failed wtih %s in hyperNodeTree", hyperNodeName) + return 0.0 + } + tierIndexScore := BaseScore * scoreHyperNodeWithIndex(index, 1, len(hyperNodeTree)) + + return tierIndexScore +} + +// Goals: +// - Tasks under a job should be scheduled to one hyperNode as much as possible. +func networkTopologyAwareScoreWithTaskNum(hyperNodeName string, job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) float64 { + // Calculate tasks num score. + taskNum := util.FindJobTaskNumOfHyperNode(hyperNodeName, job, hyperNodes) + taskNumScore := ZeroScore + if len(job.Tasks) > 0 { + taskNumScore = TaskBaseScore * scoreHyperNodeWithTaskNum(taskNum, len(job.Tasks)) + } + return taskNumScore +} + +func scoreHyperNodeWithIndex(index int, minIndex int, maxIndex int) float64 { + // Use tier index to calculate scores and map the original score to the range between 0 and 1. + return float64(maxIndex-index) / float64(maxIndex-minIndex) +} + +func scoreHyperNodeWithTaskNum(taskNum int, maxTaskNum int) float64 { + // Calculate task distribution rate as score and make sure the original score to the range between 0 and 1. + return float64(taskNum) / float64(maxTaskNum) +} diff --git a/pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go new file mode 100644 index 0000000000..9c1d5485ed --- /dev/null +++ b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go @@ -0,0 +1,616 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package networktopologyaware + +import ( + "math" + "testing" + + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/conf" + "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/uthelper" +) + +const ( + eps = 1e-1 +) + +func TestArguments(t *testing.T) { + framework.RegisterPluginBuilder(PluginName, New) + defer framework.CleanupPluginBuilders() + + arguments := framework.Arguments{ + "weight": 2, + } + + builder, ok := framework.GetPluginBuilder(PluginName) + if !ok { + t.Fatalf("should have plugin named %s", PluginName) + } + + plugin := builder(arguments) + networkTopologyAware, ok := plugin.(*networkTopologyAwarePlugin) + if !ok { + t.Fatalf("plugin should be %T, but not %T", networkTopologyAware, plugin) + } + weight := calculateWeight(networkTopologyAware.pluginArguments) + if weight != 2 { + t.Errorf("weight should be 2, but get %v", weight) + } +} + +func TestNetworkTopologyAwareHyperNodeScore(t *testing.T) { + tests := []struct { + name string + uthelper.TestCommonStruct + arguments framework.Arguments + hyperNodes map[string][]*api.NodeInfo + hyperNodeTree []map[string][]string + jobHyperNode string + tasks map[string]string + expected map[string]float64 + }{ + { + name: "Job first scheduler when all hyperNode score is 0.0", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": nil, + "hyperNode4": nil, + "hyperNode5": nil, + }, + jobHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 0.0, + "hyperNode4": 0.0, + "hyperNode5": 0.0, + }, + }, + // test case 2:Job is not first scheduler when the `HyperNode` of the job is not empty and the jobHyperNode hyperNode3, + // for the hyperNode3, it is equls to the jobHyperNode, it is the best choice for the job, so it is expected to return 100.0 for the score. + // for the hyperNode4, find the LCA hyperNode is the hyperNode1 of tier index 2, it is a not good choice. according to calculate to return 36.9 for the score. + // for the hyperNode5, find the LCA hyperNode is the hyperNode0 of tier index 3, it is a worst choice. so it is expected to return 0.0 for the score. + { + name: "Job not first scheduler when hyperNodes will score according to LCA hyperNode of the job", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": { + {Name: "node1"}, + {Name: "node2"}, + }, + "hyperNode4": { + {Name: "node3"}, + {Name: "node4"}, + }, + "hyperNode5": { + {Name: "node5"}, + {Name: "node6"}, + }, + "hyperNode6": { + {Name: "node7"}, + {Name: "node8"}, + }, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 100.0, + "hyperNode4": 50.0, + "hyperNode5": 0.0, + }, + }, + { + name: "Job not first scheduler to score for hyperNodes with weight 2", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 2, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": { + {Name: "node1"}, + {Name: "node2"}, + }, + "hyperNode4": { + {Name: "node3"}, + {Name: "node4"}, + }, + "hyperNode5": { + {Name: "node5"}, + {Name: "node6"}, + }, + "hyperNode6": { + {Name: "node7"}, + {Name: "node8"}, + }, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 200.0, + "hyperNode4": 100.0, + "hyperNode5": 0.0, + }, + }, + { + name: "Job not first scheduler when tier index maxScore has two hyperNodes, then hyperNodes score will add task sum score", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": { + {Name: "node1"}, + {Name: "node2"}, + }, + "hyperNode4": { + {Name: "node3"}, + {Name: "node4"}, + }, + "hyperNode5": { + {Name: "node5"}, + {Name: "node6"}, + }, + "hyperNode6": { + {Name: "node7"}, + {Name: "node8"}, + }, + }, + jobHyperNode: "hyperNode1", + tasks: map[string]string{ + "task1": "node1", + "task2": "node2", + "task3": "node3", + "test4": "", + }, + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 55.0, + "hyperNode4": 52.5, + "hyperNode5": 0.0, + }, + }, + } + + trueValue := true + for i, test := range tests { + tiers := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: PluginName, + EnabledNetworkTopology: &trueValue, + Arguments: test.arguments, + }, + }, + }, + } + ssn := test.RegisterSession(tiers, nil) + ssn.HyperNodes = test.hyperNodes + // mock for test + ssn.HyperNodeTree = test.hyperNodeTree + job := &api.JobInfo{ + Name: "test-job", + PodGroup: &api.PodGroup{}, + } + job.PodGroup.SetAnnotations(map[string]string{api.TopologyAllocateLCAHyperNode: test.jobHyperNode}) + job.Tasks = make(map[api.TaskID]*api.TaskInfo) + for name, node := range test.tasks { + taskInfo := &api.TaskInfo{ + UID: api.TaskID(name), + Name: name, + Job: job.UID, + } + job.Tasks[taskInfo.UID] = taskInfo + taskInfo.NodeName = node + } + scores, err := ssn.HyperNodeOrderMapFn(job, ssn.HyperNodes) + if err != nil { + t.Errorf("case%d: task %s has err %v", i, test.Name, err) + continue + } + hyperNodesScore := scores[PluginName] + for hypernode, expected := range test.expected { + if math.Abs(hyperNodesScore[hypernode]-expected) > eps { + t.Errorf("case%d: task %s on hypernode %s expect have score %v, but get %v", i+1, test.name, hypernode, expected, hyperNodesScore[hypernode]) + } + } + } +} + +func TestNetworkTopologyAwareNodeScore(t *testing.T) { + tests := []struct { + name string + uthelper.TestCommonStruct + arguments framework.Arguments + nodes []*api.NodeInfo + hyperNodeTree []map[string][]string + jobHyperNode string + tasks map[string]string + hyerNodes map[string][]*api.NodeInfo + expected map[string]float64 + }{ + { + name: "task first scheduler of the job when all node score is 0.0", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + hyerNodes: map[string][]*api.NodeInfo{ + "hyperNode3": { + {Name: "node1"}, + {Name: "node2"}, + }, + "hyperNode4": { + {Name: "node3"}, + {Name: "node4"}, + }, + "hyperNode5": { + {Name: "node5"}, + {Name: "node6"}, + }, + "hyperNode6": { + {Name: "node7"}, + {Name: "node8"}, + }, + }, + expected: map[string]float64{ + "node1": 0.0, + "node3": 0.0, + "node5": 0.0, + }, + }, + { + name: "task not first scheduler of job when nodes will score according to hyperNode of the node", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + hyerNodes: map[string][]*api.NodeInfo{ + "hyperNode3": { + {Name: "node1"}, + {Name: "node2"}, + }, + "hyperNode4": { + {Name: "node3"}, + {Name: "node4"}, + }, + "hyperNode5": { + {Name: "node5"}, + {Name: "node6"}, + }, + "hyperNode6": { + {Name: "node7"}, + {Name: "node8"}, + }, + }, + expected: map[string]float64{ + "node1": 100.0, + "node3": 50.0, + "node5": 0.0, + }, + }, + { + name: "task not first scheduler of the job when nodes will score for hyperNodes with weight 2", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 2, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + hyerNodes: map[string][]*api.NodeInfo{ + "hyperNode3": { + {Name: "node1"}, + {Name: "node2"}, + }, + "hyperNode4": { + {Name: "node3"}, + {Name: "node4"}, + }, + "hyperNode5": { + {Name: "node5"}, + {Name: "node6"}, + }, + "hyperNode6": { + {Name: "node7"}, + {Name: "node8"}, + }, + }, + expected: map[string]float64{ + "node1": 200.0, + "node3": 100.0, + "node5": 0.0, + }, + }, + { + name: "task not first scheduler of the job when tier index maxScore has two nodes, then nodes score will add task sum score", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "hyperNode1", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + hyerNodes: map[string][]*api.NodeInfo{ + "hyperNode3": { + {Name: "node1"}, + {Name: "node2"}, + }, + "hyperNode4": { + {Name: "node3"}, + {Name: "node4"}, + }, + "hyperNode5": { + {Name: "node5"}, + {Name: "node6"}, + }, + "hyperNode6": { + {Name: "node7"}, + {Name: "node8"}, + }, + }, + tasks: map[string]string{ + "task1": "node1", + "task2": "node2", + "task3": "node3", + "test4": "", + }, + expected: map[string]float64{ + "node1": 55.0, + "node3": 52.5, + "node5": 0.0, + }, + }, + } + + trueValue := true + for i, test := range tests { + tiers := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: PluginName, + EnabledNodeOrder: &trueValue, + Arguments: test.arguments, + }, + }, + }, + } + ssn := test.RegisterSession(tiers, nil) + // mock for test + ssn.HyperNodeTree = test.hyperNodeTree + // mock job + job := &api.JobInfo{ + UID: "test-job", + Name: "test-job", + PodGroup: &api.PodGroup{}, + } + job.PodGroup.SetAnnotations(map[string]string{api.TopologyAllocateLCAHyperNode: test.jobHyperNode}) + // mock session + ssn.Jobs = map[api.JobID]*api.JobInfo{ + job.UID: job, + } + ssn.HyperNodes = test.hyerNodes + job.Tasks = make(map[api.TaskID]*api.TaskInfo) + for name, node := range test.tasks { + taskInfo := &api.TaskInfo{ + UID: api.TaskID(name), + Name: name, + Job: job.UID, + } + taskInfo.NodeName = node + job.Tasks[taskInfo.UID] = taskInfo + } + ssn.Jobs[job.UID] = job + // mock task + task := &api.TaskInfo{ + Name: "test4", + Job: job.UID, + } + scores, err := ssn.BatchNodeOrderFn(task, test.nodes) + if err != nil { + t.Errorf("case%d: task %s has err %v", i, test.Name, err) + continue + } + for node, expected := range test.expected { + if math.Abs(scores[node]-expected) > eps { + t.Errorf("case%d: task %s on node %s expect have score %v, but get %v", i+1, test.name, node, expected, scores[node]) + } + } + } +} diff --git a/pkg/scheduler/uthelper/helper.go b/pkg/scheduler/uthelper/helper.go index a534c0f465..61a2e72d1e 100644 --- a/pkg/scheduler/uthelper/helper.go +++ b/pkg/scheduler/uthelper/helper.go @@ -189,14 +189,6 @@ func (test *TestCommonStruct) CheckBind(caseIndex int) error { } } - if test.MinimalBindCheck { - return nil - } - - if test.ExpectBindsNum != len(test.ExpectBindMap) { - return fmt.Errorf("invalid setting for binding check: want bind count %d, want bind result length %d", test.ExpectBindsNum, len(test.ExpectBindMap)) - } - // in case expected test.BindsNum is 0, but actually there is a binding and wait the binding goroutine to run select { case <-time.After(50 * time.Millisecond): diff --git a/pkg/scheduler/util/scheduler_helper.go b/pkg/scheduler/util/scheduler_helper.go index 4ef4ce8914..14367ee706 100644 --- a/pkg/scheduler/util/scheduler_helper.go +++ b/pkg/scheduler/util/scheduler_helper.go @@ -137,29 +137,31 @@ func PrioritizeHyperNodes(candidateHyperNodes map[string][]*api.NodeInfo, nodeSc if err != nil { return nil, err } - // plugin scores of hyperNode. for pluginName, scores := range mapScores { for hyperNode, score := range scores { klog.V(5).InfoS("Add plugin score at hypeNode", "jobName", job.UID, "pluginName", pluginName, "hyperNodeName", hyperNode, "score", score) hyperNodesScoreMap[hyperNode] += score + } } - // accumulate node scores in NodeOrder and hyperNode score itself as the final score of each hyperNode. for hyperNodeName, score := range nodeScoresInHyperNode { klog.V(5).InfoS("Add node level scores to final hyperNode score", "jobName", job.UID, "hyperNodeName", hyperNodeName, "score", score) hyperNodesScoreMap[hyperNodeName] += score } - + hyperNodeScores := make(map[float64][]string) + hyperNodeScoreMap := make(map[string]float64) + for hyperNodeName := range candidateHyperNodes { + // If no plugin is applied to this node, the default is 0.0 + score := 0.0 + if value, ok := hyperNodesScoreMap[hyperNodeName]; ok { score += value } hyperNodeScores[score] = append(hyperNodeScores[score], hyperNodeName) - if klog.V(5).Enabled() { hyperNodeScoreMap[hyperNodeName] = score } } - klog.V(5).InfoS("Prioritize hyperNode score map for job", "jobName", job.UID, "scoreMap", hyperNodeScoreMap) return hyperNodeScores, nil } @@ -215,24 +217,6 @@ func SelectBestHyperNode(hyperNodeScores map[float64][]string) string { return bestHyperNodes[rand.Intn(len(bestHyperNodes))] } -// SelectBestHyperNode return the best hyperNode name whose score is highest, pick one randomly if there are many hyperNodes with same score. -func SelectBestHyperNode(hyperNodeScores map[float64][]string) string { - var bestHyperNodes []string - maxScore := -1.0 - for score, hyperNodes := range hyperNodeScores { - if score > maxScore { - maxScore = score - bestHyperNodes = hyperNodes - } - } - - if len(bestHyperNodes) == 0 { - return "" - } - - return bestHyperNodes[rand.Intn(len(bestHyperNodes))] -} - // GetNodeList returns values of the map 'nodes' func GetNodeList(nodes map[string]*api.NodeInfo, nodeList []string) []*api.NodeInfo { result := make([]*api.NodeInfo, 0, len(nodeList)) @@ -306,3 +290,73 @@ func ConvertRes2ResList(res *api.Resource) v1.ResourceList { } return rl } + +// Find the hyperNode to which the node belongs. +func FindHyperNodeOfNode(nodeName string, hyperNodes map[string][]*api.NodeInfo) string { + for hyperNode, nodes := range hyperNodes { + for _, node := range nodes { + if node.Name == nodeName { + return hyperNode + } + } + } + return "" +} + +// FindJobTaskNumOfHyperNode find out the number of tasks in the job that belong to the hyperNode. +func FindJobTaskNumOfHyperNode(hyperNodeName string, job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) int { + nodes := hyperNodes[hyperNodeName] + taskCount := 0 + for _, task := range job.Tasks { + for _, node := range nodes { + if node.Name == task.NodeName { + taskCount++ + break + } + } + } + return taskCount +} + +// FindLCAHyperNode finds out the common ancestor of the current hypernode and the hypernode where the job is scheduled +func FindLCAHyperNode(hyperNodeName string, jobHyperNode string, hyperNodeTree []map[string][]string) (string, int) { + revertHyperNodeTree := make([]map[string][]string, len(hyperNodeTree)) + for i := len(hyperNodeTree) - 1; i >= 0; i-- { + revertHyperNodeTree[len(hyperNodeTree)-1-i] = hyperNodeTree[i] + } + + hyperNodesMap := make(map[string]sets.Set[string]) + for i := 0; i < len(revertHyperNodeTree); i++ { + for name, children := range revertHyperNodeTree[i] { + hyperNodesMap[name] = sets.Set[string]{} + hyperNodesMap[name].Insert(name) + for _, child := range children { + hyperNodesMap[name].Insert(child) + if v, ok := hyperNodesMap[child]; ok { + hyperNodesMap[name] = hyperNodesMap[name].Union(v) + } + } + } + } + + hyperNodesListByTier := [][]string{} + for i := 0; i < len(revertHyperNodeTree); i++ { + hyperNodes := []string{} + for name := range revertHyperNodeTree[i] { + hyperNodes = append(hyperNodes, name) + } + hyperNodesListByTier = append(hyperNodesListByTier, hyperNodes) + } + + for index, tierHyperNodes := range hyperNodesListByTier { + for _, hyperNode := range tierHyperNodes { + hyperNodeSet := hyperNodesMap[hyperNode] + if hyperNodeSet.Has(hyperNodeName) { + if jobHyperNode == "" || hyperNodeSet.Has(jobHyperNode) { + return hyperNode, index + 1 + } + } + } + } + return "", -1 +} diff --git a/pkg/scheduler/util/scheduler_helper_test.go b/pkg/scheduler/util/scheduler_helper_test.go index 3697384a62..befa7c1657 100644 --- a/pkg/scheduler/util/scheduler_helper_test.go +++ b/pkg/scheduler/util/scheduler_helper_test.go @@ -19,8 +19,6 @@ package util import ( "testing" - "k8s.io/apimachinery/pkg/util/sets" - "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/util/sets" @@ -242,3 +240,262 @@ func TestGetHyperNodeList(t *testing.T) { }) } } + +func TestFindLCAHyperNode(t *testing.T) { + testCases := []struct { + name string + hyperNodeName string + JobHyperNode string + hyperNodeTree []map[string][]string + expectedNode string + expectedLevel int + }{ + { + name: "Job hyperNode is empty", + hyperNodeName: "hyperNode3", + JobHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode3", + expectedLevel: 1, + }, + { + name: "Job hyperNode equals input hyperNodeName", + hyperNodeName: "hyperNode3", + JobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode3", + expectedLevel: 1, + }, + { + name: "Normal LCA find for non-leaf node", + hyperNodeName: "hyperNode4", + JobHyperNode: "hyperNode1", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode1", + expectedLevel: 2, + }, + { + name: "Find LCA for hyperNodes in different branches", + hyperNodeName: "hyperNode5", + JobHyperNode: "hyperNode1", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode0", + expectedLevel: 3, + }, + { + name: "Find LCA for hyperNodes in different branches", + hyperNodeName: "hyperNode1", + JobHyperNode: "hyperNode2", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode0", + expectedLevel: 3, + }, + { + name: "No LCA hyperNode found for non-existent node", + hyperNodeName: "nonExistentNode", + JobHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "", + expectedLevel: -1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + resultNode, resultLevel := FindLCAHyperNode(tc.hyperNodeName, tc.JobHyperNode, tc.hyperNodeTree) + if resultNode != tc.expectedNode || resultLevel != tc.expectedLevel { + t.Errorf("Test case '%s' failed. Expected node: %s, level: %d. Got node: %s, level: %d", + tc.name, tc.expectedNode, tc.expectedLevel, resultNode, resultLevel) + } + }) + } +} + +func TestFindJobTaskNumOfHyperNode(t *testing.T) { + testCases := []struct { + name string + hyperNodeName string + tasks map[string]string + hyperNodes map[string][]*api.NodeInfo + expectedRes int + }{ + { + name: "Normal case with matching tasks", + hyperNodeName: "hyperNode1", + tasks: map[string]string{ + "task1": "node1", + "task2": "node2", + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode1": { + {Name: "node1"}, + {Name: "node3"}, + }, + }, + expectedRes: 1, + }, + { + name: "No matching tasks case", + hyperNodeName: "hyperNode1", + tasks: map[string]string{ + "task1": "node4", + "task2": "node5", + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode1": { + {Name: "node1"}, + {Name: "node3"}, + }, + }, + expectedRes: 0, + }, + { + name: "Empty job tasks map case", + hyperNodeName: "hyperNode1", + tasks: map[string]string{}, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode1": { + {Name: "node1"}, + {Name: "node3"}, + }, + }, + expectedRes: 0, + }, + { + name: "Empty nodes list for hyperNode case", + hyperNodeName: "hyperNode2", + tasks: map[string]string{ + "task1": "node1", + "task2": "node2", + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode2": {}, + }, + expectedRes: 0, + }, + { + name: "Tasks with duplicate match in multiple hyperNodes", + hyperNodeName: "hyperNode1", + tasks: map[string]string{ + "task1": "node1", + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode1": { + {Name: "node1"}, + }, + "hyperNode2": { + {Name: "node1"}, + }, + }, + expectedRes: 1, + }, + } + + job := &api.JobInfo{ + Name: "test-job", + PodGroup: &api.PodGroup{}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + job.Tasks = make(map[api.TaskID]*api.TaskInfo) + for name, node := range tc.tasks { + taskInfo := &api.TaskInfo{ + UID: api.TaskID(name), + Name: name, + Job: job.UID, + } + taskInfo.NodeName = node + job.Tasks[taskInfo.UID] = taskInfo + } + result := FindJobTaskNumOfHyperNode(tc.hyperNodeName, job, tc.hyperNodes) + if result != tc.expectedRes { + t.Errorf("Test case '%s' failed. Expected result: %d, but got: %d", + tc.name, tc.expectedRes, result) + } + }) + } +}