Skip to content

Commit

Permalink
use goroutine pool deal sc.taskUnschedulable
Browse files Browse the repository at this point in the history
Signed-off-by: 张建宇 <[email protected]>
  • Loading branch information
张建宇 committed Dec 30, 2024
1 parent 95c5aeb commit b6313fd
Showing 1 changed file with 20 additions and 23 deletions.
43 changes: 20 additions & 23 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1486,35 +1486,32 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updat
}
// Update podCondition for tasks Allocated and Pending before job discarded
for _, status := range []schedulingapi.TaskStatus{schedulingapi.Allocated, schedulingapi.Pending, schedulingapi.Pipelined} {
semaphore := make(chan struct{}, 16)
var wg sync.WaitGroup
for _, taskInfo := range job.TaskStatusIndex[status] {
statusTasks := job.TaskStatusIndex[status]
workerNum := 16
taskInfos := make([]*schedulingapi.TaskInfo, 0)
for _, task := range statusTasks {
taskInfos = append(taskInfos, task)
}

workqueue.ParallelizeUntil(context.TODO(), workerNum, len(taskInfos), func(index int) {
taskInfo := taskInfos[index]

// The pod of a scheduling gated task is given
// the ScheduleGated condition by the api-server. Do not change it.
if taskInfo.SchGated {
continue
return
}

wg.Add(1)
semaphore <- struct{}{}

go func(taskInfo *schedulingapi.TaskInfo) {
defer wg.Done()

reason, msg, nominatedNodeName := job.TaskSchedulingReason(taskInfo.UID)
if len(msg) == 0 {
msg = baseErrorMessage
}

if err := sc.taskUnschedulable(taskInfo, reason, msg, nominatedNodeName); err != nil {
klog.ErrorS(err, "Failed to update unschedulable task status", "task", klog.KRef(taskInfo.Namespace, taskInfo.Name),
"reason", reason, "message", msg)
}
reason, msg, nominatedNodeName := job.TaskSchedulingReason(taskInfo.UID)
if len(msg) == 0 {
msg = baseErrorMessage
}

<-semaphore
}(taskInfo)
}
wg.Wait()
if err := sc.taskUnschedulable(taskInfo, reason, msg, nominatedNodeName); err != nil {
klog.ErrorS(err, "Failed to update unschedulable task status", "task", klog.KRef(taskInfo.Namespace, taskInfo.Name),
"reason", reason, "message", msg)
}
})
}
}

Expand Down

0 comments on commit b6313fd

Please sign in to comment.