Skip to content

Commit

Permalink
Collect PodDisruptionBudgets in Cluster Agent (#30412)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbartosik authored Nov 4, 2024
1 parent 2ad2d6c commit baf7988
Show file tree
Hide file tree
Showing 10 changed files with 692 additions and 26 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ require (

require (
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/DataDog/agent-payload/v5 v5.0.134
github.com/DataDog/agent-payload/v5 v5.0.135
github.com/DataDog/datadog-agent/comp/api/api/def v0.56.0-rc.3
github.com/DataDog/datadog-agent/comp/core/config v0.57.1
github.com/DataDog/datadog-agent/comp/core/flare/types v0.57.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,32 @@ type CollectorInventory struct {
func NewCollectorInventory(cfg config.Component, store workloadmeta.Component, tagger tagger.Component) *CollectorInventory {
return &CollectorInventory{
collectors: []collectors.CollectorVersions{
k8sCollectors.NewCRDCollectorVersions(),
k8sCollectors.NewClusterCollectorVersions(),
k8sCollectors.NewClusterRoleCollectorVersions(),
k8sCollectors.NewClusterRoleBindingCollectorVersions(),
k8sCollectors.NewCRDCollectorVersions(),
k8sCollectors.NewClusterRoleCollectorVersions(),
k8sCollectors.NewCronJobCollectorVersions(),
k8sCollectors.NewDaemonSetCollectorVersions(),
k8sCollectors.NewDeploymentCollectorVersions(),
k8sCollectors.NewHorizontalPodAutoscalerCollectorVersions(),
k8sCollectors.NewIngressCollectorVersions(),
k8sCollectors.NewJobCollectorVersions(),
k8sCollectors.NewLimitRangeCollectorVersions(),
k8sCollectors.NewNamespaceCollectorVersions(),
k8sCollectors.NewNetworkPolicyCollectorVersions(),
k8sCollectors.NewNodeCollectorVersions(),
k8sCollectors.NewPersistentVolumeCollectorVersions(),
k8sCollectors.NewPersistentVolumeClaimCollectorVersions(),
k8sCollectors.NewPersistentVolumeCollectorVersions(),
k8sCollectors.NewPodDisruptionBudgetCollectorVersions(),
k8sCollectors.NewReplicaSetCollectorVersions(),
k8sCollectors.NewRoleCollectorVersions(),
k8sCollectors.NewRoleBindingCollectorVersions(),
k8sCollectors.NewServiceCollectorVersions(),
k8sCollectors.NewRoleCollectorVersions(),
k8sCollectors.NewServiceAccountCollectorVersions(),
k8sCollectors.NewServiceCollectorVersions(),
k8sCollectors.NewStatefulSetCollectorVersions(),
k8sCollectors.NewStorageClassCollectorVersions(),
k8sCollectors.NewUnassignedPodCollectorVersions(cfg, store, tagger),
k8sCollectors.NewVerticalPodAutoscalerCollectorVersions(),
k8sCollectors.NewHorizontalPodAutoscalerCollectorVersions(),
k8sCollectors.NewNetworkPolicyCollectorVersions(),
},
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build kubeapiserver && orchestrator

package k8s

import (
"k8s.io/apimachinery/pkg/labels"
v1policyinformer "k8s.io/client-go/informers/policy/v1"
v1policylister "k8s.io/client-go/listers/policy/v1"
"k8s.io/client-go/tools/cache"

"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/collectors"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors"
k8sProcessors "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/k8s"
"github.com/DataDog/datadog-agent/pkg/orchestrator"
)

// NewPodDisruptionBudgetCollectorVersions builds the group of collector versions.
func NewPodDisruptionBudgetCollectorVersions() collectors.CollectorVersions {
return collectors.NewCollectorVersions(
NewPodDisruptionBudgetCollectorVersion(),
)
}

// PodDisruptionBudgetCollector is a collector for Kubernetes Pod Disruption Budgets.
type PodDisruptionBudgetCollector struct {
informer v1policyinformer.PodDisruptionBudgetInformer
lister v1policylister.PodDisruptionBudgetLister
metadata *collectors.CollectorMetadata
processor *processors.Processor
}

// NewPodDisruptionBudgetCollectorVersion creates a new collector for the Kubernetes Pod Disruption Budget
// resource.
func NewPodDisruptionBudgetCollectorVersion() *PodDisruptionBudgetCollector {
return &PodDisruptionBudgetCollector{
informer: nil,
lister: nil,
metadata: &collectors.CollectorMetadata{
IsDefaultVersion: true,
IsStable: false,
IsMetadataProducer: true,
IsManifestProducer: true,
SupportsManifestBuffering: true,
Name: "poddisruptionbudgets",
NodeType: orchestrator.K8sPodDisruptionBudget,
Version: "policy/v1",
},
processor: processors.NewProcessor(new(k8sProcessors.PodDisruptionBudgetHandlers)),
}
}

// Informer returns the shared informer.
func (c *PodDisruptionBudgetCollector) Informer() cache.SharedInformer {
return c.informer.Informer()
}

// Init is used to initialize the collector.
func (c *PodDisruptionBudgetCollector) Init(rcfg *collectors.CollectorRunConfig) {
c.informer = rcfg.OrchestratorInformerFactory.InformerFactory.Policy().V1().PodDisruptionBudgets()
c.lister = c.informer.Lister()
}

// Metadata is used to access information about the collector.
func (c *PodDisruptionBudgetCollector) Metadata() *collectors.CollectorMetadata {
return c.metadata
}

// Run triggers the collection process.
func (c *PodDisruptionBudgetCollector) Run(rcfg *collectors.CollectorRunConfig) (*collectors.CollectorRunResult, error) {
list, err := c.lister.List(labels.Everything())
if err != nil {
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

if processed == -1 {
return nil, collectors.ErrProcessingPanic
}

result := &collectors.CollectorRunResult{
Result: processResult,
ResourcesListed: len(list),
ResourcesProcessed: processed,
}

return result, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build orchestrator

package k8s

import (
model "github.com/DataDog/agent-payload/v5/process"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/common"
k8sTransformers "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/transformers/k8s"
"github.com/DataDog/datadog-agent/pkg/orchestrator/redact"

policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/types"
)

// PodDisruptionBudgetHandlers implements the Handlers interface for Kubernetes NetworkPolicy.
type PodDisruptionBudgetHandlers struct {
common.BaseHandlers
}

// AfterMarshalling is a handler called after resource marshalling.
func (h *PodDisruptionBudgetHandlers) AfterMarshalling(_ processors.ProcessorContext, _, _ interface{}, _ []byte) (skip bool) {
return
}

// BuildMessageBody is a handler called to build a message body out of a list of
// extracted resources.
func (h *PodDisruptionBudgetHandlers) BuildMessageBody(ctx processors.ProcessorContext, resourceModels []interface{}, groupSize int) model.MessageBody {
pctx := ctx.(*processors.K8sProcessorContext)
models := make([]*model.PodDisruptionBudget, 0, len(resourceModels))

for _, m := range resourceModels {
models = append(models, m.(*model.PodDisruptionBudget))
}

return &model.CollectorPodDisruptionBudget{
ClusterName: pctx.Cfg.KubeClusterName,
ClusterId: pctx.ClusterID,
GroupId: pctx.MsgGroupID,
GroupSize: int32(groupSize),
PodDisruptionBudgets: models,
Tags: append(pctx.Cfg.ExtraTags, pctx.ApiGroupVersionTag),
}
}

// ExtractResource is a handler called to extract the resource model out of a raw resource.
func (h *PodDisruptionBudgetHandlers) ExtractResource(_ processors.ProcessorContext, resource interface{}) (resourceModel interface{}) {
r := resource.(*policyv1.PodDisruptionBudget)
return k8sTransformers.ExtractPodDisruptionBudget(r)
}

// ResourceList is a handler called to convert a list passed as a generic
// interface to a list of generic interfaces.
func (h *PodDisruptionBudgetHandlers) ResourceList(_ processors.ProcessorContext, list interface{}) (resources []interface{}) {
resourceList := list.([]*policyv1.PodDisruptionBudget)
resources = make([]interface{}, 0, len(resourceList))

for _, resource := range resourceList {
resources = append(resources, resource)
}

return resources
}

// ResourceUID is a handler called to retrieve the resource UID.
func (h *PodDisruptionBudgetHandlers) ResourceUID(_ processors.ProcessorContext, resource interface{}) types.UID {
return resource.(*policyv1.PodDisruptionBudget).UID
}

// ResourceVersion is a handler called to retrieve the resource version.
func (h *PodDisruptionBudgetHandlers) ResourceVersion(_ processors.ProcessorContext, resource, _ interface{}) string {
return resource.(*policyv1.PodDisruptionBudget).ResourceVersion
}

// ScrubBeforeExtraction is a handler called to redact the raw resource before
// it is extracted as an internal resource model.
func (h *PodDisruptionBudgetHandlers) ScrubBeforeExtraction(_ processors.ProcessorContext, resource interface{}) {
r := resource.(*policyv1.PodDisruptionBudget)
redact.RemoveSensitiveAnnotationsAndLabels(r.Annotations, r.Labels)
}

// ScrubBeforeMarshalling is a handler called to redact the raw resource before
// it is marshalled to generate a manifest.
func (h *PodDisruptionBudgetHandlers) ScrubBeforeMarshalling(_ processors.ProcessorContext, _ interface{}) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build orchestrator

package k8s

import (
model "github.com/DataDog/agent-payload/v5/process"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/transformers"

policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// ExtractPodDisruptionBudget returns the protobuf model corresponding to a Kubernetes
func ExtractPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) *model.PodDisruptionBudget {
if pdb == nil {
return nil
}
result := model.PodDisruptionBudget{
Metadata: extractMetadata(&pdb.ObjectMeta),
Spec: extractPodDisruptionBudgetSpec(&pdb.Spec),
Status: extractPodDisruptionBudgetStatus(&pdb.Status),
}
result.Tags = append(result.Tags, transformers.RetrieveUnifiedServiceTags(pdb.ObjectMeta.Labels)...)
return &result
}

func extractPodDisruptionBudgetSpec(spec *policyv1.PodDisruptionBudgetSpec) *model.PodDisruptionBudgetSpec {
if spec == nil {
return nil
}
result := model.PodDisruptionBudgetSpec{}
result.MinAvailable = extractIntOrString(spec.MinAvailable)
if spec.Selector != nil {
result.Selector = extractLabelSelector(spec.Selector)
}
result.MaxUnavailable = extractIntOrString(spec.MaxUnavailable)
if spec.UnhealthyPodEvictionPolicy != nil {
result.UnhealthyPodEvictionPolicy = string(*spec.UnhealthyPodEvictionPolicy)
}
return &result
}

func extractIntOrString(source *intstr.IntOrString) *model.IntOrString {
if source == nil {
return nil
}
switch source.Type {
case intstr.Int:
return &model.IntOrString{
Type: model.IntOrString_Int,
IntVal: source.IntVal,
}
case intstr.String:
return &model.IntOrString{
Type: model.IntOrString_String,
StrVal: source.StrVal,
}
}
return nil
}

func extractPodDisruptionBudgetStatus(status *policyv1.PodDisruptionBudgetStatus) *model.PodDisruptionBudgetStatus {
if status == nil {
return nil
}
return &model.PodDisruptionBudgetStatus{
DisruptedPods: extractDisruptedPods(status.DisruptedPods),
DisruptionsAllowed: status.DisruptionsAllowed,
CurrentHealthy: status.CurrentHealthy,
DesiredHealthy: status.DesiredHealthy,
ExpectedPods: status.ExpectedPods,
Conditions: extractPodDisruptionBudgetConditions(status.Conditions),
}
}

func extractDisruptedPods(disruptedPodsmap map[string]metav1.Time) map[string]int64 {
result := make(map[string]int64)
for pod, t := range disruptedPodsmap {
result[pod] = t.Time.Unix()
}
return result
}
func extractPodDisruptionBudgetConditions(conditions []metav1.Condition) []*model.Condition {
result := make([]*model.Condition, 0)
for _, condition := range conditions {
result = append(result, &model.Condition{
Type: condition.Type,
Status: string(condition.Status),
LastTransitionTime: condition.LastTransitionTime.Time.Unix(),
Reason: condition.Reason,
Message: condition.Message,
})
}
return result
}
Loading

0 comments on commit baf7988

Please sign in to comment.