From a02189ff6cd634089a2eb6d3e38f7b00c01d94da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BB=BA=E5=AE=87?= Date: Tue, 7 Jan 2025 15:48:20 +0800 Subject: [PATCH] use goroutine pool deal sc.taskUnschedulable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 张建宇 --- pkg/scheduler/cache/cache.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 04bcb3daff..4630667b09 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -74,6 +74,8 @@ import ( const ( // default interval for sync data from metrics server, the value is 30s defaultMetricsInternal = 30 * time.Second + + taskUpdaterWorker = 16 ) // defaultIgnoredProvisioners contains provisioners that will be ignored during pod pvc request computation and preemption. @@ -1487,13 +1489,12 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updat // Update podCondition for tasks Allocated and Pending before job discarded for _, status := range []schedulingapi.TaskStatus{schedulingapi.Allocated, schedulingapi.Pending, schedulingapi.Pipelined} { statusTasks := job.TaskStatusIndex[status] - workerNum := 16 taskInfos := make([]*schedulingapi.TaskInfo, 0, len(statusTasks)) for _, task := range statusTasks { taskInfos = append(taskInfos, task) } - workqueue.ParallelizeUntil(context.TODO(), workerNum, len(taskInfos), func(index int) { + workqueue.ParallelizeUntil(context.TODO(), taskUpdaterWorker, len(taskInfos), func(index int) { taskInfo := taskInfos[index] // The pod of a scheduling gated task is given