Fix optional resource deletion for collector CR
swiatekm committed Nov 25, 2024
1 parent 9262ba8 commit 96d5c1b
Showing 10 changed files with 623 additions and 113 deletions.
16 changes: 16 additions & 0 deletions .chloggen/fix_remove-optional-resources.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: collector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix deletion of optional resources for OpenTelemetryCollector CRs

# One or more tracking issues related to the change
issues: [3454]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
26 changes: 20 additions & 6 deletions controllers/common.go
@@ -21,8 +21,8 @@ import (

"github.com/go-logr/logr"
rbacv1 "k8s.io/api/rbac/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
@@ -119,18 +119,32 @@ func BuildTargetAllocator(params targetallocator.Params) ([]client.Object, error
// getList queries the Kubernetes API to list the requested resource, setting the list l of type T.
func getList[T client.Object](ctx context.Context, cl client.Client, l T, options ...client.ListOption) (map[types.UID]client.Object, error) {
ownedObjects := map[types.UID]client.Object{}
list := &unstructured.UnstructuredList{}
gvk, err := apiutil.GVKForObject(l, cl.Scheme())
if err != nil {
return nil, err
}
list.SetGroupVersionKind(gvk)
err = cl.List(ctx, list, options...)
gvk.Kind = fmt.Sprintf("%sList", gvk.Kind)
list, err := cl.Scheme().New(gvk)
if err != nil {
return nil, fmt.Errorf("unable to list objects of type %s: %w", gvk.Kind, err)
}

objList := list.(client.ObjectList)

err = cl.List(ctx, objList, options...)
if err != nil {
return ownedObjects, fmt.Errorf("error listing %T: %w", l, err)
}
for i := range list.Items {
ownedObjects[list.Items[i].GetUID()] = &list.Items[i]
objs, err := apimeta.ExtractList(objList)
if err != nil {
return ownedObjects, fmt.Errorf("error listing %T: %w", l, err)
}
for i := range objs {
typedObj, ok := objs[i].(T)
if !ok {
return ownedObjects, fmt.Errorf("error listing %T: %w", l, err)
}
ownedObjects[typedObj.GetUID()] = typedObj
}
return ownedObjects, nil
}
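
For context, a minimal usage sketch of the new generic helper: it assumes a caller in the same `controllers` package, a controller-runtime client whose scheme registers core/v1 types, and the usual imports; the function name `listOwnedConfigMaps` and the namespace value are illustrative, not part of this commit.

```go
// Illustrative only: list ConfigMaps via the generic getList helper above.
// T is inferred as *corev1.ConfigMap; the helper derives the ConfigMapList kind
// from the scheme and extracts the items with apimeta.ExtractList.
func listOwnedConfigMaps(ctx context.Context, cl client.Client, namespace string) error {
	owned, err := getList(ctx, cl, &corev1.ConfigMap{}, client.InNamespace(namespace))
	if err != nil {
		return err
	}
	for uid, obj := range owned {
		// Each value is the typed object, so asserting to *corev1.ConfigMap would be safe.
		fmt.Printf("owned ConfigMap %s (uid %s)\n", obj.GetName(), uid)
	}
	return nil
}
```
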
163 changes: 95 additions & 68 deletions controllers/opentelemetrycollector_controller.go
@@ -17,7 +17,6 @@ package controllers

import (
"context"
"fmt"
"sort"

"github.com/go-logr/logr"
@@ -30,12 +29,14 @@ import (
policyV1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
Expand All @@ -46,7 +47,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/internal/config"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/manifestutils"
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
internalRbac "github.com/open-telemetry/opentelemetry-operator/internal/rbac"
collectorStatus "github.com/open-telemetry/opentelemetry-operator/internal/status/collector"
"github.com/open-telemetry/opentelemetry-operator/pkg/constants"
@@ -82,35 +83,13 @@ type Params struct {

func (r *OpenTelemetryCollectorReconciler) findOtelOwnedObjects(ctx context.Context, params manifests.Params) (map[types.UID]client.Object, error) {
ownedObjects := map[types.UID]client.Object{}
ownedObjectTypes := []client.Object{
&autoscalingv2.HorizontalPodAutoscaler{},
&networkingv1.Ingress{},
&policyV1.PodDisruptionBudget{},
}
listOps := &client.ListOptions{
Namespace: params.OtelCol.Namespace,
LabelSelector: labels.SelectorFromSet(manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, collector.ComponentOpenTelemetryCollector)),
}
if featuregate.PrometheusOperatorIsAvailable.IsEnabled() && r.config.PrometheusCRAvailability() == prometheus.Available {
ownedObjectTypes = append(ownedObjectTypes,
&monitoringv1.ServiceMonitor{},
&monitoringv1.PodMonitor{},
)
}
if params.Config.OpenShiftRoutesAvailability() == openshift.RoutesAvailable {
ownedObjectTypes = append(ownedObjectTypes, &routev1.Route{})
ownedObjectTypes := r.GetOwnedResourceTypes()
listOpts := []client.ListOption{
client.InNamespace(params.OtelCol.Namespace),
client.MatchingFields{resourceOwnerKey: params.OtelCol.Name},
}
for _, objectType := range ownedObjectTypes {
objs, err := getList(ctx, r, objectType, listOps)
if err != nil {
return nil, err
}
for uid, object := range objs {
ownedObjects[uid] = object
}
}
if params.Config.CreateRBACPermissions() == rbac.Available {
objs, err := r.findClusterRoleObjects(ctx, params)
objs, err := getList(ctx, r, objectType, listOpts...)
if err != nil {
return nil, err
}
@@ -119,15 +98,7 @@ func (r *OpenTelemetryCollectorReconciler) findOtelOwnedObjects(ctx context.Cont
}
}

configMapList := &corev1.ConfigMapList{}
err := r.List(ctx, configMapList, listOps)
if err != nil {
return nil, fmt.Errorf("error listing ConfigMaps: %w", err)
}
ownedConfigMaps := r.getConfigMapsToRemove(params.OtelCol.Spec.ConfigVersions, configMapList)
for i := range ownedConfigMaps {
ownedObjects[ownedConfigMaps[i].GetUID()] = &ownedConfigMaps[i]
}
removeKeptConfigMapVersions(params.OtelCol.Spec.ConfigVersions, ownedObjects)

return ownedObjects, nil
}
@@ -138,7 +109,10 @@ func (r *OpenTelemetryCollectorReconciler) findClusterRoleObjects(ctx context.Co
// Remove cluster roles and bindings.
// Users might switch off the RBAC creation feature on the operator which should remove existing RBAC.
listOpsCluster := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, collector.ComponentOpenTelemetryCollector)),
LabelSelector: labels.SelectorFromSet(map[string]string{
"app.kubernetes.io/managed-by": "opentelemetry-operator",
"app.kubernetes.io/instance": naming.Truncate("%s.%s", 63, params.OtelCol.Namespace, params.OtelCol.Name),
}),
}
for _, objectType := range ownedClusterObjectTypes {
objs, err := getList(ctx, r, objectType, listOpsCluster)
@@ -152,25 +126,34 @@
return ownedObjects, nil
}

// getConfigMapsToRemove returns a list of ConfigMaps to remove based on the number of ConfigMaps to keep.
// It keeps the newest ConfigMap, the `configVersionsToKeep` next newest ConfigMaps, and returns the remainder.
func (r *OpenTelemetryCollectorReconciler) getConfigMapsToRemove(configVersionsToKeep int, configMapList *corev1.ConfigMapList) []corev1.ConfigMap {
// removeKeptConfigMaps removes old ConfigMaps that we want to keep from the map of owned objects.
// Normally the controller would delete them after determining they're not in the list of desired objects generated
// from the OpenTelemetryCollector CR, but we want to keep them around.
func removeKeptConfigMapVersions(configVersionsToKeep int, ownedObjects map[types.UID]client.Object) {
configVersionsToKeep = max(1, configVersionsToKeep)
ownedConfigMaps := []corev1.ConfigMap{}
sort.Slice(configMapList.Items, func(i, j int) bool {
iTime := configMapList.Items[i].GetCreationTimestamp().Time
jTime := configMapList.Items[j].GetCreationTimestamp().Time
ownedConfigMaps := []client.Object{}
for _, ownedObject := range ownedObjects {
if ownedObject.GetObjectKind().GroupVersionKind().Kind != "ConfigMap" {
continue
}
if !featuregate.CollectorUsesTargetAllocatorCR.IsEnabled() && ownedObject.GetLabels()["app.kubernetes.io/component"] != "opentelemetry-collector" {
// we only apply this to collector ConfigMaps
continue
}
ownedConfigMaps = append(ownedConfigMaps, ownedObject)
}
sort.Slice(ownedConfigMaps, func(i, j int) bool {
iTime := ownedConfigMaps[i].GetCreationTimestamp().Time
jTime := ownedConfigMaps[j].GetCreationTimestamp().Time
// sort the ConfigMaps newest to oldest
return iTime.After(jTime)
})

for i := range configMapList.Items {
if i > configVersionsToKeep {
ownedConfigMaps = append(ownedConfigMaps, configMapList.Items[i])
}
configMapsToKeep := min(configVersionsToKeep+1, len(ownedConfigMaps))
// remove the first configVersionsToKeep items
for i := range ownedConfigMaps[:configMapsToKeep] {
delete(ownedObjects, ownedConfigMaps[i].GetUID())
}

return ownedConfigMaps
}
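
The keep/delete semantics here are easy to misread: entries removed from ownedObjects are the ConfigMaps that survive, while entries left in the map are deleted later in the reconcile. A hedged sketch of the expected behaviour, assuming the same package and typed objects with their GroupVersionKind set; names, UIDs, and timestamps are invented:

```go
// Illustrative only: five collector ConfigMaps, ConfigVersions = 2.
owned := map[types.UID]client.Object{}
for i := 0; i < 5; i++ {
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:              fmt.Sprintf("collector-config-%d", i),
			UID:               types.UID(fmt.Sprintf("uid-%d", i)),
			CreationTimestamp: metav1.NewTime(time.Now().Add(time.Duration(i) * time.Minute)),
			Labels:            map[string]string{"app.kubernetes.io/component": "opentelemetry-collector"},
		},
	}
	// The function filters on the object's Kind, so TypeMeta must be populated.
	cm.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("ConfigMap"))
	owned[cm.UID] = cm
}
removeKeptConfigMapVersions(2, owned)
// The 2+1 newest ConfigMaps were dropped from the map (kept in the cluster);
// the two oldest remain in ownedObjects and will be garbage-collected.
```
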

func (r *OpenTelemetryCollectorReconciler) GetParams(ctx context.Context, instance v1beta1.OpenTelemetryCollector) (manifests.Params, error) {
@@ -310,32 +293,74 @@ func (r *OpenTelemetryCollectorReconciler) Reconcile(ctx context.Context, req ct

// SetupWithManager tells the manager what our controller is interested in.
func (r *OpenTelemetryCollectorReconciler) SetupWithManager(mgr ctrl.Manager) error {
err := r.SetupCaches(mgr)
if err != nil {
return err
}

ownedResources := r.GetOwnedResourceTypes()
builder := ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.OpenTelemetryCollector{}).
Owns(&corev1.ConfigMap{}).
Owns(&corev1.ServiceAccount{}).
Owns(&corev1.Service{}).
Owns(&appsv1.Deployment{}).
Owns(&appsv1.DaemonSet{}).
Owns(&appsv1.StatefulSet{}).
Owns(&networkingv1.Ingress{}).
Owns(&autoscalingv2.HorizontalPodAutoscaler{}).
Owns(&policyV1.PodDisruptionBudget{})
For(&v1beta1.OpenTelemetryCollector{})

for _, resource := range ownedResources {
builder.Owns(resource)
}

return builder.Complete(r)
}

// SetupCaches sets up caching and indexing for our controller.
func (r *OpenTelemetryCollectorReconciler) SetupCaches(cluster cluster.Cluster) error {
ownedResources := r.GetOwnedResourceTypes()
for _, resource := range ownedResources {
if err := cluster.GetCache().IndexField(context.Background(), resource, resourceOwnerKey, func(rawObj client.Object) []string {
owner := metav1.GetControllerOf(rawObj)
if owner == nil {
return nil
}
// make sure it's an OpenTelemetryCollector
if owner.Kind != "OpenTelemetryCollector" {
return nil
}

return []string{owner.Name}
}); err != nil {
return err
}
}
return nil
}

// GetOwnedResourceTypes returns all the resource types the controller can own. Even though this method returns an array
// of client.Object, these are (empty) example structs rather than actual resources.
func (r *OpenTelemetryCollectorReconciler) GetOwnedResourceTypes() []client.Object {
ownedResources := []client.Object{
&corev1.ConfigMap{},
&corev1.ServiceAccount{},
&corev1.Service{},
&appsv1.Deployment{},
&appsv1.DaemonSet{},
&appsv1.StatefulSet{},
&networkingv1.Ingress{},
&autoscalingv2.HorizontalPodAutoscaler{},
&policyV1.PodDisruptionBudget{},
}

if r.config.CreateRBACPermissions() == rbac.Available {
builder.Owns(&rbacv1.ClusterRoleBinding{})
builder.Owns(&rbacv1.ClusterRole{})
ownedResources = append(ownedResources, &rbacv1.ClusterRole{})
ownedResources = append(ownedResources, &rbacv1.ClusterRoleBinding{})
}

if featuregate.PrometheusOperatorIsAvailable.IsEnabled() && r.config.PrometheusCRAvailability() == prometheus.Available {
builder.Owns(&monitoringv1.ServiceMonitor{})
builder.Owns(&monitoringv1.PodMonitor{})
ownedResources = append(ownedResources, &monitoringv1.PodMonitor{})
ownedResources = append(ownedResources, &monitoringv1.ServiceMonitor{})
}

if r.config.OpenShiftRoutesAvailability() == openshift.RoutesAvailable {
builder.Owns(&routev1.Route{})
ownedResources = append(ownedResources, &routev1.Route{})
}

return builder.Complete(r)
return ownedResources
}

const collectorFinalizer = "opentelemetrycollector.opentelemetry.io/finalizer"
@@ -351,3 +376,5 @@ func (r *OpenTelemetryCollectorReconciler) finalizeCollector(ctx context.Context
}
return nil
}

const resourceOwnerKey = ".metadata.owner"