Commit dc077ea: cleanup scheduler.go (#4108)

gabesaba authored Feb 3, 2025
1 parent 663bdfa commit dc077ea
Showing 2 changed files with 71 additions and 71 deletions.
81 changes: 10 additions & 71 deletions pkg/scheduler/scheduler.go
@@ -21,7 +21,6 @@ import (
     "fmt"
     "maps"
     "sort"
-    "strings"
     "testing"
 
     "github.com/go-logr/logr"
@@ -33,7 +32,6 @@ import (
     "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
     "k8s.io/utils/clock"
-    "k8s.io/utils/field"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
 
@@ -47,9 +45,7 @@ import (
     "sigs.k8s.io/kueue/pkg/scheduler/flavorassigner"
     "sigs.k8s.io/kueue/pkg/scheduler/preemption"
     "sigs.k8s.io/kueue/pkg/util/api"
-    "sigs.k8s.io/kueue/pkg/util/limitrange"
     "sigs.k8s.io/kueue/pkg/util/priority"
-    "sigs.k8s.io/kueue/pkg/util/resource"
     "sigs.k8s.io/kueue/pkg/util/routine"
     "sigs.k8s.io/kueue/pkg/util/wait"
     "sigs.k8s.io/kueue/pkg/workload"
@@ -74,8 +70,9 @@ type Scheduler struct {
     fairSharing config.FairSharing
     clock       clock.Clock
 
-    // attemptCount identifies the number of scheduling attempt in logs, from the last restart.
-    attemptCount int64
+    // schedulingCycle identifies the number of scheduling
+    // attempts since the last restart.
+    schedulingCycle int64
 
     // Stubs.
     applyAdmission func(context.Context, *kueue.Workload) error
@@ -175,8 +172,8 @@ func reportSkippedPreemptions(p map[string]int) {
 }
 
 func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
-    s.attemptCount++
-    log := ctrl.LoggerFrom(ctx).WithValues("attemptCount", s.attemptCount)
+    s.schedulingCycle++
+    log := ctrl.LoggerFrom(ctx).WithValues("schedulingCycle", s.schedulingCycle)
     ctx = ctrl.LoggerInto(ctx, log)
 
     // 1. Get the heads from the queues, including their desired clusterQueue.
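A note on the logging pattern in this hunk: WithValues stamps the counter onto the logger once, and LoggerInto threads that logger through the context, so every downstream log call in the same cycle carries the schedulingCycle key automatically. A minimal illustration of the pattern (the callee here is hypothetical, not from this commit):

    package example

    import (
    	"context"

    	ctrl "sigs.k8s.io/controller-runtime"
    )

    // Hypothetical downstream function: the logger recovered from ctx
    // already carries schedulingCycle=N without re-stating it.
    func logFromDownstream(ctx context.Context) {
    	log := ctrl.LoggerFrom(ctx)
    	log.V(2).Info("processing queue head")
    }
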
@@ -375,9 +372,9 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
         } else if !cq.NamespaceSelector.Matches(labels.Set(ns.Labels)) {
             e.inadmissibleMsg = "Workload namespace doesn't match ClusterQueue selector"
             e.requeueReason = queue.RequeueReasonNamespaceMismatch
-        } else if err := s.validateResources(&w); err != nil {
+        } else if err := workload.ValidateResources(&w); err != nil {
             e.inadmissibleMsg = err.Error()
-        } else if err := s.validateLimitRange(ctx, &w); err != nil {
+        } else if err := workload.ValidateLimitRange(ctx, s.client, &w); err != nil {
             e.inadmissibleMsg = err.Error()
         } else {
             e.assignment, e.preemptionTargets = s.getAssignments(log, &e.Info, snap)
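The net effect of this hunk: the two checks become package-level functions in pkg/workload, so any component with a client can run them, not just the scheduler. A minimal sketch of an external caller, assuming the signatures shown in this diff (the wrapper function itself is hypothetical):

    package example

    import (
    	"context"

    	"sigs.k8s.io/controller-runtime/pkg/client"

    	"sigs.k8s.io/kueue/pkg/workload"
    )

    // checkWorkload is a hypothetical wrapper combining both relocated checks.
    func checkWorkload(ctx context.Context, cl client.Client, wi *workload.Info) error {
    	// Per-container requests must not exceed their limits.
    	if err := workload.ValidateResources(wi); err != nil {
    		return err
    	}
    	// Requests must also fit the namespace's LimitRange constraints.
    	return workload.ValidateLimitRange(ctx, cl, wi)
    }
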
@@ -461,65 +458,6 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac
     return fullAssignment, nil
 }
 
-// validateResources validates that requested resources are less or equal
-// to limits.
-func (s *Scheduler) validateResources(wi *workload.Info) error {
-    podsetsPath := field.NewPath("podSets")
-    // requests should be less than limits.
-    allReasons := []string{}
-    for i := range wi.Obj.Spec.PodSets {
-        ps := &wi.Obj.Spec.PodSets[i]
-        psPath := podsetsPath.Child(ps.Name)
-        for i := range ps.Template.Spec.InitContainers {
-            c := ps.Template.Spec.InitContainers[i]
-            if list := resource.GetGreaterKeys(c.Resources.Requests, c.Resources.Limits); len(list) > 0 {
-                allReasons = append(allReasons, fmt.Sprintf("%s[%s] requests exceed it's limits",
-                    psPath.Child("initContainers").Index(i).String(),
-                    strings.Join(list, ", ")))
-            }
-        }
-
-        for i := range ps.Template.Spec.Containers {
-            c := ps.Template.Spec.Containers[i]
-            if list := resource.GetGreaterKeys(c.Resources.Requests, c.Resources.Limits); len(list) > 0 {
-                allReasons = append(allReasons, fmt.Sprintf("%s[%s] requests exceed it's limits",
-                    psPath.Child("containers").Index(i).String(),
-                    strings.Join(list, ", ")))
-            }
-        }
-    }
-    if len(allReasons) > 0 {
-        return fmt.Errorf("resource validation failed: %s", strings.Join(allReasons, "; "))
-    }
-    return nil
-}
-
-// validateLimitRange validates that the requested resources fit into the namespace defined
-// limitRanges.
-func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) error {
-    podsetsPath := field.NewPath("podSets")
-    // get the range summary from the namespace.
-    list := corev1.LimitRangeList{}
-    if err := s.client.List(ctx, &list, &client.ListOptions{Namespace: wi.Obj.Namespace}); err != nil {
-        return err
-    }
-    if len(list.Items) == 0 {
-        return nil
-    }
-    summary := limitrange.Summarize(list.Items...)
-
-    // verify
-    allReasons := []string{}
-    for i := range wi.Obj.Spec.PodSets {
-        ps := &wi.Obj.Spec.PodSets[i]
-        allReasons = append(allReasons, summary.ValidatePodSpec(&ps.Template.Spec, podsetsPath.Child(ps.Name))...)
-    }
-    if len(allReasons) > 0 {
-        return fmt.Errorf("didn't satisfy LimitRange constraints: %s", strings.Join(allReasons, "; "))
-    }
-    return nil
-}
-
 // admit sets the admitting clusterQueue and flavors into the workload of
 // the entry, and asynchronously updates the object in the apiserver after
 // assuming it in the cache.
@@ -602,8 +540,9 @@ func (e entryOrdering) Swap(i, j int) {
 
 // Less is the ordering criteria:
 // 1. request under nominal quota before borrowing.
-// 2. higher priority first.
-// 3. FIFO on eviction or creation timestamp.
+// 2. fair sharing: lower DominantResourceShare first.
+// 3. higher priority first.
+// 4. FIFO on eviction or creation timestamp.
 func (e entryOrdering) Less(i, j int) bool {
     a := e.entries[i]
     b := e.entries[j]
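The four criteria above read as a chain of tie-breakers. The actual Less body is truncated in this diff; below is a simplified sketch of that ordering with illustrative fields, not the real entry struct:

    package example

    import "time"

    // fakeEntry stands in for the scheduler's entry type; field names are made up.
    type fakeEntry struct {
    	borrows               bool
    	dominantResourceShare int
    	priority              int32
    	queuedAt              time.Time
    }

    // lessEntries is an illustrative tie-breaker chain matching the documented order.
    func lessEntries(a, b fakeEntry) bool {
    	// 1. Fitting within nominal quota beats needing to borrow.
    	if a.borrows != b.borrows {
    		return !a.borrows
    	}
    	// 2. Fair sharing: the lower DominantResourceShare goes first.
    	if a.dominantResourceShare != b.dominantResourceShare {
    		return a.dominantResourceShare < b.dominantResourceShare
    	}
    	// 3. Higher priority goes first.
    	if a.priority != b.priority {
    		return a.priority > b.priority
    	}
    	// 4. FIFO on the eviction or creation timestamp.
    	return a.queuedAt.Before(b.queuedAt)
    }
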
61 changes: 61 additions & 0 deletions pkg/workload/resources.go
@@ -19,10 +19,12 @@ package workload
 import (
     "context"
     "fmt"
+    "strings"
 
     corev1 "k8s.io/api/core/v1"
     nodev1 "k8s.io/api/node/v1"
     "k8s.io/apimachinery/pkg/types"
+    "k8s.io/utils/field"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
 
@@ -119,3 +121,62 @@ func AdjustResources(ctx context.Context, cl client.Client, wl *kueue.Workload)
     }
     handleLimitsToRequests(wl)
 }
+
+// ValidateResources validates that requested resources are less than or equal
+// to limits.
+func ValidateResources(wi *Info) error {
+    podsetsPath := field.NewPath("podSets")
+    // requests should be less than limits.
+    allReasons := []string{}
+    for i := range wi.Obj.Spec.PodSets {
+        ps := &wi.Obj.Spec.PodSets[i]
+        psPath := podsetsPath.Child(ps.Name)
+        for i := range ps.Template.Spec.InitContainers {
+            c := ps.Template.Spec.InitContainers[i]
+            if list := resource.GetGreaterKeys(c.Resources.Requests, c.Resources.Limits); len(list) > 0 {
+                allReasons = append(allReasons, fmt.Sprintf("%s[%s] requests exceed its limits",
+                    psPath.Child("initContainers").Index(i).String(),
+                    strings.Join(list, ", ")))
+            }
+        }
+
+        for i := range ps.Template.Spec.Containers {
+            c := ps.Template.Spec.Containers[i]
+            if list := resource.GetGreaterKeys(c.Resources.Requests, c.Resources.Limits); len(list) > 0 {
+                allReasons = append(allReasons, fmt.Sprintf("%s[%s] requests exceed its limits",
+                    psPath.Child("containers").Index(i).String(),
+                    strings.Join(list, ", ")))
+            }
+        }
+    }
+    if len(allReasons) > 0 {
+        return fmt.Errorf("resource validation failed: %s", strings.Join(allReasons, "; "))
+    }
+    return nil
+}
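ValidateResources delegates the per-container comparison to resource.GetGreaterKeys, which this commit does not show. The following is a hedged reconstruction of the semantics the code above relies on — return the sorted names of resources whose request exceeds the matching limit, ignoring resources with no limit set — and is an assumption, not the real helper from pkg/util/resource:

    package example

    import (
    	"sort"

    	corev1 "k8s.io/api/core/v1"
    )

    // getGreaterKeys sketches the assumed behavior of resource.GetGreaterKeys.
    func getGreaterKeys(requests, limits corev1.ResourceList) []string {
    	keys := []string{}
    	for name, req := range requests {
    		// Only flag resources that have a limit and request more than it.
    		if lim, ok := limits[name]; ok && req.Cmp(lim) > 0 {
    			keys = append(keys, string(name))
    		}
    	}
    	// Sort for deterministic error messages.
    	sort.Strings(keys)
    	return keys
    }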

+// ValidateLimitRange validates that the requested resources fit into the
+// namespace-defined LimitRanges.
+func ValidateLimitRange(ctx context.Context, c client.Client, wi *Info) error {
+    podsetsPath := field.NewPath("podSets")
+    // get the range summary from the namespace.
+    list := corev1.LimitRangeList{}
+    if err := c.List(ctx, &list, &client.ListOptions{Namespace: wi.Obj.Namespace}); err != nil {
+        return err
+    }
+    if len(list.Items) == 0 {
+        return nil
+    }
+    summary := limitrange.Summarize(list.Items...)
+
+    // verify
+    allReasons := []string{}
+    for i := range wi.Obj.Spec.PodSets {
+        ps := &wi.Obj.Spec.PodSets[i]
+        allReasons = append(allReasons, summary.ValidatePodSpec(&ps.Template.Spec, podsetsPath.Child(ps.Name))...)
+    }
+    if len(allReasons) > 0 {
+        return fmt.Errorf("didn't satisfy LimitRange constraints: %s", strings.Join(allReasons, "; "))
+    }
+    return nil
+}
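ValidateLimitRange composes two helpers from pkg/util/limitrange: Summarize folds every LimitRange in the namespace into a single summary, and ValidatePodSpec checks a pod spec against it. A hedged standalone sketch of the same flow for a single pod spec, using the helper calls exactly as they appear above (the namespace, field path, and wrapper function are illustrative):

    package example

    import (
    	"context"
    	"fmt"
    	"strings"

    	corev1 "k8s.io/api/core/v1"
    	"k8s.io/utils/field"
    	"sigs.k8s.io/controller-runtime/pkg/client"

    	"sigs.k8s.io/kueue/pkg/util/limitrange"
    )

    // checkPodSpec is a hypothetical wrapper validating one pod spec.
    func checkPodSpec(ctx context.Context, cl client.Client, spec *corev1.PodSpec) error {
    	// Gather every LimitRange in the namespace and merge them.
    	list := corev1.LimitRangeList{}
    	if err := cl.List(ctx, &list, client.InNamespace("team-a")); err != nil {
    		return err
    	}
    	summary := limitrange.Summarize(list.Items...)
    	// Validate the pod spec against the merged constraints.
    	if reasons := summary.ValidatePodSpec(spec, field.NewPath("spec")); len(reasons) > 0 {
    		return fmt.Errorf("didn't satisfy LimitRange constraints: %s", strings.Join(reasons, "; "))
    	}
    	return nil
    }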
