diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8c4aebcd80d..ddbd93e96e6 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -21,7 +21,6 @@ import ( "fmt" "maps" "sort" - "strings" "testing" "github.com/go-logr/logr" @@ -33,7 +32,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/utils/clock" - "k8s.io/utils/field" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -47,9 +45,7 @@ import ( "sigs.k8s.io/kueue/pkg/scheduler/flavorassigner" "sigs.k8s.io/kueue/pkg/scheduler/preemption" "sigs.k8s.io/kueue/pkg/util/api" - "sigs.k8s.io/kueue/pkg/util/limitrange" "sigs.k8s.io/kueue/pkg/util/priority" - "sigs.k8s.io/kueue/pkg/util/resource" "sigs.k8s.io/kueue/pkg/util/routine" "sigs.k8s.io/kueue/pkg/util/wait" "sigs.k8s.io/kueue/pkg/workload" @@ -74,8 +70,8 @@ type Scheduler struct { fairSharing config.FairSharing clock clock.Clock - // attemptCount identifies the number of scheduling attempt in logs, from the last restart. - attemptCount int64 + // schedulingCycleCount identifies the scheduling cycle in logs; it counts cycles since the last restart. + schedulingCycleCount int64 // Stubs. applyAdmission func(context.Context, *kueue.Workload) error @@ -175,8 +171,8 @@ func reportSkippedPreemptions(p map[string]int) { } func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal { - s.attemptCount++ - log := ctrl.LoggerFrom(ctx).WithValues("attemptCount", s.attemptCount) + s.schedulingCycleCount++ + log := ctrl.LoggerFrom(ctx).WithValues("schedulingCycleCount", s.schedulingCycleCount) ctx = ctrl.LoggerInto(ctx, log) // 1. Get the heads from the queues, including their desired clusterQueue. 
@@ -375,9 +371,9 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna } else if !cq.NamespaceSelector.Matches(labels.Set(ns.Labels)) { e.inadmissibleMsg = "Workload namespace doesn't match ClusterQueue selector" e.requeueReason = queue.RequeueReasonNamespaceMismatch - } else if err := s.validateResources(&w); err != nil { + } else if err := workload.ValidateResources(&w); err != nil { e.inadmissibleMsg = err.Error() - } else if err := s.validateLimitRange(ctx, &w); err != nil { + } else if err := workload.ValidateLimitRange(ctx, s.client, &w); err != nil { e.inadmissibleMsg = err.Error() } else { e.assignment, e.preemptionTargets = s.getAssignments(log, &e.Info, snap) @@ -461,65 +457,6 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac return fullAssignment, nil } -// validateResources validates that requested resources are less or equal -// to limits. -func (s *Scheduler) validateResources(wi *workload.Info) error { - podsetsPath := field.NewPath("podSets") - // requests should be less than limits. 
- allReasons := []string{} - for i := range wi.Obj.Spec.PodSets { - ps := &wi.Obj.Spec.PodSets[i] - psPath := podsetsPath.Child(ps.Name) - for i := range ps.Template.Spec.InitContainers { - c := ps.Template.Spec.InitContainers[i] - if list := resource.GetGreaterKeys(c.Resources.Requests, c.Resources.Limits); len(list) > 0 { - allReasons = append(allReasons, fmt.Sprintf("%s[%s] requests exceed it's limits", - psPath.Child("initContainers").Index(i).String(), - strings.Join(list, ", "))) - } - } - - for i := range ps.Template.Spec.Containers { - c := ps.Template.Spec.Containers[i] - if list := resource.GetGreaterKeys(c.Resources.Requests, c.Resources.Limits); len(list) > 0 { - allReasons = append(allReasons, fmt.Sprintf("%s[%s] requests exceed it's limits", - psPath.Child("containers").Index(i).String(), - strings.Join(list, ", "))) - } - } - } - if len(allReasons) > 0 { - return fmt.Errorf("resource validation failed: %s", strings.Join(allReasons, "; ")) - } - return nil -} - -// validateLimitRange validates that the requested resources fit into the namespace defined -// limitRanges. -func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) error { - podsetsPath := field.NewPath("podSets") - // get the range summary from the namespace. - list := corev1.LimitRangeList{} - if err := s.client.List(ctx, &list, &client.ListOptions{Namespace: wi.Obj.Namespace}); err != nil { - return err - } - if len(list.Items) == 0 { - return nil - } - summary := limitrange.Summarize(list.Items...) - - // verify - allReasons := []string{} - for i := range wi.Obj.Spec.PodSets { - ps := &wi.Obj.Spec.PodSets[i] - allReasons = append(allReasons, summary.ValidatePodSpec(&ps.Template.Spec, podsetsPath.Child(ps.Name))...) 
- } - if len(allReasons) > 0 { - return fmt.Errorf("didn't satisfy LimitRange constraints: %s", strings.Join(allReasons, "; ")) - } - return nil -} - // admit sets the admitting clusterQueue and flavors into the workload of // the entry, and asynchronously updates the object in the apiserver after // assuming it in the cache. @@ -602,8 +539,9 @@ func (e entryOrdering) Swap(i, j int) { // Less is the ordering criteria: // 1. request under nominal quota before borrowing. -// 2. higher priority first. -// 3. FIFO on eviction or creation timestamp. +// 2. fair sharing: lower DominantResourceShare first. +// 3. higher priority first. +// 4. FIFO on eviction or creation timestamp. func (e entryOrdering) Less(i, j int) bool { a := e.entries[i] b := e.entries[j] diff --git a/pkg/workload/resources.go b/pkg/workload/resources.go index b8cf31c182b..7a1144f7b5d 100644 --- a/pkg/workload/resources.go +++ b/pkg/workload/resources.go @@ -19,10 +19,12 @@ package workload import ( "context" "fmt" + "strings" corev1 "k8s.io/api/core/v1" nodev1 "k8s.io/api/node/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/field" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -119,3 +121,62 @@ func AdjustResources(ctx context.Context, cl client.Client, wl *kueue.Workload) } handleLimitsToRequests(wl) } + +// ValidateResources validates that requested resources are less than or equal +// to limits. +func ValidateResources(wi *Info) error { + podsetsPath := field.NewPath("podSets") + // requests must not exceed limits. 
+ allReasons := []string{} + for i := range wi.Obj.Spec.PodSets { + ps := &wi.Obj.Spec.PodSets[i] + psPath := podsetsPath.Child(ps.Name) + for i := range ps.Template.Spec.InitContainers { + c := ps.Template.Spec.InitContainers[i] + if list := resource.GetGreaterKeys(c.Resources.Requests, c.Resources.Limits); len(list) > 0 { + allReasons = append(allReasons, fmt.Sprintf("%s[%s] requests exceed it's limits", + psPath.Child("initContainers").Index(i).String(), + strings.Join(list, ", "))) + } + } + + for i := range ps.Template.Spec.Containers { + c := ps.Template.Spec.Containers[i] + if list := resource.GetGreaterKeys(c.Resources.Requests, c.Resources.Limits); len(list) > 0 { + allReasons = append(allReasons, fmt.Sprintf("%s[%s] requests exceed it's limits", + psPath.Child("containers").Index(i).String(), + strings.Join(list, ", "))) + } + } + } + if len(allReasons) > 0 { + return fmt.Errorf("resource validation failed: %s", strings.Join(allReasons, "; ")) + } + return nil +} + +// ValidateLimitRange validates that the requested resources fit into the namespace defined +// limitRanges. +func ValidateLimitRange(ctx context.Context, c client.Client, wi *Info) error { + podsetsPath := field.NewPath("podSets") + // get the range summary from the namespace. + list := corev1.LimitRangeList{} + if err := c.List(ctx, &list, &client.ListOptions{Namespace: wi.Obj.Namespace}); err != nil { + return err + } + if len(list.Items) == 0 { + return nil + } + summary := limitrange.Summarize(list.Items...) + + // verify + allReasons := []string{} + for i := range wi.Obj.Spec.PodSets { + ps := &wi.Obj.Spec.PodSets[i] + allReasons = append(allReasons, summary.ValidatePodSpec(&ps.Template.Spec, podsetsPath.Child(ps.Name))...) + } + if len(allReasons) > 0 { + return fmt.Errorf("didn't satisfy LimitRange constraints: %s", strings.Join(allReasons, "; ")) + } + return nil +}