Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup scheduler.go #4108

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 10 additions & 71 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"maps"
"sort"
"strings"
"testing"

"github.com/go-logr/logr"
Expand All @@ -33,7 +32,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"k8s.io/utils/field"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -47,9 +45,7 @@ import (
"sigs.k8s.io/kueue/pkg/scheduler/flavorassigner"
"sigs.k8s.io/kueue/pkg/scheduler/preemption"
"sigs.k8s.io/kueue/pkg/util/api"
"sigs.k8s.io/kueue/pkg/util/limitrange"
"sigs.k8s.io/kueue/pkg/util/priority"
"sigs.k8s.io/kueue/pkg/util/resource"
"sigs.k8s.io/kueue/pkg/util/routine"
"sigs.k8s.io/kueue/pkg/util/wait"
"sigs.k8s.io/kueue/pkg/workload"
Expand All @@ -74,8 +70,9 @@ type Scheduler struct {
fairSharing config.FairSharing
clock clock.Clock

// attemptCount identifies the number of scheduling attempt in logs, from the last restart.
attemptCount int64
// schedulingCycle identifies the number of scheduling
// attempts since the last restart.
schedulingCycle int64

// Stubs.
applyAdmission func(context.Context, *kueue.Workload) error
Expand Down Expand Up @@ -175,8 +172,8 @@ func reportSkippedPreemptions(p map[string]int) {
}

func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
s.attemptCount++
log := ctrl.LoggerFrom(ctx).WithValues("attemptCount", s.attemptCount)
s.schedulingCycle++
log := ctrl.LoggerFrom(ctx).WithValues("schedulingCycle", s.schedulingCycle)
ctx = ctrl.LoggerInto(ctx, log)

// 1. Get the heads from the queues, including their desired clusterQueue.
Expand Down Expand Up @@ -375,9 +372,9 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
} else if !cq.NamespaceSelector.Matches(labels.Set(ns.Labels)) {
e.inadmissibleMsg = "Workload namespace doesn't match ClusterQueue selector"
e.requeueReason = queue.RequeueReasonNamespaceMismatch
} else if err := s.validateResources(&w); err != nil {
} else if err := workload.ValidateResources(&w); err != nil {
e.inadmissibleMsg = err.Error()
} else if err := s.validateLimitRange(ctx, &w); err != nil {
} else if err := workload.ValidateLimitRange(ctx, s.client, &w); err != nil {
e.inadmissibleMsg = err.Error()
} else {
e.assignment, e.preemptionTargets = s.getAssignments(log, &e.Info, snap)
Expand Down Expand Up @@ -461,65 +458,6 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac
return fullAssignment, nil
}

// validateResources inspects every init container and regular container of
// each PodSet in the workload and collects one reason for every container
// for which resource.GetGreaterKeys reports request keys exceeding the
// container's limits.
//
// It returns nil when nothing is flagged; otherwise a single error is
// returned whose message joins all collected reasons with "; ".
func (s *Scheduler) validateResources(wi *workload.Info) error {
	root := field.NewPath("podSets")
	var reasons []string

	for psIdx := range wi.Obj.Spec.PodSets {
		podSet := &wi.Obj.Spec.PodSets[psIdx]
		podSetPath := root.Child(podSet.Name)
		spec := &podSet.Template.Spec

		// Init containers are reported before the regular containers of the
		// same PodSet.
		for cIdx := range spec.InitContainers {
			res := &spec.InitContainers[cIdx].Resources
			exceeding := resource.GetGreaterKeys(res.Requests, res.Limits)
			if len(exceeding) == 0 {
				continue
			}
			location := podSetPath.Child("initContainers").Index(cIdx).String()
			reasons = append(reasons, fmt.Sprintf("%s[%s] requests exceed it's limits", location, strings.Join(exceeding, ", ")))
		}

		for cIdx := range spec.Containers {
			res := &spec.Containers[cIdx].Resources
			exceeding := resource.GetGreaterKeys(res.Requests, res.Limits)
			if len(exceeding) == 0 {
				continue
			}
			location := podSetPath.Child("containers").Index(cIdx).String()
			reasons = append(reasons, fmt.Sprintf("%s[%s] requests exceed it's limits", location, strings.Join(exceeding, ", ")))
		}
	}

	if len(reasons) == 0 {
		return nil
	}
	return fmt.Errorf("resource validation failed: %s", strings.Join(reasons, "; "))
}

// validateLimitRange checks the pod specs of the workload against the
// LimitRange objects found in the workload's namespace.
//
// The LimitRanges are listed through the scheduler's client; a listing error
// is returned unchanged. When the namespace has no LimitRanges the workload
// is accepted. Otherwise the LimitRanges are summarized and every PodSet's
// pod spec is validated against that summary; all reported reasons are joined
// with "; " into a single error.
func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) error {
	var limitRanges corev1.LimitRangeList
	listOpts := &client.ListOptions{Namespace: wi.Obj.Namespace}
	if err := s.client.List(ctx, &limitRanges, listOpts); err != nil {
		return err
	}
	// Nothing to validate against.
	if len(limitRanges.Items) == 0 {
		return nil
	}

	summary := limitrange.Summarize(limitRanges.Items...)
	root := field.NewPath("podSets")

	var violations []string
	for idx := range wi.Obj.Spec.PodSets {
		podSet := &wi.Obj.Spec.PodSets[idx]
		found := summary.ValidatePodSpec(&podSet.Template.Spec, root.Child(podSet.Name))
		violations = append(violations, found...)
	}

	if len(violations) == 0 {
		return nil
	}
	return fmt.Errorf("didn't satisfy LimitRange constraints: %s", strings.Join(violations, "; "))
}

// admit sets the admitting clusterQueue and flavors into the workload of
// the entry, and asynchronously updates the object in the apiserver after
// assuming it in the cache.
Expand Down Expand Up @@ -602,8 +540,9 @@ func (e entryOrdering) Swap(i, j int) {

// Less is the ordering criteria:
// 1. request under nominal quota before borrowing.
// 2. higher priority first.
// 3. FIFO on eviction or creation timestamp.
// 2. fair sharing: lower DominantResourceShare first.
// 3. higher priority first.
// 4. FIFO on eviction or creation timestamp.
func (e entryOrdering) Less(i, j int) bool {
a := e.entries[i]
b := e.entries[j]
Expand Down
61 changes: 61 additions & 0 deletions pkg/workload/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package workload
import (
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
nodev1 "k8s.io/api/node/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/field"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -119,3 +121,62 @@ func AdjustResources(ctx context.Context, cl client.Client, wl *kueue.Workload)
}
handleLimitsToRequests(wl)
}

// ValidateResources verifies, for every PodSet of the workload, that no init
// container or regular container has resource requests flagged by
// resource.GetGreaterKeys as exceeding the container's limits.
//
// Each offending container contributes one reason, identified by its field
// path under "podSets" and the list of offending resource keys. The function
// returns nil when there are no reasons, and otherwise an error whose message
// joins all of them with "; ".
func ValidateResources(wi *Info) error {
	// Message template shared by init containers and regular containers.
	const reasonFormat = "%s[%s] requests exceed it's limits"

	var (
		basePath = field.NewPath("podSets")
		problems []string
	)

	for p := range wi.Obj.Spec.PodSets {
		podSet := &wi.Obj.Spec.PodSets[p]
		setPath := basePath.Child(podSet.Name)

		// Init containers first, so their reasons precede those of the
		// regular containers of the same PodSet.
		initContainers := podSet.Template.Spec.InitContainers
		for k := range initContainers {
			keys := resource.GetGreaterKeys(initContainers[k].Resources.Requests, initContainers[k].Resources.Limits)
			if len(keys) > 0 {
				containerPath := setPath.Child("initContainers").Index(k)
				problems = append(problems, fmt.Sprintf(reasonFormat, containerPath.String(), strings.Join(keys, ", ")))
			}
		}

		containers := podSet.Template.Spec.Containers
		for k := range containers {
			keys := resource.GetGreaterKeys(containers[k].Resources.Requests, containers[k].Resources.Limits)
			if len(keys) > 0 {
				containerPath := setPath.Child("containers").Index(k)
				problems = append(problems, fmt.Sprintf(reasonFormat, containerPath.String(), strings.Join(keys, ", ")))
			}
		}
	}

	if len(problems) == 0 {
		return nil
	}
	return fmt.Errorf("resource validation failed: %s", strings.Join(problems, "; "))
}

// ValidateLimitRange checks the workload's pod specs against the LimitRange
// objects that exist in the workload's namespace.
//
// The LimitRanges are listed with the supplied client c; if listing fails the
// error is returned as is. A namespace without LimitRanges imposes no
// constraints, so nil is returned. Otherwise the LimitRanges are combined with
// limitrange.Summarize and each PodSet's pod spec is validated against the
// summary. Any reasons reported are joined with "; " into the returned error.
func ValidateLimitRange(ctx context.Context, c client.Client, wi *Info) error {
	var ranges corev1.LimitRangeList
	inNamespace := &client.ListOptions{Namespace: wi.Obj.Namespace}
	if err := c.List(ctx, &ranges, inNamespace); err != nil {
		return err
	}
	// No LimitRange in the namespace: nothing to check.
	if len(ranges.Items) == 0 {
		return nil
	}

	rangeSummary := limitrange.Summarize(ranges.Items...)
	basePath := field.NewPath("podSets")

	var problems []string
	for p := range wi.Obj.Spec.PodSets {
		podSet := &wi.Obj.Spec.PodSets[p]
		setPath := basePath.Child(podSet.Name)
		problems = append(problems, rangeSummary.ValidatePodSpec(&podSet.Template.Spec, setPath)...)
	}

	if len(problems) == 0 {
		return nil
	}
	return fmt.Errorf("didn't satisfy LimitRange constraints: %s", strings.Join(problems, "; "))
}