diff --git a/controllers/ebpf/agent_controller.go b/controllers/ebpf/agent_controller.go index 310262981..e5b6500ac 100644 --- a/controllers/ebpf/agent_controller.go +++ b/controllers/ebpf/agent_controller.go @@ -7,7 +7,6 @@ import ( "strings" flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2" - metricslatest "github.com/netobserv/network-observability-operator/apis/flowmetrics/v1alpha1" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/controllers/ebpf/internal/permissions" "github.com/netobserv/network-observability-operator/controllers/flp" @@ -24,7 +23,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -127,7 +125,17 @@ func (c *AgentController) Reconcile(ctx context.Context, target *flowslatest.Flo if err := c.permissions.Reconcile(ctx, &target.Spec.Agent.EBPF); err != nil { return fmt.Errorf("reconciling permissions: %w", err) } - desired, err := c.desired(ctx, target, rlog) + + var inprocFLPInfo *flp.InProcessInfo + if helper.UseMergedAgentFLP(&target.Spec) { + // Direct-FLP mode + inprocFLPInfo, err = flp.ReconcileInProcess(ctx, c.Instance, target) + if err != nil { + return fmt.Errorf("reconciling in-process FLP: %w", err) + } + } + + desired, err := c.desired(ctx, target, inprocFLPInfo, rlog) if err != nil { return err } @@ -174,28 +182,30 @@ func newMountPropagationMode(m corev1.MountPropagationMode) *corev1.MountPropaga return mode } -func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCollector, rlog logr.Logger) (*v1.DaemonSet, error) { +func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCollector, inprocFLPInfo *flp.InProcessInfo, rlog logr.Logger) (*v1.DaemonSet, error) { if coll == nil || !helper.UseEBPF(&coll.Spec) { return nil, nil } version := helper.ExtractVersion(c.Image) annotations := make(map[string]string) - fm := metricslatest.FlowMetricList{} - if !helper.UseKafka(&coll.Spec) { - // Direct-FLP mode => list custom metrics - if err := c.List(ctx, &fm, &client.ListOptions{Namespace: coll.Spec.Namespace}); err != nil { - return nil, c.Status.Error("CantListFlowMetrics", err) - } - } - - env, err := c.envConfig(ctx, coll, annotations, &fm) + env, err := c.envConfig(ctx, coll, annotations, inprocFLPInfo) if err != nil { return nil, err } volumeMounts := c.volumes.GetMounts() volumes := c.volumes.GetVolumes() + if inprocFLPInfo != nil { + // Merge annotations + for k, v := range inprocFLPInfo.Annotations { + annotations[k] = v + } + // Add volumes + volumes = inprocFLPInfo.Volumes.AppendVolumes(volumes) + volumeMounts = inprocFLPInfo.Volumes.AppendMounts(volumeMounts) + } + if helper.IsPrivileged(&coll.Spec.Agent.EBPF) { volume := corev1.Volume{ Name: bpfNetNSMountName, @@ -278,7 +288,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol }, nil } -func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowCollector, annots map[string]string, metrics *metricslatest.FlowMetricList) ([]corev1.EnvVar, error) { +func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowCollector, annots map[string]string, inprocFLPInfo *flp.InProcessInfo) ([]corev1.EnvVar, error) { config := c.setEnvConfig(coll) if helper.UseKafka(&coll.Spec) { @@ -333,10 +343,6 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC ) } } else { - flpConfig, err := c.buildFLPConfig(&coll.Spec, metrics) - if err != nil { - return nil, err - } debugConfig := helper.GetAdvancedProcessorConfig(coll.Spec.Processor.Advanced) config = append(config, corev1.EnvVar{ @@ -345,7 +351,7 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC }, corev1.EnvVar{ Name: envFLPConfig, - Value: flpConfig, + Value: inprocFLPInfo.JSONConfig, }, corev1.EnvVar{ Name: envFlowsTargetHost, @@ -365,19 +371,6 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC return config, nil } -func (c *AgentController) buildFLPConfig(desired *flowslatest.FlowCollectorSpec, metrics *metricslatest.FlowMetricList) (string, error) { - flpBuilder, err := flp.NewBuilder(c.NewInstance(c.Image, c.Status), desired, metrics, flp.ConfMonolith) - if err != nil { - return "", err - } - pipeline := flpBuilder.NewInProcessPipeline() - err = pipeline.AddProcessorStages() - if err != nil { - return "", err - } - return flpBuilder.GetJSONConfig() -} - func requiredAction(current, desired *v1.DaemonSet) reconcileAction { if desired == nil { return actionNone diff --git a/controllers/ebpf/internal/permissions/permissions.go b/controllers/ebpf/internal/permissions/permissions.go index 34d733a2f..6d81909e5 100644 --- a/controllers/ebpf/internal/permissions/permissions.go +++ b/controllers/ebpf/internal/permissions/permissions.go @@ -6,13 +6,11 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2" "github.com/netobserv/network-observability-operator/controllers/constants" - "github.com/netobserv/network-observability-operator/controllers/flp" "github.com/netobserv/network-observability-operator/controllers/reconcilers" "github.com/netobserv/network-observability-operator/pkg/helper" osv1 "github.com/openshift/api/security/v1" v1 "k8s.io/api/core/v1" - rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -47,9 +45,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCol if err := c.reconcileVendorPermissions(ctx, desired); err != nil { return fmt.Errorf("reconciling vendor permissions: %w", err) } - if err := c.reconcileRoles(ctx); err != nil { - return fmt.Errorf("reconciling roles: %w", err) - } return nil } @@ -227,26 +222,3 @@ func (c *Reconciler) cleanupPreviousNamespace(ctx context.Context) error { } return nil } - -func (c *Reconciler) reconcileRoles(ctx context.Context) error { - cr := flp.BuildClusterRoleTransformer() - if err := c.ReconcileClusterRole(ctx, cr); err != nil { - return err - } - crb := &rbacv1.ClusterRoleBinding{ - ObjectMeta: metav1.ObjectMeta{ - Name: cr.Name + "-agent", - }, - RoleRef: rbacv1.RoleRef{ - APIGroup: "rbac.authorization.k8s.io", - Kind: "ClusterRole", - Name: cr.Name, - }, - Subjects: []rbacv1.Subject{{ - Kind: "ServiceAccount", - Name: constants.EBPFServiceAccount, - Namespace: c.PrivilegedNamespace(), - }}, - } - return c.ReconcileClusterRoleBinding(ctx, crb) -} diff --git a/controllers/flp/flp_common_objects.go b/controllers/flp/flp_common_objects.go index c533edcec..8ea150c92 100644 --- a/controllers/flp/flp_common_objects.go +++ b/controllers/flp/flp_common_objects.go @@ -33,6 +33,7 @@ const ( livenessPeriodSeconds = 10 startupFailureThreshold = 5 startupPeriodSeconds = 10 + appLabel = "app" ) type ConfKind string @@ -87,11 +88,11 @@ func NewBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSp return builder{ info: info, labels: map[string]string{ - "app": name, + appLabel: name, "version": helper.MaxLabelLength(version), }, selector: map[string]string{ - "app": name, + appLabel: name, }, desired: desired, flowMetrics: flowMetrics, @@ -155,6 +156,11 @@ func (b *builder) initPipeline(ingest config.PipelineBuilderStage) PipelineBuild return pipeline } +func (b *builder) overrideApp(app string) { + b.labels[appLabel] = app + b.selector[appLabel] = app +} + func (b *builder) portProtocol() corev1.Protocol { if helper.UseEBPF(b.desired) { return corev1.ProtocolTCP @@ -375,7 +381,7 @@ func (b *builder) serviceAccount() *corev1.ServiceAccount { Name: b.name(), Namespace: b.info.Namespace, Labels: map[string]string{ - "app": b.name(), + appLabel: b.name(), }, }, } @@ -392,7 +398,7 @@ func (b *builder) clusterRoleBinding(ck ConfKind, mono bool) *rbacv1.ClusterRole ObjectMeta: metav1.ObjectMeta{ Name: rbName, Labels: map[string]string{ - "app": b.name(), + appLabel: b.name(), }, }, RoleRef: rbacv1.RoleRef{ @@ -487,7 +493,7 @@ func (b *builder) prometheusRule() *monitoringv1.PrometheusRule { For: &d, Labels: map[string]string{ "severity": "warning", - "app": "netobserv", + appLabel: "netobserv", }, }) } @@ -504,7 +510,7 @@ func (b *builder) prometheusRule() *monitoringv1.PrometheusRule { For: &d, Labels: map[string]string{ "severity": "warning", - "app": "netobserv", + appLabel: "netobserv", }, }) } diff --git a/controllers/flp/flp_controller.go b/controllers/flp/flp_controller.go index 688f1b0a3..5ae008d8c 100644 --- a/controllers/flp/flp_controller.go +++ b/controllers/flp/flp_controller.go @@ -237,8 +237,8 @@ func reconcileMonitoringCerts(ctx context.Context, info *reconcilers.Common, tls return nil } -func reconcileLokiRoles(ctx context.Context, r *reconcilers.Common, b *builder) error { - roles := loki.ClusterRoles(b.desired.Loki.Mode) +func ReconcileLokiRoles(ctx context.Context, r *reconcilers.Common, spec *flowslatest.FlowCollectorSpec, appName, saName, saNamespace string) error { + roles := loki.ClusterRoles(spec.Loki.Mode) if len(roles) > 0 { for i := range roles { if err := r.ReconcileClusterRole(ctx, &roles[i]); err != nil { @@ -246,7 +246,7 @@ func reconcileLokiRoles(ctx context.Context, r *reconcilers.Common, b *builder) } } // Binding - crb := loki.ClusterRoleBinding(b.name(), b.name(), b.info.Namespace) + crb := loki.ClusterRoleBinding(appName, saName, saNamespace) if err := r.ReconcileClusterRoleBinding(ctx, crb); err != nil { return err } diff --git a/controllers/flp/flp_monolith_reconciler.go b/controllers/flp/flp_monolith_reconciler.go index 962515985..7530b0926 100644 --- a/controllers/flp/flp_monolith_reconciler.go +++ b/controllers/flp/flp_monolith_reconciler.go @@ -109,23 +109,26 @@ func (r *monolithReconciler) reconcile(ctx context.Context, desired *flowslatest return err } + err = r.reconcileCertificates(ctx, desired, annotations) + if err != nil { + return err + } + + return r.reconcileDaemonSet(ctx, builder.daemonSet(annotations)) +} + +func (r *monolithReconciler) reconcileCertificates(ctx context.Context, desired *flowslatest.FlowCollector, annotations map[string]string) error { // Watch for Loki certificate if necessary; we'll ignore in that case the returned digest, as we don't need to restart pods on cert rotation // because certificate is always reloaded from file - if _, err = r.Watcher.ProcessCACert(ctx, r.Client, &r.Loki.TLS, r.Namespace); err != nil { + if _, err := r.Watcher.ProcessCACert(ctx, r.Client, &r.Loki.TLS, r.Namespace); err != nil { return err } - // Watch for Kafka exporter certificate if necessary; need to restart pods in case of cert rotation - if err = annotateKafkaExporterCerts(ctx, r.Common, desired.Spec.Exporters, annotations); err != nil { + if err := annotateKafkaExporterCerts(ctx, r.Common, desired.Spec.Exporters, annotations); err != nil { return err } - // Watch for monitoring caCert - if err = reconcileMonitoringCerts(ctx, r.Common, &desired.Spec.Processor.Metrics.Server.TLS, r.Namespace); err != nil { - return err - } - - return r.reconcileDaemonSet(ctx, builder.daemonSet(annotations)) + return reconcileMonitoringCerts(ctx, r.Common, &desired.Spec.Processor.Metrics.Server.TLS, r.Namespace) } func (r *monolithReconciler) reconcilePrometheusService(ctx context.Context, builder *monolithBuilder) error { @@ -185,5 +188,12 @@ func (r *monolithReconciler) reconcilePermissions(ctx context.Context, builder * } } - return reconcileLokiRoles(ctx, r.Common, &builder.generic) + return ReconcileLokiRoles( + ctx, + r.Common, + builder.generic.desired, + builder.generic.name(), + builder.generic.name(), + r.Common.Namespace, + ) } diff --git a/controllers/flp/flp_transfo_reconciler.go b/controllers/flp/flp_transfo_reconciler.go index 009d46b9c..75c280790 100644 --- a/controllers/flp/flp_transfo_reconciler.go +++ b/controllers/flp/flp_transfo_reconciler.go @@ -202,5 +202,12 @@ func (r *transformerReconciler) reconcilePermissions(ctx context.Context, builde return err } - return reconcileLokiRoles(ctx, r.Common, &builder.generic) + return ReconcileLokiRoles( + ctx, + r.Common, + builder.generic.desired, + builder.generic.name(), + builder.generic.name(), + r.Common.Namespace, + ) } diff --git a/controllers/flp/in_process_reconciler.go b/controllers/flp/in_process_reconciler.go new file mode 100644 index 000000000..7b34bdbec --- /dev/null +++ b/controllers/flp/in_process_reconciler.go @@ -0,0 +1,124 @@ +package flp + +import ( + "context" + + flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2" + metricslatest "github.com/netobserv/network-observability-operator/apis/flowmetrics/v1alpha1" + "github.com/netobserv/network-observability-operator/controllers/constants" + "github.com/netobserv/network-observability-operator/controllers/reconcilers" + "github.com/netobserv/network-observability-operator/pkg/volumes" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type inProcessReconciler struct { + monolith *monolithReconciler +} + +type InProcessInfo struct { + JSONConfig string + Annotations map[string]string + Volumes volumes.Builder +} + +func ReconcileInProcess(ctx context.Context, parent *reconcilers.Instance, desired *flowslatest.FlowCollector) (*InProcessInfo, error) { + i := newInProcessReconciler(parent) + return i.reconcileInProcess(ctx, desired) +} + +func newInProcessReconciler(parent *reconcilers.Instance) *inProcessReconciler { + cloneInfo := *parent.Common + cloneInfo.Namespace = parent.PrivilegedNamespace() + inst := cloneInfo.NewInstance(parent.Image, parent.Status) + m := newMonolithReconciler(inst) + return &inProcessReconciler{monolith: m} +} + +func (i *inProcessReconciler) reconcileInProcess(ctx context.Context, desired *flowslatest.FlowCollector) (*InProcessInfo, error) { + result := InProcessInfo{} + + // Retrieve current owned objects + err := i.monolith.Managed.FetchAll(ctx) + if err != nil { + return nil, err + } + + fm := metricslatest.FlowMetricList{} + if err := i.monolith.List(ctx, &fm, &client.ListOptions{Namespace: desired.Namespace}); err != nil { + return nil, i.monolith.Status.Error("CantListFlowMetrics", err) + } + + builder, err := newMonolithBuilder(i.monolith.Instance, &desired.Spec, &fm) + if err != nil { + return nil, err + } + + // Override target app + builder.generic.overrideApp(constants.EBPFAgentName) + // Build pipeline + pipeline := builder.generic.NewInProcessPipeline() + err = pipeline.AddProcessorStages() + if err != nil { + return nil, err + } + cfg, err := builder.generic.GetJSONConfig() + if err != nil { + return nil, err + } + result.JSONConfig = cfg + + err = i.reconcileRoles(ctx, &builder) + if err != nil { + return nil, err + } + + err = i.monolith.reconcilePrometheusService(ctx, &builder) + if err != nil { + return nil, err + } + + annotations := map[string]string{} + err = i.monolith.reconcileCertificates(ctx, desired, annotations) + if err != nil { + return nil, err + } + result.Annotations = annotations + result.Volumes = builder.generic.volumes + return &result, nil +} + +func (i *inProcessReconciler) reconcileRoles(ctx context.Context, builder *monolithBuilder) error { + cr := BuildClusterRoleTransformer() + if err := i.monolith.ReconcileClusterRole(ctx, cr); err != nil { + return err + } + crb := &rbacv1.ClusterRoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: cr.Name + "-agent", + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "ClusterRole", + Name: cr.Name, + }, + Subjects: []rbacv1.Subject{{ + Kind: "ServiceAccount", + Name: constants.EBPFServiceAccount, + Namespace: i.monolith.Namespace, + }}, + } + if err := i.monolith.ReconcileClusterRoleBinding(ctx, crb); err != nil { + return err + } + + return ReconcileLokiRoles( + ctx, + i.monolith.Common, + builder.generic.desired, + constants.EBPFAgentName, + constants.EBPFServiceAccount, + i.monolith.Namespace, + ) +}