diff --git a/controllers/consoleplugin/consoleplugin_objects.go b/controllers/consoleplugin/consoleplugin_objects.go index 808edccf5..84859a9b1 100644 --- a/controllers/consoleplugin/consoleplugin_objects.go +++ b/controllers/consoleplugin/consoleplugin_objects.go @@ -20,7 +20,6 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2" config "github.com/netobserv/network-observability-operator/controllers/consoleplugin/config" "github.com/netobserv/network-observability-operator/controllers/constants" - "github.com/netobserv/network-observability-operator/controllers/ebpf" "github.com/netobserv/network-observability-operator/pkg/helper" "github.com/netobserv/network-observability-operator/pkg/loki" "github.com/netobserv/network-observability-operator/pkg/volumes" @@ -350,15 +349,6 @@ func (b *builder) setLokiConfig(lconf *config.LokiConfig) { } func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) error { - var err error - dedupJustMark, err := strconv.ParseBool(ebpf.DedupeJustMarkDefault) - if err != nil { - return err - } - dedupMerge, err := strconv.ParseBool(ebpf.DedupeMergeDefault) - if err != nil { - return err - } if helper.UseEBPF(b.desired) { if helper.IsPktDropEnabled(&b.desired.Agent.EBPF) { fconf.Features = append(fconf.Features, "pktDrop") @@ -371,22 +361,6 @@ func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) error { if helper.IsFlowRTTEnabled(&b.desired.Agent.EBPF) { fconf.Features = append(fconf.Features, "flowRTT") } - - if b.desired.Agent.EBPF.Advanced != nil { - if v, ok := b.desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeJustMark]; ok { - dedupJustMark, err = strconv.ParseBool(v) - if err != nil { - return err - } - } - - if v, ok := b.desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeMerge]; ok { - dedupMerge, err = strconv.ParseBool(v) - if err != nil { - return err - } - } - } } fconf.RecordTypes = helper.GetRecordTypes(&b.desired.Processor) fconf.PortNaming = 
b.desired.ConsolePlugin.PortNaming @@ -394,8 +368,8 @@ func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) error { fconf.AlertNamespaces = []string{b.namespace} fconf.Sampling = helper.GetSampling(b.desired) fconf.Deduper = config.Deduper{ - Mark: dedupJustMark, - Merge: dedupMerge, + Mark: helper.UseDedupJustMark(b.desired), + Merge: helper.UseDedupMerge(b.desired), } if helper.IsMultiClusterEnabled(&b.desired.Processor) { fconf.Features = append(fconf.Features, "multiCluster") diff --git a/controllers/ebpf/agent_controller.go b/controllers/ebpf/agent_controller.go index 500cf22ab..310262981 100644 --- a/controllers/ebpf/agent_controller.go +++ b/controllers/ebpf/agent_controller.go @@ -7,8 +7,10 @@ import ( "strings" flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2" + metricslatest "github.com/netobserv/network-observability-operator/apis/flowmetrics/v1alpha1" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/controllers/ebpf/internal/permissions" + "github.com/netobserv/network-observability-operator/controllers/flp" "github.com/netobserv/network-observability-operator/controllers/reconcilers" "github.com/netobserv/network-observability-operator/pkg/helper" "github.com/netobserv/network-observability-operator/pkg/volumes" @@ -22,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -50,7 +53,8 @@ const ( envKafkaSASLSecretPath = "KAFKA_SASL_CLIENT_SECRET_PATH" envLogLevel = "LOG_LEVEL" envDedupe = "DEDUPER" - dedupeDefault = "firstCome" + envDedupeJustMark = "DEDUPER_JUST_MARK" + envFLPConfig = "FLP_CONFIG" envGoMemLimit = "GOMEMLIMIT" envEnablePktDrop = "ENABLE_PKT_DROPS" envEnableDNSTracking = "ENABLE_DNS_TRACKING" @@ -60,7 +64,7 @@ const ( const ( exportKafka = "kafka" - 
exportGRPC = "grpc" + exportFLP = "direct-flp" kafkaCerts = "kafka-certs" averageMessageSize = 100 bpfTraceMountName = "bpf-kernel-debug" @@ -69,13 +73,6 @@ const ( bpfNetNSMountPath = "/var/run/netns" ) -const ( - EnvDedupeJustMark = "DEDUPER_JUST_MARK" - EnvDedupeMerge = "DEDUPER_MERGE" - DedupeJustMarkDefault = "true" - DedupeMergeDefault = "false" -) - type reconcileAction int const ( @@ -183,7 +180,16 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol } version := helper.ExtractVersion(c.Image) annotations := make(map[string]string) - env, err := c.envConfig(ctx, coll, annotations) + + fm := metricslatest.FlowMetricList{} + if !helper.UseKafka(&coll.Spec) { + // Direct-FLP mode => list custom metrics + if err := c.List(ctx, &fm, &client.ListOptions{Namespace: coll.Spec.Namespace}); err != nil { + return nil, c.Status.Error("CantListFlowMetrics", err) + } + } + + env, err := c.envConfig(ctx, coll, annotations, &fm) if err != nil { return nil, err } @@ -272,7 +278,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol }, nil } -func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowCollector, annots map[string]string) ([]corev1.EnvVar, error) { +func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowCollector, annots map[string]string, metrics *metricslatest.FlowMetricList) ([]corev1.EnvVar, error) { config := c.setEnvConfig(coll) if helper.UseKafka(&coll.Spec) { @@ -327,26 +333,51 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC ) } } else { - config = append(config, corev1.EnvVar{Name: envExport, Value: exportGRPC}) + flpConfig, err := c.buildFLPConfig(&coll.Spec, metrics) + if err != nil { + return nil, err + } debugConfig := helper.GetAdvancedProcessorConfig(coll.Spec.Processor.Advanced) - // When flowlogs-pipeline is deployed as a daemonset, each agent must send - // data to the pod that is deployed in the same 
host - config = append(config, corev1.EnvVar{ - Name: envFlowsTargetHost, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - APIVersion: "v1", - FieldPath: "status.hostIP", + config = append(config, + corev1.EnvVar{ + Name: envExport, + Value: exportFLP, + }, + corev1.EnvVar{ + Name: envFLPConfig, + Value: flpConfig, + }, + corev1.EnvVar{ + Name: envFlowsTargetHost, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.hostIP", + }, }, }, - }, corev1.EnvVar{ - Name: envFlowsTargetPort, - Value: strconv.Itoa(int(*debugConfig.Port)), - }) + corev1.EnvVar{ + Name: envFlowsTargetPort, + Value: strconv.Itoa(int(*debugConfig.Port)), + }, + ) } return config, nil } +func (c *AgentController) buildFLPConfig(desired *flowslatest.FlowCollectorSpec, metrics *metricslatest.FlowMetricList) (string, error) { + flpBuilder, err := flp.NewBuilder(c.NewInstance(c.Image, c.Status), desired, metrics, flp.ConfMonolith) + if err != nil { + return "", err + } + pipeline := flpBuilder.NewInProcessPipeline() + err = pipeline.AddProcessorStages() + if err != nil { + return "", err + } + return flpBuilder.GetJSONConfig() +} + func requiredAction(current, desired *v1.DaemonSet) reconcileAction { if desired == nil { return actionNone @@ -465,27 +496,23 @@ func (c *AgentController) setEnvConfig(coll *flowslatest.FlowCollector) []corev1 }) } - dedup := dedupeDefault - dedupJustMark := DedupeJustMarkDefault - dedupMerge := DedupeMergeDefault + // Init with defaults + envs := map[string]string{ + envDedupe: "firstCome", + envDedupeJustMark: "true", + } + debugConfig := helper.GetAdvancedAgentConfig(coll.Spec.Agent.EBPF.Advanced) + // Merge configured + for k, v := range debugConfig.Env { + envs[k] = v + } // we need to sort env map to keep idempotency, // as equal maps could be iterated in different order - debugConfig := helper.GetAdvancedAgentConfig(coll.Spec.Agent.EBPF.Advanced) - for _, pair := range 
helper.KeySorted(debugConfig.Env) { + for _, pair := range helper.KeySorted(envs) { k, v := pair[0], pair[1] - if k == envDedupe { - dedup = v - } else if k == EnvDedupeJustMark { - dedupJustMark = v - } else if k == EnvDedupeMerge { - dedupMerge = v - } else { - config = append(config, corev1.EnvVar{Name: k, Value: v}) - } + config = append(config, corev1.EnvVar{Name: k, Value: v}) } - config = append(config, corev1.EnvVar{Name: envDedupe, Value: dedup}) - config = append(config, corev1.EnvVar{Name: EnvDedupeJustMark, Value: dedupJustMark}) config = append(config, corev1.EnvVar{ Name: envAgentIP, ValueFrom: &corev1.EnvVarSource{ @@ -496,7 +523,6 @@ func (c *AgentController) setEnvConfig(coll *flowslatest.FlowCollector) []corev1 }, }, ) - config = append(config, corev1.EnvVar{Name: EnvDedupeMerge, Value: dedupMerge}) return config } diff --git a/controllers/ebpf/internal/permissions/permissions.go b/controllers/ebpf/internal/permissions/permissions.go index 6d81909e5..34d733a2f 100644 --- a/controllers/ebpf/internal/permissions/permissions.go +++ b/controllers/ebpf/internal/permissions/permissions.go @@ -6,11 +6,13 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2" "github.com/netobserv/network-observability-operator/controllers/constants" + "github.com/netobserv/network-observability-operator/controllers/flp" "github.com/netobserv/network-observability-operator/controllers/reconcilers" "github.com/netobserv/network-observability-operator/pkg/helper" osv1 "github.com/openshift/api/security/v1" v1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -45,6 +47,9 @@ func (c *Reconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCol if err := c.reconcileVendorPermissions(ctx, desired); err != nil { return fmt.Errorf("reconciling vendor permissions: %w", err) } + if err := 
c.reconcileRoles(ctx); err != nil { + return fmt.Errorf("reconciling roles: %w", err) + } return nil } @@ -222,3 +227,26 @@ func (c *Reconciler) cleanupPreviousNamespace(ctx context.Context) error { } return nil } + +func (c *Reconciler) reconcileRoles(ctx context.Context) error { + cr := flp.BuildClusterRoleTransformer() + if err := c.ReconcileClusterRole(ctx, cr); err != nil { + return err + } + crb := &rbacv1.ClusterRoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: cr.Name + "-agent", + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "ClusterRole", + Name: cr.Name, + }, + Subjects: []rbacv1.Subject{{ + Kind: "ServiceAccount", + Name: constants.EBPFServiceAccount, + Namespace: c.PrivilegedNamespace(), + }}, + } + return c.ReconcileClusterRoleBinding(ctx, crb) +} diff --git a/controllers/flp/flp_common_objects.go b/controllers/flp/flp_common_objects.go index 8cc080d6d..c533edcec 100644 --- a/controllers/flp/flp_common_objects.go +++ b/controllers/flp/flp_common_objects.go @@ -145,6 +145,10 @@ func (b *builder) NewKafkaPipeline() PipelineBuilder { })) } +func (b *builder) NewInProcessPipeline() PipelineBuilder { + return b.initPipeline(config.NewPresetIngesterPipeline()) +} + func (b *builder) initPipeline(ingest config.PipelineBuilderStage) PipelineBuilder { pipeline := newPipelineBuilder(b.desired, b.flowMetrics, b.info.Loki, b.info.ClusterID, &b.volumes, &ingest) b.pipeline = &pipeline diff --git a/controllers/flp/flp_monolith_reconciler.go b/controllers/flp/flp_monolith_reconciler.go index 1afe914de..962515985 100644 --- a/controllers/flp/flp_monolith_reconciler.go +++ b/controllers/flp/flp_monolith_reconciler.go @@ -71,8 +71,8 @@ func (r *monolithReconciler) reconcile(ctx context.Context, desired *flowslatest return err } - if helper.UseKafka(&desired.Spec) { - r.Status.SetUnused("Monolith only used without Kafka") + if helper.UseKafka(&desired.Spec) || helper.UseMergedAgentFLP(&desired.Spec) { + r.Status.SetUnused("Monolith 
only used with IPFIX and without Kafka") r.Managed.TryDeleteAll(ctx) return nil } diff --git a/pkg/helper/flowcollector.go b/pkg/helper/flowcollector.go index fc58199dd..ee5955caf 100644 --- a/pkg/helper/flowcollector.go +++ b/pkg/helper/flowcollector.go @@ -1,6 +1,7 @@ package helper import ( + "strconv" "strings" flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2" @@ -20,6 +21,28 @@ func UseEBPF(spec *flowslatest.FlowCollectorSpec) bool { return spec.Agent.Type == flowslatest.AgentEBPF } +func UseDedupJustMark(spec *flowslatest.FlowCollectorSpec) bool { + if spec.Agent.EBPF.Advanced != nil { + if v, ok := spec.Agent.EBPF.Advanced.Env["DEDUPER_JUST_MARK"]; ok { + b, err := strconv.ParseBool(v) + return err != nil || b // invalid value falls back to the default (true) + } + } + // default true + return true +} + +func UseDedupMerge(spec *flowslatest.FlowCollectorSpec) bool { + if spec.Agent.EBPF.Advanced != nil { + if v, ok := spec.Agent.EBPF.Advanced.Env["DEDUPER_MERGE"]; ok { + b, err := strconv.ParseBool(v) + return err == nil && b // invalid value falls back to the default (false) + } + } + // default false + return false +} + func UseIPFIX(spec *flowslatest.FlowCollectorSpec) bool { return spec.Agent.Type == flowslatest.AgentIPFIX } @@ -28,6 +51,10 @@ func UseKafka(spec *flowslatest.FlowCollectorSpec) bool { return spec.DeploymentModel == flowslatest.DeploymentModelKafka } +func UseMergedAgentFLP(spec *flowslatest.FlowCollectorSpec) bool { + return spec.DeploymentModel == flowslatest.DeploymentModelDirect && spec.Agent.Type == flowslatest.AgentEBPF +} + func HasKafkaExporter(spec *flowslatest.FlowCollectorSpec) bool { for _, ex := range spec.Exporters { if ex.Type == flowslatest.KafkaExporter { diff --git a/pkg/loki/labels.go b/pkg/loki/labels.go index 2d25fded5..ee370c962 100644 --- a/pkg/loki/labels.go +++ b/pkg/loki/labels.go @@ -1,11 +1,8 @@ package loki import ( - "strconv" - flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2" "github.com/netobserv/network-observability-operator/controllers/constants" - 
"github.com/netobserv/network-observability-operator/controllers/ebpf" "github.com/netobserv/network-observability-operator/pkg/helper" ) @@ -24,16 +21,8 @@ func GetLokiLabels(desired *flowslatest.FlowCollectorSpec) []string { indexFields = append(indexFields, constants.LokiZoneIndexFields...) } - if helper.UseEBPF(desired) { - dedupJustMark, _ := strconv.ParseBool(ebpf.DedupeJustMarkDefault) - if desired.Agent.EBPF.Advanced != nil { - if v, ok := desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeJustMark]; ok { - dedupJustMark, _ = strconv.ParseBool(v) - } - } - if dedupJustMark { - indexFields = append(indexFields, constants.LokiDeduperMarkIndexFields...) - } + if helper.UseEBPF(desired) && helper.UseDedupJustMark(desired) { + indexFields = append(indexFields, constants.LokiDeduperMarkIndexFields...) } return indexFields