Implemented UnschedulablePodsCount metric #1698

Open · wants to merge 3 commits into base: main
2 changes: 1 addition & 1 deletion kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
- controller-gen.kubebuilder.io/version: v0.16.2
+ controller-gen.kubebuilder.io/version: v0.16.3
name: kwoknodeclasses.karpenter.kwok.sh
spec:
group: karpenter.kwok.sh
2 changes: 1 addition & 1 deletion kwok/charts/crds/karpenter.sh_nodeclaims.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
- controller-gen.kubebuilder.io/version: v0.16.2
+ controller-gen.kubebuilder.io/version: v0.16.3
name: nodeclaims.karpenter.sh
spec:
group: karpenter.sh
2 changes: 1 addition & 1 deletion kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
- controller-gen.kubebuilder.io/version: v0.16.2
+ controller-gen.kubebuilder.io/version: v0.16.3
name: nodepools.karpenter.sh
spec:
group: karpenter.sh
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.sh_nodeclaims.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
- controller-gen.kubebuilder.io/version: v0.16.2
+ controller-gen.kubebuilder.io/version: v0.16.3
name: nodeclaims.karpenter.sh
spec:
group: karpenter.sh
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
- controller-gen.kubebuilder.io/version: v0.16.2
+ controller-gen.kubebuilder.io/version: v0.16.3
name: nodepools.karpenter.sh
spec:
group: karpenter.sh
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/helpers.go
@@ -89,7 +89,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
return client.ObjectKeyFromObject(p), nil
})

- results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(pscheduling.MaxInstanceTypes)
+ results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods, pscheduling.MaxInstanceTypes)
for _, n := range results.ExistingNodes {
// We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should
// tell us if we can schedule to them or not; however, if these nodes are managed, we will still schedule to them
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -348,7 +348,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
}
return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err)
}
- results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
+ results := s.Solve(ctx, pods, scheduler.MaxInstanceTypes)
if len(results.NewNodeClaims) > 0 {
log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)")
}
14 changes: 13 additions & 1 deletion pkg/controllers/provisioning/scheduling/metrics.go
@@ -24,7 +24,7 @@ import (
)

func init() {
- crmetrics.Registry.MustRegister(SchedulingDurationSeconds, QueueDepth)
+ crmetrics.Registry.MustRegister(SchedulingDurationSeconds, QueueDepth, UnschedulablePodsCount)
}

const (
@@ -58,4 +58,16 @@ var (
schedulingIDLabel,
},
)
UnschedulablePodsCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: schedulerSubsystem,
Name: "unschedulable_pods_count",
Help: "The number of unschedulable Pods.",
},
[]string{
ControllerLabel,
schedulingIDLabel,
},
)
)
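For context beyond the diff: UnschedulablePodsCount follows the same pattern as the existing QueueDepth collector, a GaugeVec keyed by controller and scheduling id that is registered against the controller-runtime metrics registry. The following is a minimal, self-contained sketch of that pattern; the "karpenter"/"scheduler" namespace and subsystem strings match the metric name asserted in the tests further down, while the "scheduling_id" label key is an assumption and not the unexported Karpenter constant.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
    crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

// unschedulablePods mirrors the shape of UnschedulablePodsCount: a gauge vector
// labeled by the controller that ran the scheduling loop and the scheduling id.
var unschedulablePods = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{
        Namespace: "karpenter",
        Subsystem: "scheduler",
        Name:      "unschedulable_pods_count",
        Help:      "The number of unschedulable Pods.",
    },
    []string{"controller", "scheduling_id"},
)

func main() {
    // Registering with the controller-runtime registry exposes the gauge on the
    // same /metrics endpoint the manager already serves.
    crmetrics.Registry.MustRegister(unschedulablePods)

    // At the end of a scheduling run the scheduler records how many pods it could not place.
    labels := prometheus.Labels{"controller": "provisioner", "scheduling_id": "run-1"}
    unschedulablePods.With(labels).Set(3)

    // testutil.ToFloat64 reads a single child gauge back, which is handy in assertions.
    fmt.Println(testutil.ToFloat64(unschedulablePods.With(labels))) // 3
}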
16 changes: 11 additions & 5 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -174,7 +174,7 @@ func (r Results) NonPendingPodSchedulingErrors() string {

// TruncateInstanceTypes filters the result based on the maximum number of instanceTypes that needs
// to be considered. This filters all instance types generated in NewNodeClaims in the Results
- func (r Results) TruncateInstanceTypes(maxInstanceTypes int) Results {
+ func (r *Results) TruncateInstanceTypes(maxInstanceTypes int) {
var validNewNodeClaims []*NodeClaim
for _, newNodeClaim := range r.NewNodeClaims {
// The InstanceTypeOptions are truncated due to limitations in sending the number of instances to launch API.
@@ -191,10 +191,10 @@ func (r Results) TruncateInstanceTypes(maxInstanceTypes int) Results {
}
}
r.NewNodeClaims = validNewNodeClaims
- return r

}

- func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
+ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod, maxInstanceTypes int) Results {
defer metrics.Measure(SchedulingDurationSeconds.With(
prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx)},
))()
@@ -204,7 +204,8 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
// had 5xA pods and 5xB pods were they have a zonal topology spread, but A can only go in one zone and B in another.
// We need to schedule them alternating, A, B, A, B, .... and this solution also solves that as well.
errors := map[*corev1.Pod]error{}
- QueueDepth.DeletePartialMatch(prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx)}) // Reset the metric for the controller, so we don't keep old ids around
+ UnschedulablePodsCount.DeletePartialMatch(prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx)}) // Reset the metric for the controller, so we don't keep old ids around
+ QueueDepth.DeletePartialMatch(prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx)}) // Reset the metric for the controller, so we don't keep old ids around
q := NewQueue(pods...)
for {
QueueDepth.With(
@@ -240,11 +241,16 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
delete(errors, k)
}
}
- return Results{
+ var results = Results{
NewNodeClaims: s.newNodeClaims,
ExistingNodes: s.existingNodes,
PodErrors: errors,
}
+ results.TruncateInstanceTypes(maxInstanceTypes)
+ UnschedulablePodsCount.With(
+ prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)},
+ ).Set(float64(len(results.PodErrors)))
+ return results
}

func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
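A note on the lifecycle this change adds to Solve: DeletePartialMatch clears every series previously recorded for the current controller before the loop starts, so scheduling ids from older runs do not linger, and the gauge is then set once from the final PodErrors count. Below is a self-contained sketch of that reset-then-set pattern; the label keys and values are illustrative, not the Karpenter constants.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

var unschedulablePods = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{Name: "unschedulable_pods_count"},
    []string{"controller", "scheduling_id"},
)

// recordRun mimics what Solve now does: drop stale series for this controller,
// then publish the number of pods that could not be scheduled in this run.
func recordRun(controller, schedulingID string, podErrors int) {
    // Only series whose "controller" label matches are deleted; other controllers keep theirs.
    unschedulablePods.DeletePartialMatch(prometheus.Labels{"controller": controller})
    unschedulablePods.With(prometheus.Labels{
        "controller":    controller,
        "scheduling_id": schedulingID,
    }).Set(float64(podErrors))
}

func main() {
    recordRun("provisioner", "run-1", 5)
    recordRun("provisioner", "run-2", 2)

    // Only the run-2 series survives; the run-1 series was removed by the partial match.
    fmt.Println(testutil.CollectAndCount(unschedulablePods)) // 1
}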
@@ -184,7 +184,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
podsScheduledInRound1 := 0
nodesInRound1 := 0
for i := 0; i < b.N; i++ {
- results := scheduler.Solve(ctx, pods)
+ results := scheduler.Solve(ctx, pods, scheduling.MaxInstanceTypes)
if i == 0 {

minPods := math.MaxInt64
41 changes: 39 additions & 2 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -118,6 +118,7 @@ var _ = AfterEach(func() {
cluster.Reset()
scheduling.QueueDepth.Reset()
scheduling.SchedulingDurationSeconds.Reset()
scheduling.UnschedulablePodsCount.Reset()
})

var _ = Context("Scheduling", func() {
@@ -3679,9 +3680,45 @@ var _ = Context("Scheduling", func() {
g.Expect(lo.FromPtr(m.Gauge.Value)).To(BeNumerically(">", 0))
}, time.Second).Should(Succeed())
}()
- s.Solve(injection.WithControllerName(ctx, "provisioner"), pods)
+ s.Solve(injection.WithControllerName(ctx, "provisioner"), pods, scheduling.MaxInstanceTypes)
wg.Wait()
})
It("should surface the UnschedulablePodsCount metric while executing the scheduling loop", func() {
nodePool := test.NodePool()
ExpectApplied(ctx, env.Client, nodePool)
// all of these pods have anti-affinity to each other
labels := map[string]string{
"app": "nginx",
}
pods := test.Pods(1000, test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
PodAntiRequirements: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
TopologyKey: corev1.LabelHostname,
},
},
}) // Create 1000 pods which should take long enough to schedule that we should be able to read the UnschedulablePodsCount metric with a value
s, err := prov.NewScheduler(ctx, pods, nil)
Expect(err).To(BeNil())

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer GinkgoRecover()
defer wg.Done()
Eventually(func(g Gomega) {
m, ok := FindMetricWithLabelValues("karpenter_scheduler_unschedulable_pods_count", map[string]string{"controller": "provisioner"})
g.Expect(ok).To(BeTrue())
g.Expect(lo.FromPtr(m.Gauge.Value)).To(BeNumerically(">=", 0))
}, 10*time.Second).Should(Succeed())
Review comment on lines +3715 to +3716 (Contributor): Does this just test for the existence of the metric or do we expect that it should be greater than 0?
}()
s.Solve(injection.WithControllerName(ctx, "provisioner"), pods, scheduling.MaxInstanceTypes)
wg.Wait()

})
It("should surface the schedulingDuration metric after executing a scheduling loop", func() {
nodePool := test.NodePool()
ExpectApplied(ctx, env.Client, nodePool)
@@ -3702,7 +3739,7 @@ var _ = Context("Scheduling", func() {
}) // Create 1000 pods which should take long enough to schedule that we should be able to read the queueDepth metric with a value
s, err := prov.NewScheduler(ctx, pods, nil)
Expect(err).To(BeNil())
- s.Solve(injection.WithControllerName(ctx, "provisioner"), pods)
+ s.Solve(injection.WithControllerName(ctx, "provisioner"), pods, scheduling.MaxInstanceTypes)

m, ok := FindMetricWithLabelValues("karpenter_scheduler_scheduling_duration_seconds", map[string]string{"controller": "provisioner"})
Expect(ok).To(BeTrue())
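The assertions in these tests go through Karpenter's FindMetricWithLabelValues helper, which is not part of this diff. It can be assumed to behave roughly like the sketch below: gather all metric families from the controller-runtime registry and return the first metric whose labels contain the requested key/value pairs. The function and package names here are hypothetical.

package metricstest

import (
    dto "github.com/prometheus/client_model/go"
    crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

// findMetricWithLabelValues sketches the assumed behavior of the test helper used above.
func findMetricWithLabelValues(name string, labels map[string]string) (*dto.Metric, bool) {
    families, err := crmetrics.Registry.Gather()
    if err != nil {
        return nil, false
    }
    for _, family := range families {
        if family.GetName() != name {
            continue
        }
        for _, metric := range family.Metric {
            have := map[string]string{}
            for _, pair := range metric.Label {
                have[pair.GetName()] = pair.GetValue()
            }
            matched := true
            for key, value := range labels {
                if have[key] != value {
                    matched = false
                    break
                }
            }
            if matched {
                // For a gauge like unschedulable_pods_count, the value is read from metric.Gauge.Value.
                return metric, true
            }
        }
    }
    return nil, false
}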
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
- controller-gen.kubebuilder.io/version: v0.16.2
+ controller-gen.kubebuilder.io/version: v0.16.3
name: testnodeclasses.karpenter.test.sh
spec:
group: karpenter.test.sh