pod-update-performance #3907

Open · wants to merge 2 commits into base: master
6 changes: 6 additions & 0 deletions pkg/scheduler/api/pod_info.go
@@ -172,3 +172,9 @@ func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource {

	return result
}

type PodStatusRateLimit struct {
	Enable         bool
	MinPodNum      int
	MinIntervalSec int
}
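For orientation, here is a minimal sketch of how the three fields are meant to interact. shouldRefreshPodStatus is a hypothetical helper written only for illustration, not part of the PR; the actual check is performed inline in RecordJobStatusEvent in cache.go below.

```go
// shouldRefreshPodStatus mirrors the gating logic added to RecordJobStatusEvent:
// rate limiting only applies when it is enabled and the scheduler currently
// tracks at least MinPodNum pods; below that threshold (or when disabled),
// every unschedulable event still updates the pod status.
func shouldRefreshPodStatus(rl *PodStatusRateLimit, podNum int32, lastSetTs, nowTs int64, cached bool) bool {
	if !rl.Enable || podNum < int32(rl.MinPodNum) {
		return true
	}
	return !cached || nowTs-lastSetTs > int64(rl.MinIntervalSec)
}
```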
28 changes: 22 additions & 6 deletions pkg/scheduler/cache/cache.go
@@ -64,10 +64,12 @@ import (
	vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
	"volcano.sh/volcano/cmd/scheduler/app/options"
	"volcano.sh/volcano/pkg/features"
	"volcano.sh/volcano/pkg/scheduler/api"
	schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
	volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
	"volcano.sh/volcano/pkg/scheduler/metrics"
	"volcano.sh/volcano/pkg/scheduler/metrics/source"
	schedulingutil "volcano.sh/volcano/pkg/scheduler/util"
	commonutil "volcano.sh/volcano/pkg/util"
)

@@ -153,6 +155,7 @@ type SchedulerCache struct {
	imageStates map[string]*imageState

	nodeWorkers uint32
	PodNum      int32

	// IgnoredCSIProvisioners contains a list of provisioners, and pod request pvc with these provisioners will
	// not be counted in pod pvc resource request and node.Allocatable, because the spec.drivers of csinode resource
@@ -1461,7 +1464,8 @@ func (sc *SchedulerCache) String() string {
}

// RecordJobStatusEvent records related events according to job status.
func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updatePG bool) {
func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updatePG bool, podStatusRateLimit *api.PodStatusRateLimit) {
	nowTs := time.Now().Unix()
	pgUnschedulable := job.PodGroup != nil &&
		(job.PodGroup.Status.Phase == scheduling.PodGroupUnknown ||
			job.PodGroup.Status.Phase == scheduling.PodGroupPending ||
@@ -1497,16 +1501,28 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updat
			if len(msg) == 0 {
				msg = baseErrorMessage
			}
			if err := sc.taskUnschedulable(taskInfo, reason, msg, nominatedNodeName); err != nil {
				klog.ErrorS(err, "Failed to update unschedulable task status", "task", klog.KRef(taskInfo.Namespace, taskInfo.Name),
					"reason", reason, "message", msg)
			var (
				ts       int64
				exist    bool
				useCache = podStatusRateLimit.Enable && sc.PodNum >= int32(podStatusRateLimit.MinPodNum)
			)
			if useCache {
				ts, exist = schedulingutil.GetPodStatusLastSetCache(job.UID, taskInfo.UID)
			}

			if !exist || nowTs-ts > int64(podStatusRateLimit.MinIntervalSec) {
				if err := sc.taskUnschedulable(taskInfo, reason, msg, nominatedNodeName); err != nil {
					klog.ErrorS(err, "Failed to update unschedulable task status", "task", klog.KRef(taskInfo.Namespace, taskInfo.Name),
						"reason", reason, "message", msg)
				}
				schedulingutil.SetPodStatusLastSetCache(job.UID, taskInfo.UID, nowTs)
			}
		}
	}
}

// UpdateJobStatus update the status of job and its tasks.
func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG bool) (*schedulingapi.JobInfo, error) {
func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG bool, podStatusRateLimit *api.PodStatusRateLimit) (*schedulingapi.JobInfo, error) {
	if updatePG {
		pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
		if err != nil {
@@ -1515,7 +1531,7 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b
		job.PodGroup = pg
	}

	sc.RecordJobStatusEvent(job, updatePG)
	sc.RecordJobStatusEvent(job, updatePG, podStatusRateLimit)

	return job, nil
}
3 changes: 3 additions & 0 deletions pkg/scheduler/cache/event_handlers.go
@@ -22,6 +22,7 @@ import (
	"math"
	"slices"
	"strconv"
	"sync/atomic"

	v1 "k8s.io/api/core/v1"
	schedulingv1 "k8s.io/api/scheduling/v1"
@@ -382,6 +383,7 @@ func (sc *SchedulerCache) AddPod(obj interface{}) {
	sc.Mutex.Lock()
	defer sc.Mutex.Unlock()

	atomic.AddInt32(&sc.PodNum, 1)
	err := sc.addPod(pod)
	if err != nil {
		klog.Errorf("Failed to add pod <%s/%s> into cache: %v",
@@ -437,6 +439,7 @@ func (sc *SchedulerCache) DeletePod(obj interface{}) {
	sc.Mutex.Lock()
	defer sc.Mutex.Unlock()

	atomic.AddInt32(&sc.PodNum, -1)
	err := sc.deletePod(pod)
	if err != nil {
		klog.Errorf("Failed to delete pod %v from cache: %v", pod.Name, err)
4 changes: 2 additions & 2 deletions pkg/scheduler/cache/interface.go
@@ -52,10 +52,10 @@ type Cache interface {

	// RecordJobStatusEvent records related events according to job status.
	// Deprecated: remove it after removed PDB support.
	RecordJobStatusEvent(job *api.JobInfo, updatePG bool)
	RecordJobStatusEvent(job *api.JobInfo, updatePG bool, podStatusRateLimit *api.PodStatusRateLimit)

	// UpdateJobStatus puts job in backlog for a while.
	UpdateJobStatus(job *api.JobInfo, updatePG bool) (*api.JobInfo, error)
	UpdateJobStatus(job *api.JobInfo, updatePG bool, podStatusRateLimit *api.PodStatusRateLimit) (*api.JobInfo, error)

	// UpdateQueueStatus update queue status.
	UpdateQueueStatus(queue *api.QueueInfo) error
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/framework.go
@@ -33,7 +33,7 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Con
	ssn.Configurations = configurations
	ssn.NodeMap = GenerateNodeMapAndSlice(ssn.Nodes)
	ssn.PodLister = NewPodLister(ssn)

	ssn.parsePodStatusRateLimitArguments()
	for _, tier := range tiers {
		for _, plugin := range tier.Plugins {
			if pb, found := GetPluginBuilder(plugin.Name); !found {
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/job_updater.go
@@ -95,7 +95,7 @@ func (ju *jobUpdater) updateJob(index int) {
	job.PodGroup.Status = jobStatus(ssn, job)
	oldStatus, found := ssn.podGroupStatus[job.UID]
	updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus)
	if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil {
	if _, err := ssn.cache.UpdateJobStatus(job, updatePG, ssn.podStatusRateLimit); err != nil {
		klog.Errorf("Failed to update job <%s/%s>: %v",
			job.Namespace, job.Name, err)
	}
37 changes: 26 additions & 11 deletions pkg/scheduler/framework/session.go
@@ -95,17 +95,18 @@ type Session struct {
	overusedFns map[string]api.ValidateFn
	// preemptiveFns means whether current queue can reclaim from other queue,
	// while reclaimableFns means whether current queue's resources can be reclaimed.
	preemptiveFns     map[string]api.ValidateWithCandidateFn
	allocatableFns    map[string]api.AllocatableFn
	jobReadyFns       map[string]api.ValidateFn
	jobPipelinedFns   map[string]api.VoteFn
	jobValidFns       map[string]api.ValidateExFn
	jobEnqueueableFns map[string]api.VoteFn
	jobEnqueuedFns    map[string]api.JobEnqueuedFn
	targetJobFns      map[string]api.TargetJobFn
	reservedNodesFns  map[string]api.ReservedNodesFn
	victimTasksFns    map[string][]api.VictimTasksFn
	jobStarvingFns    map[string]api.ValidateFn
	preemptiveFns      map[string]api.ValidateWithCandidateFn
	allocatableFns     map[string]api.AllocatableFn
	jobReadyFns        map[string]api.ValidateFn
	jobPipelinedFns    map[string]api.VoteFn
	jobValidFns        map[string]api.ValidateExFn
	jobEnqueueableFns  map[string]api.VoteFn
	jobEnqueuedFns     map[string]api.JobEnqueuedFn
	targetJobFns       map[string]api.TargetJobFn
	reservedNodesFns   map[string]api.ReservedNodesFn
	victimTasksFns     map[string][]api.VictimTasksFn
	jobStarvingFns     map[string]api.ValidateFn
	podStatusRateLimit *api.PodStatusRateLimit
}

func openSession(cache cache.Cache) *Session {
@@ -155,6 +156,11 @@ func openSession(cache cache.Cache) *Session {
		reservedNodesFns:   map[string]api.ReservedNodesFn{},
		victimTasksFns:     map[string][]api.VictimTasksFn{},
		jobStarvingFns:     map[string]api.ValidateFn{},
		podStatusRateLimit: &api.PodStatusRateLimit{
			Enable:         true,
			MinPodNum:      10000,
			MinIntervalSec: 60,
		},
	}

	snapshot := cache.Snapshot()
@@ -311,6 +317,8 @@ func closeSession(ssn *Session) {
	ssn.NodeList = nil
	ssn.TotalResource = nil

	util.CleanUnusedPodStatusLastSetCache(ssn.Jobs)

	klog.V(3).Infof("Close Session %v", ssn.UID)
}

@@ -655,6 +663,13 @@ func (ssn *Session) UpdateSchedulerNumaInfo(AllocatedSets map[string]api.ResNuma
	ssn.cache.UpdateSchedulerNumaInfo(AllocatedSets)
}

func (ssn *Session) parsePodStatusRateLimitArguments() {
	arguments := GetArgOfActionFromConf(ssn.Configurations, "podStatusRateLimit")
	arguments.GetInt(&ssn.podStatusRateLimit.MinIntervalSec, "minIntervalSec")
	arguments.GetInt(&ssn.podStatusRateLimit.MinPodNum, "minPodNum")
	arguments.GetBool(&ssn.podStatusRateLimit.Enable, "enable")
}
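The three Get calls above only override the defaults set in openSession when the corresponding key is present under a podStatusRateLimit entry in the scheduler's action configurations; absent or unparsable keys leave the defaults untouched. A rough stand-in for that behavior (applyRateLimitArgs and the plain string map are illustrative assumptions, not Volcano's Arguments API):

```go
import (
	"strconv"

	"volcano.sh/volcano/pkg/scheduler/api"
)

// applyRateLimitArgs sketches the override semantics: keys that are missing
// or fail to parse leave the corresponding default value unchanged.
func applyRateLimitArgs(args map[string]string, rl *api.PodStatusRateLimit) {
	if v, ok := args["minIntervalSec"]; ok {
		if n, err := strconv.Atoi(v); err == nil {
			rl.MinIntervalSec = n
		}
	}
	if v, ok := args["minPodNum"]; ok {
		if n, err := strconv.Atoi(v); err == nil {
			rl.MinPodNum = n
		}
	}
	if v, ok := args["enable"]; ok {
		if b, err := strconv.ParseBool(v); err == nil {
			rl.Enable = b
		}
	}
}
```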

// KubeClient returns the kubernetes client
func (ssn Session) KubeClient() kubernetes.Interface {
	return ssn.kubeClient
42 changes: 42 additions & 0 deletions pkg/scheduler/util/session_helper.go
@@ -0,0 +1,42 @@
package util

import (
	"sync"

	"volcano.sh/volcano/pkg/scheduler/api"
)

type StatusLastSetCache struct {
	Cache map[api.JobID]map[api.TaskID]int64
	sync.RWMutex
}

var (
	podStatusLastSetCache = StatusLastSetCache{Cache: map[api.JobID]map[api.TaskID]int64{}}
)

func SetPodStatusLastSetCache(jobID api.JobID, taskID api.TaskID, ts int64) {
	podStatusLastSetCache.Lock()
	defer podStatusLastSetCache.Unlock()
	if _, ok := podStatusLastSetCache.Cache[jobID]; !ok {
		podStatusLastSetCache.Cache[jobID] = map[api.TaskID]int64{}
	}
	podStatusLastSetCache.Cache[jobID][taskID] = ts
}

func GetPodStatusLastSetCache(jobID api.JobID, taskID api.TaskID) (ts int64, exist bool) {
	podStatusLastSetCache.RLock()
	defer podStatusLastSetCache.RUnlock()
	ts, exist = podStatusLastSetCache.Cache[jobID][taskID]
	return
}

func CleanUnusedPodStatusLastSetCache(jobs map[api.JobID]*api.JobInfo) {
	podStatusLastSetCache.Lock()
	defer podStatusLastSetCache.Unlock()
	for jobID := range podStatusLastSetCache.Cache {
		if _, ok := jobs[jobID]; !ok {
			delete(podStatusLastSetCache.Cache, jobID)
		}
	}
}
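For completeness, a small usage sketch of the helpers above; the job/task IDs and the 60-second window are illustrative, and in the PR these calls are made from RecordJobStatusEvent and closeSession respectively.

```go
package util

import (
	"time"

	"volcano.sh/volcano/pkg/scheduler/api"
)

func exampleLastSetCacheUsage() {
	jobID := api.JobID("default/example-job") // hypothetical job ID
	taskID := api.TaskID("example-pod-uid")   // hypothetical task ID

	// Only write the pod status when there is no cached timestamp or the
	// last write is older than the configured minimum interval.
	now := time.Now().Unix()
	if ts, exist := GetPodStatusLastSetCache(jobID, taskID); !exist || now-ts > 60 {
		// ... patch the pod condition here ...
		SetPodStatusLastSetCache(jobID, taskID, now)
	}

	// At session close, drop cache entries for jobs that no longer exist.
	CleanUnusedPodStatusLastSetCache(map[api.JobID]*api.JobInfo{})
}
```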