chore: Additional upstream metrics #1672

Open · wants to merge 1 commit into base: main
87 changes: 77 additions & 10 deletions pkg/controllers/metrics/pod/controller.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
@@ -71,18 +72,47 @@ var (
Objectives: metrics.SummaryObjectives(),
},
)
podBoundDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "karpenter",
Subsystem: metrics.PodSubsystem,
Name: "bound_duration_seconds",
Help: "The time from pod creation until the pod is bound.",
Buckets: metrics.DurationBuckets(),
},
labelNames(),
)
podCurrentUnboundTimeSeconds = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "karpenter",
Subsystem: metrics.PodSubsystem,
Name: "current_unbound_time_seconds",
Help: "The time from pod creation until the pod is bound.",
},
[]string{podName, podNameSpace},
)
podUnstartedTimeSeconds = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "karpenter",
Subsystem: metrics.PodSubsystem,
Name: "unstarted_time_seconds",
Help: "The time from pod creation until the pod is running.",
},
[]string{podName, podNameSpace},
)
)

// Controller for the resource
type Controller struct {
kubeClient client.Client
metricStore *metrics.Store

pendingPods sets.Set[string]
pendingPods sets.Set[string]
unscheduledPods sets.Set[string]
}

func init() {
crmetrics.Registry.MustRegister(podState, podStartupDurationSeconds)
crmetrics.Registry.MustRegister(podState, podStartupDurationSeconds, podBoundDurationSeconds, podCurrentUnboundTimeSeconds, podUnstartedTimeSeconds)
}

func labelNames() []string {
@@ -103,9 +133,10 @@ func labelNames() []string {
// NewController constructs a podController instance
func NewController(kubeClient client.Client) *Controller {
return &Controller{
kubeClient: kubeClient,
metricStore: metrics.NewStore(),
pendingPods: sets.New[string](),
kubeClient: kubeClient,
metricStore: metrics.NewStore(),
pendingPods: sets.New[string](),
unscheduledPods: sets.New[string](),
}
}

@@ -117,6 +148,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
if err := c.kubeClient.Get(ctx, req.NamespacedName, pod); err != nil {
if errors.IsNotFound(err) {
c.pendingPods.Delete(req.NamespacedName.String())
c.unscheduledPods.Delete(req.NamespacedName.String())
c.metricStore.Delete(req.NamespacedName.String())
}
return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -132,22 +164,57 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
Labels: labels,
},
})
c.recordPodStartupMetric(pod)
c.recordPodStartupMetric(pod, labels)
return reconcile.Result{}, nil
}

func (c *Controller) recordPodStartupMetric(pod *corev1.Pod) {
func (c *Controller) recordPodStartupMetric(pod *corev1.Pod, labels prometheus.Labels) {
key := client.ObjectKeyFromObject(pod).String()
condScheduled, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
return c.Type == corev1.PodScheduled
})
if pod.Status.Phase == phasePending {
if ok && condScheduled.Status != corev1.ConditionTrue {
podCurrentUnboundTimeSeconds.With(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
}).Set(time.Since(pod.CreationTimestamp.Time).Seconds())
}
podUnstartedTimeSeconds.With(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
}).Set(time.Since(pod.CreationTimestamp.Time).Seconds())
c.pendingPods.Insert(key)
c.unscheduledPods.Insert(key)
return
}
cond, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
if c.unscheduledPods.Has(key) && ok && condScheduled.Status == corev1.ConditionTrue {
// Delete the unbound metric since the pod is now bound
podCurrentUnboundTimeSeconds.Delete(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
})
podBoundDurationSeconds.With(labels).Observe(condScheduled.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds())
c.unscheduledPods.Delete(key)
}
condReady, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
return c.Type == corev1.PodReady
})
if c.pendingPods.Has(key) && ok {
podStartupDurationSeconds.Observe(cond.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds())
c.pendingPods.Delete(key)
if condReady.Status != corev1.ConditionTrue {
podUnstartedTimeSeconds.With(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
}).Set(time.Since(pod.CreationTimestamp.Time).Seconds())
} else {
// Delete the unstarted metric since the pod is now started
podUnstartedTimeSeconds.Delete(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
})
podStartupDurationSeconds.Observe(condReady.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds())
c.pendingPods.Delete(key)
}
}
}
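
The Delete calls above are what keep the two new per-pod gauges from accumulating stale series: a GaugeVec retains every labelled child until it is explicitly removed, so a pod that was ever unbound or unstarted would otherwise keep reporting its last value forever. A minimal standalone sketch of that behaviour (illustrative only, not part of this diff; the "name"/"namespace" label names mirror the podName/podNameSpace constants used above):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same shape as podCurrentUnboundTimeSeconds: one series per pod name/namespace.
	unbound := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "karpenter",
		Subsystem: "pods",
		Name:      "current_unbound_time_seconds",
		Help:      "The time from pod creation until the pod is bound.",
	}, []string{"name", "namespace"})

	reg := prometheus.NewRegistry()
	reg.MustRegister(unbound)

	// While the pod is pending and unscheduled, each reconcile refreshes the gauge.
	unbound.With(prometheus.Labels{"name": "p", "namespace": "default"}).Set(3.2)
	families, _ := reg.Gather()
	fmt.Println(len(families)) // 1 - the per-pod series exists

	// Once the pod binds, the series must be deleted, as recordPodStartupMetric does above.
	unbound.Delete(prometheus.Labels{"name": "p", "namespace": "default"})
	families, _ = reg.Gather()
	fmt.Println(len(families)) // 0 - no stale series remains for the bound pod
}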

77 changes: 77 additions & 0 deletions pkg/controllers/metrics/pod/suite_test.go
@@ -20,6 +20,8 @@ import (
"context"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -84,6 +86,81 @@ var _ = Describe("Pod Metrics", func() {
})
Expect(found).To(BeTrue())
})
It("should update the pod bound and unbound time metrics", func() {
p := test.Pod()
p.Status.Phase = corev1.PodPending
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodScheduled, Status: corev1.ConditionUnknown, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This adds the pod to the pending and unscheduled pod sets
metric, found := FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())
unboundTime := metric.GetGauge().Value

// Pod is still pending but has now been bound. At this step pods_current_unbound_time_seconds should not change.
p.Status.Phase = corev1.PodPending
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodScheduled, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This reconcile sees the pod as scheduled, so the unbound gauge is not refreshed
metric, found = FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())
Expect(metric.GetGauge().Value).To(Equal(unboundTime))

// Pod is now running and has been bound. At this step pods_bound_duration_seconds should be observed and pods_current_unbound_time_seconds should be deleted
p.Status.Phase = corev1.PodRunning
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This reconcile records the bound duration and removes the unbound gauge
_, found = FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeFalse())
_, found = FindMetricWithLabelValues("karpenter_pods_bound_duration_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())
})
It("should update the pod startup and unstarted time metrics", func() {
p := test.Pod()
p.Status.Phase = corev1.PodPending
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This adds the pod to the pending and unscheduled pod sets
_, found := FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())

// Pod is now running but not ready
p.Status.Phase = corev1.PodRunning
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionUnknown, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This reconcile refreshes the unstarted gauge since the pod is not yet ready
_, found = FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())

// Pod is now running and ready. At this step pods_startup_duration_seconds should be observed and pods_unstarted_time_seconds should be deleted
p.Status.Phase = corev1.PodRunning
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This reconcile records the startup duration and removes the unstarted gauge
_, found = FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeFalse())
_, found = FindMetricWithLabelValues("karpenter_pods_startup_duration_seconds", nil)
Expect(found).To(BeTrue())
})
It("should delete the pod state metric on pod delete", func() {
p := test.Pod()
ExpectApplied(ctx, env.Client, p)
3 changes: 3 additions & 0 deletions pkg/controllers/node/termination/controller.go
@@ -129,6 +129,9 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile

return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
NodesDrainedTotal.With(prometheus.Labels{
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
}).Inc()
// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
12 changes: 11 additions & 1 deletion pkg/controllers/node/termination/metrics.go
@@ -28,7 +28,8 @@ import (
func init() {
crmetrics.Registry.MustRegister(
TerminationDurationSeconds,
NodeLifetimeDurationSeconds)
NodeLifetimeDurationSeconds,
NodesDrainedTotal)
}

const dayDuration = time.Hour * 24
@@ -44,6 +45,15 @@ var (
},
[]string{metrics.NodePoolLabel},
)
NodesDrainedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.NodeSubsystem,
Name: "drained_total",
Help: "The total number of nodes drained by Karpenter",
},
[]string{metrics.NodePoolLabel},
)
NodeLifetimeDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
2 changes: 2 additions & 0 deletions pkg/controllers/node/termination/suite_test.go
@@ -95,6 +95,7 @@ var _ = Describe("Termination", func() {
metrics.NodesTerminatedTotal.Reset()
termination.TerminationDurationSeconds.Reset()
termination.NodeLifetimeDurationSeconds.Reset()
termination.NodesDrainedTotal.Reset()
})

Context("Reconciliation", func() {
@@ -841,6 +842,7 @@ var _ = Describe("Termination", func() {
node = ExpectNodeExists(ctx, env.Client, node.Name)
// Reconcile twice, once to set the NodeClaim to terminating, another to check the instance termination status (and delete the node).
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectMetricCounterValue(termination.NodesDrainedTotal, 1, map[string]string{"nodepool": node.Labels[v1.NodePoolLabelKey]})
ExpectObjectReconciled(ctx, env.Client, terminationController, node)

m, ok := FindMetricWithLabelValues("karpenter_nodes_terminated_total", map[string]string{"nodepool": node.Labels[v1.NodePoolLabelKey]})
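
For reference, the same assertion can also be made directly against client_golang's testutil package instead of the suite's ExpectMetricCounterValue helper. This is an editorial sketch, not code from the PR; it assumes the drained node belonged to a NodePool named "default":

package termination_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"

	"sigs.k8s.io/karpenter/pkg/controllers/node/termination"
)

// Sketch: read the single counter series selected by WithLabelValues and assert on it,
// after the termination controller has drained one node from the "default" NodePool.
func TestNodesDrainedTotal(t *testing.T) {
	got := testutil.ToFloat64(termination.NodesDrainedTotal.WithLabelValues("default"))
	if got != 1 {
		t.Fatalf("expected karpenter_nodes_drained_total{nodepool=\"default\"} == 1, got %v", got)
	}
}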
10 changes: 9 additions & 1 deletion pkg/controllers/node/termination/terminator/eviction.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"

"sigs.k8s.io/karpenter/pkg/metrics"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
@@ -182,11 +184,15 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
}); err != nil {
// status codes for the eviction API are defined here:
// https://kubernetes.io/docs/concepts/scheduling-eviction/api-eviction/#how-api-initiated-eviction-works
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
if apierrors.IsNotFound(err) {
// 404 - The pod no longer exists
// https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L160
return true
}
if apierrors.IsConflict(err) {
// 409 - The pod exists, but it is not the same pod that we initiated the eviction on
// https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L318
metrics.NodesEvictionRequestsTotal.With(map[string]string{metrics.ResponseLabel: "conflict"}).Inc()
return true
}
if apierrors.IsTooManyRequests(err) { // 429 - PDB violation
@@ -196,9 +202,11 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
}}, fmt.Errorf("evicting pod %s/%s violates a PDB", key.Namespace, key.Name)))
return false
}
metrics.NodesEvictionRequestsTotal.With(map[string]string{metrics.ResponseLabel: "failure"}).Inc()
log.FromContext(ctx).Error(err, "failed evicting pod")
return false
}
metrics.NodesEvictionRequestsTotal.With(map[string]string{metrics.ResponseLabel: "success"}).Inc()
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))
return true
}
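
Taken together, the new label values map each eviction outcome onto one response class. As a reference, the mapping could be expressed as a standalone helper — an editorial sketch only; the actual Evict method above inlines these branches:

// Sketch of the response classification used in Evict above. An empty string means
// NodesEvictionRequestsTotal is not incremented for that outcome: 404s return early
// without recording, and 429s (PDB violations) are requeued and retried instead.
package terminator

import apierrors "k8s.io/apimachinery/pkg/api/errors"

func evictionResponseLabel(err error) string {
	switch {
	case err == nil:
		return "success"
	case apierrors.IsNotFound(err): // 404 - the pod no longer exists
		return ""
	case apierrors.IsConflict(err): // 409 - the pod exists but is not the one we evicted
		return "conflict"
	case apierrors.IsTooManyRequests(err): // 429 - eviction would violate a PDB
		return ""
	default:
		return "failure"
	}
}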
6 changes: 6 additions & 0 deletions pkg/controllers/node/termination/terminator/suite_test.go
@@ -22,6 +22,8 @@ import (
"testing"
"time"

"sigs.k8s.io/karpenter/pkg/metrics"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
@@ -92,6 +94,7 @@ var _ = Describe("Eviction/Queue", func() {
Labels: testLabels,
},
})
metrics.NodesEvictionRequestsTotal.Reset()
})

Context("Eviction API", func() {
@@ -102,11 +105,13 @@
It("should succeed with no event when the pod UID conflicts", func() {
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, terminator.QueueKey{NamespacedName: client.ObjectKeyFromObject(pod), UID: uuid.NewUUID()})).To(BeTrue())
ExpectMetricCounterValue(metrics.NodesEvictionRequestsTotal, 1, map[string]string{metrics.ResponseLabel: "conflict"})
Expect(recorder.Events()).To(HaveLen(0))
})
It("should succeed with an evicted event when there are no PDBs", func() {
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue())
ExpectMetricCounterValue(metrics.NodesEvictionRequestsTotal, 1, map[string]string{metrics.ResponseLabel: "success"})
Expect(recorder.Calls("Evicted")).To(Equal(1))
})
It("should succeed with no event when there are PDBs that allow an eviction", func() {
@@ -130,6 +135,7 @@ })
})
ExpectApplied(ctx, env.Client, pdb, pdb2, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse())
ExpectMetricCounterValue(metrics.NodesEvictionRequestsTotal, 1, map[string]string{metrics.ResponseLabel: "failure"})
})
It("should ensure that calling Evict() is valid while making Add() calls", func() {
cancelCtx, cancel := context.WithCancel(ctx)
3 changes: 2 additions & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -159,13 +159,14 @@ func (p *Provisioner) GetPendingPods(ctx context.Context) ([]*corev1.Pod, error)
if err != nil {
return nil, fmt.Errorf("listing pods, %w", err)
}
pods = lo.Reject(pods, func(po *corev1.Pod, _ int) bool {
rejectedPods, pods := lo.FilterReject(pods, func(po *corev1.Pod, _ int) bool {
if err := p.Validate(ctx, po); err != nil {
log.FromContext(ctx).WithValues("Pod", klog.KRef(po.Namespace, po.Name)).V(1).Info(fmt.Sprintf("ignoring pod, %s", err))
return true
}
return false
})
metrics.IgnoredPodCount.Set(float64(len(rejectedPods)))
p.consolidationWarnings(ctx, pods)
return pods, nil
}
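
The switch from lo.Reject to lo.FilterReject is what makes the ignored count available: FilterReject returns the elements matching the predicate first and the remainder second, so the pods that fail Validate land in rejectedPods (feeding metrics.IgnoredPodCount) while the rest continue on as before. A minimal sketch of those semantics (standalone illustration, reusing the samber/lo dependency this package already imports):

package main

import (
	"fmt"
	"strings"

	"github.com/samber/lo"
)

func main() {
	ignored, kept := lo.FilterReject([]string{"ok-1", "bad-2", "ok-3"}, func(s string, _ int) bool {
		return strings.HasPrefix(s, "bad") // stands in for p.Validate(ctx, po) returning an error
	})
	fmt.Println(ignored) // [bad-2]     -> its length feeds metrics.IgnoredPodCount
	fmt.Println(kept)    // [ok-1 ok-3] -> carried forward as the pending pods
}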