diff --git a/pkg/scheduler/api/pod_info.go b/pkg/scheduler/api/pod_info.go index 7922a78297..5fffe6107a 100644 --- a/pkg/scheduler/api/pod_info.go +++ b/pkg/scheduler/api/pod_info.go @@ -172,3 +172,9 @@ func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource { return result } + +type PodStatusRateLimit struct { + Enable bool + MinPodNum int + MinIntervalSec int +} diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index d0cf336bd8..fcacd13c7b 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -64,10 +64,12 @@ import ( vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/features" + "volcano.sh/volcano/pkg/scheduler/api" schedulingapi "volcano.sh/volcano/pkg/scheduler/api" volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" "volcano.sh/volcano/pkg/scheduler/metrics" "volcano.sh/volcano/pkg/scheduler/metrics/source" + schedulingutil "volcano.sh/volcano/pkg/scheduler/util" commonutil "volcano.sh/volcano/pkg/util" ) @@ -153,6 +155,7 @@ type SchedulerCache struct { imageStates map[string]*imageState nodeWorkers uint32 + PodNum int32 // IgnoredCSIProvisioners contains a list of provisioners, and pod request pvc with these provisioners will // not be counted in pod pvc resource request and node.Allocatable, because the spec.drivers of csinode resource @@ -1461,7 +1464,8 @@ func (sc *SchedulerCache) String() string { } // RecordJobStatusEvent records related events according to job status. -func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updatePG bool) { +func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updatePG bool, podStatusRateLimit *api.PodStatusRateLimit) { + nowTs := time.Now().Unix() pgUnschedulable := job.PodGroup != nil && (job.PodGroup.Status.Phase == scheduling.PodGroupUnknown || job.PodGroup.Status.Phase == scheduling.PodGroupPending || @@ -1497,16 +1501,28 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updat if len(msg) == 0 { msg = baseErrorMessage } - if err := sc.taskUnschedulable(taskInfo, reason, msg, nominatedNodeName); err != nil { - klog.ErrorS(err, "Failed to update unschedulable task status", "task", klog.KRef(taskInfo.Namespace, taskInfo.Name), - "reason", reason, "message", msg) + var ( + ts int64 + exist bool + useCache = podStatusRateLimit.Enable && sc.PodNum >= int32(podStatusRateLimit.MinPodNum) + ) + if useCache { + ts, exist = schedulingutil.GetPodStatusLastSetCache(job.UID, taskInfo.UID) + } + + if !exist || nowTs-ts > int64(podStatusRateLimit.MinIntervalSec) { + if err := sc.taskUnschedulable(taskInfo, reason, msg, nominatedNodeName); err != nil { + klog.ErrorS(err, "Failed to update unschedulable task status", "task", klog.KRef(taskInfo.Namespace, taskInfo.Name), + "reason", reason, "message", msg) + } + schedulingutil.SetPodStatusLastSetCache(job.UID, taskInfo.UID, nowTs) } } } } // UpdateJobStatus update the status of job and its tasks. 
-func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG bool) (*schedulingapi.JobInfo, error) { +func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG bool, podStatusRateLimit *api.PodStatusRateLimit) (*schedulingapi.JobInfo, error) { if updatePG { pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup) if err != nil { @@ -1515,7 +1531,7 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b job.PodGroup = pg } - sc.RecordJobStatusEvent(job, updatePG) + sc.RecordJobStatusEvent(job, updatePG, podStatusRateLimit) return job, nil } diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 1e812868f8..6ef8a17b7f 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -22,6 +22,7 @@ import ( "math" "slices" "strconv" + "sync/atomic" v1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" @@ -382,6 +383,7 @@ func (sc *SchedulerCache) AddPod(obj interface{}) { sc.Mutex.Lock() defer sc.Mutex.Unlock() + atomic.AddInt32(&sc.PodNum, 1) err := sc.addPod(pod) if err != nil { klog.Errorf("Failed to add pod <%s/%s> into cache: %v", @@ -437,6 +439,7 @@ func (sc *SchedulerCache) DeletePod(obj interface{}) { sc.Mutex.Lock() defer sc.Mutex.Unlock() + atomic.AddInt32(&sc.PodNum, -1) err := sc.deletePod(pod) if err != nil { klog.Errorf("Failed to delete pod %v from cache: %v", pod.Name, err) diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index 400eb9f33e..fdb6da16d2 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -52,10 +52,10 @@ type Cache interface { // RecordJobStatusEvent records related events according to job status. // Deprecated: remove it after removed PDB support. - RecordJobStatusEvent(job *api.JobInfo, updatePG bool) + RecordJobStatusEvent(job *api.JobInfo, updatePG bool, podStatusRateLimit *api.PodStatusRateLimit) // UpdateJobStatus puts job in backlog for a while. - UpdateJobStatus(job *api.JobInfo, updatePG bool) (*api.JobInfo, error) + UpdateJobStatus(job *api.JobInfo, updatePG bool, podStatusRateLimit *api.PodStatusRateLimit) (*api.JobInfo, error) // UpdateQueueStatus update queue status. 
UpdateQueueStatus(queue *api.QueueInfo) error diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index c3b9396bac..38b14614a2 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -33,7 +33,7 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Con ssn.Configurations = configurations ssn.NodeMap = GenerateNodeMapAndSlice(ssn.Nodes) ssn.PodLister = NewPodLister(ssn) - + ssn.parsePodStatusRateLimitArguments() for _, tier := range tiers { for _, plugin := range tier.Plugins { if pb, found := GetPluginBuilder(plugin.Name); !found { diff --git a/pkg/scheduler/framework/job_updater.go b/pkg/scheduler/framework/job_updater.go index 80ba52987d..6a82e202d1 100644 --- a/pkg/scheduler/framework/job_updater.go +++ b/pkg/scheduler/framework/job_updater.go @@ -95,7 +95,7 @@ func (ju *jobUpdater) updateJob(index int) { job.PodGroup.Status = jobStatus(ssn, job) oldStatus, found := ssn.podGroupStatus[job.UID] updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) - if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil { + if _, err := ssn.cache.UpdateJobStatus(job, updatePG, ssn.podStatusRateLimit); err != nil { klog.Errorf("Failed to update job <%s/%s>: %v", job.Namespace, job.Name, err) } diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index dd5ef2b275..b3bdaa10c9 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -95,17 +95,18 @@ type Session struct { overusedFns map[string]api.ValidateFn // preemptiveFns means whether current queue can reclaim from other queue, // while reclaimableFns means whether current queue's resources can be reclaimed. - preemptiveFns map[string]api.ValidateWithCandidateFn - allocatableFns map[string]api.AllocatableFn - jobReadyFns map[string]api.ValidateFn - jobPipelinedFns map[string]api.VoteFn - jobValidFns map[string]api.ValidateExFn - jobEnqueueableFns map[string]api.VoteFn - jobEnqueuedFns map[string]api.JobEnqueuedFn - targetJobFns map[string]api.TargetJobFn - reservedNodesFns map[string]api.ReservedNodesFn - victimTasksFns map[string][]api.VictimTasksFn - jobStarvingFns map[string]api.ValidateFn + preemptiveFns map[string]api.ValidateWithCandidateFn + allocatableFns map[string]api.AllocatableFn + jobReadyFns map[string]api.ValidateFn + jobPipelinedFns map[string]api.VoteFn + jobValidFns map[string]api.ValidateExFn + jobEnqueueableFns map[string]api.VoteFn + jobEnqueuedFns map[string]api.JobEnqueuedFn + targetJobFns map[string]api.TargetJobFn + reservedNodesFns map[string]api.ReservedNodesFn + victimTasksFns map[string][]api.VictimTasksFn + jobStarvingFns map[string]api.ValidateFn + podStatusRateLimit *api.PodStatusRateLimit } func openSession(cache cache.Cache) *Session { @@ -155,6 +156,11 @@ func openSession(cache cache.Cache) *Session { reservedNodesFns: map[string]api.ReservedNodesFn{}, victimTasksFns: map[string][]api.VictimTasksFn{}, jobStarvingFns: map[string]api.ValidateFn{}, + podStatusRateLimit: &api.PodStatusRateLimit{ + Enable: true, + MinPodNum: 10000, + MinIntervalSec: 60, + }, } snapshot := cache.Snapshot() @@ -311,6 +317,8 @@ func closeSession(ssn *Session) { ssn.NodeList = nil ssn.TotalResource = nil + util.CleanUnusedPodStatusLastSetCache(ssn.Jobs) + klog.V(3).Infof("Close Session %v", ssn.UID) } @@ -655,6 +663,13 @@ func (ssn *Session) UpdateSchedulerNumaInfo(AllocatedSets map[string]api.ResNuma 
ssn.cache.UpdateSchedulerNumaInfo(AllocatedSets) } +func (ssn *Session) parsePodStatusRateLimitArguments() { + arguments := GetArgOfActionFromConf(ssn.Configurations, "podStatusRateLimit") + arguments.GetInt(&ssn.podStatusRateLimit.MinIntervalSec, "minIntervalSec") + arguments.GetInt(&ssn.podStatusRateLimit.MinPodNum, "minPodNum") + arguments.GetBool(&ssn.podStatusRateLimit.Enable, "enable") +} + // KubeClient returns the kubernetes client func (ssn Session) KubeClient() kubernetes.Interface { return ssn.kubeClient diff --git a/pkg/scheduler/util/session_helper.go b/pkg/scheduler/util/session_helper.go new file mode 100644 index 0000000000..7eace1992f --- /dev/null +++ b/pkg/scheduler/util/session_helper.go @@ -0,0 +1,42 @@ +package util + +import ( + "sync" + + "volcano.sh/volcano/pkg/scheduler/api" +) + +type StatusLastSetCache struct { + Cache map[api.JobID]map[api.TaskID]int64 + sync.RWMutex +} + +var ( + podStatusLastSetCache = StatusLastSetCache{Cache: map[api.JobID]map[api.TaskID]int64{}} +) + +func SetPodStatusLastSetCache(jobID api.JobID, taskID api.TaskID, ts int64) { + podStatusLastSetCache.Lock() + defer podStatusLastSetCache.Unlock() + if _, ok := podStatusLastSetCache.Cache[jobID]; !ok { + podStatusLastSetCache.Cache[jobID] = map[api.TaskID]int64{} + } + podStatusLastSetCache.Cache[jobID][taskID] = ts +} + +func GetPodStatusLastSetCache(jobID api.JobID, taskID api.TaskID) (ts int64, exist bool) { + podStatusLastSetCache.RLock() + defer podStatusLastSetCache.RUnlock() + ts, exist = podStatusLastSetCache.Cache[jobID][taskID] + return +} + +func CleanUnusedPodStatusLastSetCache(jobs map[api.JobID]*api.JobInfo) { + podStatusLastSetCache.Lock() + defer podStatusLastSetCache.Unlock() + for jobID := range podStatusLastSetCache.Cache { + if _, ok := jobs[jobID]; !ok { + delete(podStatusLastSetCache.Cache, jobID) + } + } +}
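
The gate this patch adds to RecordJobStatusEvent can be read as a small predicate: patch the pod's unschedulable condition unless rate limiting is enabled, the cache holds at least MinPodNum pods, and the last write recorded for this task is newer than MinIntervalSec. The sketch below is illustrative only and not part of the patch; shouldUpdatePodStatus and the local podStatusRateLimit type are hypothetical names that mirror the logic in the cache.go hunk above.

// Illustrative sketch only, not part of the patch.
package main

import (
	"fmt"
	"time"
)

type podStatusRateLimit struct {
	Enable         bool
	MinPodNum      int
	MinIntervalSec int
}

// shouldUpdatePodStatus mirrors the check added to RecordJobStatusEvent:
// skip the status patch only when rate limiting is active for this cluster
// size and the previous write for the task is still fresh.
func shouldUpdatePodStatus(rl podStatusRateLimit, podNum int32, lastSet int64, haveLastSet bool) bool {
	useCache := rl.Enable && podNum >= int32(rl.MinPodNum)
	if !useCache || !haveLastSet {
		// Rate limiting inactive, or no record of a previous write: update.
		return true
	}
	// Otherwise refresh the condition only after MinIntervalSec has elapsed.
	return time.Now().Unix()-lastSet > int64(rl.MinIntervalSec)
}

func main() {
	rl := podStatusRateLimit{Enable: true, MinPodNum: 10000, MinIntervalSec: 60}
	tenSecondsAgo := time.Now().Add(-10 * time.Second).Unix()
	// In a 20000-pod cluster, a condition written 10s ago is not refreshed yet.
	fmt.Println(shouldUpdatePodStatus(rl, 20000, tenSecondsAgo, true)) // false
	// Below the MinPodNum threshold the limiter is bypassed entirely.
	fmt.Println(shouldUpdatePodStatus(rl, 500, tenSecondsAgo, true)) // true
}

The defaults used in the example (Enable: true, MinPodNum: 10000, MinIntervalSec: 60) come from openSession in session.go; per parsePodStatusRateLimitArguments, they would presumably be overridden through a configurations entry named podStatusRateLimit carrying the keys enable, minPodNum, and minIntervalSec in the scheduler configuration.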