-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add network-topology-aware plugin and hyperNode score callback #3894
base: network-topology
Are you sure you want to change the base?
Conversation
Welcome @ecosysbin! |
/assign @Monokaix |
2f15078
to
947107e
Compare
pkg/scheduler/api/job_info.go
Outdated
|
||
// The RootHypernode property of a job is the hypernode that serves as the smallest root in the hypernode tree. | ||
// A job has multiple tasks, each belonging to a hypernode. This RootHypernode is the topmost and lowest common ancestor among the hypernodes of all tasks within the job. | ||
RootHyperNode string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When is RootHyperNode
assigned? I didn't find any codes assign the value to RootHyperNode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when the task of job commit, the hyperNode of the task will assigned to job. it will add the code later.
pkg/scheduler/api/types.go
Outdated
@@ -300,6 +301,9 @@ type NodeReduceFn func(*TaskInfo, k8sframework.NodeScoreList) error | |||
// NodeOrderMapFn is the func declaration used to get priority score of all plugins for a node for a particular task. | |||
type NodeOrderMapFn func(*TaskInfo, *NodeInfo) (map[string]float64, float64, error) | |||
|
|||
// HyperNodeOrderFn is the func declaration used to score hyperNodes for job. | |||
type HyperNodeOrderFn func(*JobInfo, map[string][]*NodeInfo, []int, map[int][]string, map[string]sets.Set[string]) (map[string]float64, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may need to rebase this pr and factor your code:https://github.com/volcano-sh/volcano/pull/3874/files, the HyperNodeOrderFn structure has changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. l will do it later.
pkg/scheduler/framework/session.go
Outdated
// hyperNode can gain a better performance, the lower the tier of hyperNode, the better performance. | ||
HyperNodesListByTier map[int][]string | ||
|
||
HyperNodesTiers []int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, there are no so many fields now, may need to rebase and refactor: https://github.com/volcano-sh/volcano/pull/3874/files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. According to the discussion, there will have a hyperNode tree here. it will do in another PR.
|
||
// Goals: | ||
// - The tier to which the rootHypernode of a job belongs should be as low as possible. | ||
func networkTopologyAwareScore(hyperNode string, job *api.JobInfo, hyperNodesTiers []int, hyperNodesListByTier map[int][]string, hyperNodesMap map[string]sets.Set[string]) float64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn’t get the point for this score func. Does this have a design plan or a picture? After refactoring based on this PR: https://github.com/volcano-sh/volcano/pull/3874/files, could you add more details or description about how to score hypernodes in your pr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Explanation:
The RootHypernode property of a job is the hypernode that serves as the smallest root in the hypernode
tree.
A job has multiple tasks, each belonging to a hypernode. This RootHypernode is the topmost and lowest common ancestor among the hypernodes of all tasks within the job.
Goals:
The tier to which the rootHypernode of a job belongs should be as low as possible.
I will change the name 'RootHypernode' to 'LCAHyperNode' later.
d1da751
to
414e547
Compare
414e547
to
5036976
Compare
889afed
to
b9a291b
Compare
// - Tasks under a job should be scheduled to one hyperNode as much as possible. | ||
func networkTopologyAwareScore(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) float64 { | ||
jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] | ||
// job fist first scheduler. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> //job is scheduled for the first time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// HyperNodeTree is the hypernode tree of all hypernodes in the cluster. | ||
// currentJobLCAHyperNode is the hypernode of the job's LCAHyperNode. | ||
var ( | ||
HyperNodeTree []map[string][]string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to maintain a separate global variable here? And this data structure does not seem to be called HyperNodeTree, it does not reflect hierarchical relationships.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
l have move the hyperNodeTree to session, According to the discussion, this variable will be replaced by Gu Peng's HyperNodeTree later.
return taskCount | ||
} | ||
|
||
// FindOutRootHyperNode find out the root hypernode of the job when the hypernode join the job. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> //FindLCAHyperNode finds out the common ancestor of the current hypernode and the hypernode where the job is scheduled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
} | ||
|
||
func TestFindLCAHyperNode(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None of the use cases specify currentJobLCAHyperNode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
hyperNodesMap := make(map[string]sets.Set[string]) | ||
for i := 0; i < len(revertHyperNodeTree); i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extract a public function, has the same code in FindLCAHyperNode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// PluginName indicates name of volcano scheduler plugin. | ||
PluginName = "networktopologyaware" | ||
BaseScore = 100 | ||
TaskBaseScore = 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is 10 points reasonable? need to discuss /cc @Monokaix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To ensure that the scores for the task num are distributed between 0 and 10, so that the scores for the tier level will be higher than those for the task num.
pkg/scheduler/framework/session.go
Outdated
@@ -127,8 +126,10 @@ func openSession(cache cache.Cache) *Session { | |||
|
|||
TotalResource: api.EmptyResource(), | |||
TotalGuarantee: api.EmptyResource(), | |||
podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{}, | |||
|
|||
PodGroupCache: api.PodGroupCache{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to rename as PodGroupOldState
, which is used to record some old PodGroup data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/scheduler/cache/cache.go
Outdated
job.PodGroup = pg | ||
} | ||
|
||
sc.Jobs[job.UID] = job |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to update here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/scheduler/cache/cache.go
Outdated
@@ -1536,9 +1537,12 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b | |||
if err != nil { | |||
return nil, err | |||
} | |||
sc.Mutex.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is no need to add lock? session --> UpdateJobStatus is still in the same goroutine, and there is no other goroutine will read or write this map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the discussion, locks need to be added for all operations on the cache.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is to update podgroup to apiserver. If you just want to update cache, should not be here .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, l need to update podgroup to apiserver.
@@ -245,6 +247,14 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu | |||
break | |||
} | |||
for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] { | |||
if jobHyperNode == "" { | |||
// job first scheduler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> //job is scheduled for the first time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
e648163
to
1847753
Compare
6a60ca6
to
336d5fe
Compare
2c56edf
to
506cc84
Compare
nodes, ok := ssn.HyperNodes[hyperNodeName] | ||
if !ok { | ||
klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier) | ||
continue | ||
} | ||
|
||
jobNewHyperNodeMap[hyperNodeName] = hyperNodeName | ||
// The job is rescheduled again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
--> The job still has remaining tasks to be scheduled, check whether the least common ancestor hypernode still meets the requirement of the highest allowed tier
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok,I have already modified it.
@@ -233,9 +219,11 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu | |||
hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue) | |||
ssn := alloc.session | |||
selectedTier := 0 | |||
jobNewHyperNodeMap := map[string]string{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps LCAHypernodeMap
is better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I have done it
@@ -233,9 +219,11 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu | |||
hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue) | |||
ssn := alloc.session | |||
selectedTier := 0 | |||
jobNewHyperNodeMap := map[string]string{} | |||
jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also jobHyperNode
--> jobLCAHyperNode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok,I have done it.
|
||
// Find a suitable hyperNode in one tier from down to top everytime to ensure that the selected hyperNode spans the least tier. | ||
for index, tier := range alloc.hyperNodesTiers { | ||
for index, tier := range ssn.HyperNodesTiers { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why to change alloc.hyperNodesTiers --> ssn.HyperNodesTiers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I alse need hyperNodesTiers in networktopologyaware plugin and the plugin only can use data of session. After discussion with @Monokaix , this data structure will be moved to the session.
@@ -286,6 +284,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu | |||
klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes) | |||
} | |||
stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job) | |||
jobNewHyperNode := jobNewHyperNodeMap[hyperNode] | |||
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a if here to judge whether current LCAHypernode is same as the previous? Like:
if currentLCAHyperNode != jobLCAHyperNode{
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = currentLCAHyperNode
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes,good idea,In this way, the logic will be clearer. I have done it
506cc84
to
63a313c
Compare
6f940a9
to
0aaad96
Compare
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
62e7e66
to
44c8a28
Compare
44c8a28
to
2e8f63b
Compare
Signed-off-by: ecosysbin <[email protected]>
2e8f63b
to
a82f026
Compare
Realted to:
#3885
#3877
Add network-topology-aware plugin and hyperNode score callback
Volcano scheduler: Supports network topology aware scheduling when pods rescheduled