Skip to content

Commit

Permalink
NETOBSERV-627 merge Agent and FLP
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak committed Feb 15, 2024
1 parent 93e7811 commit 3531e28
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 84 deletions.
30 changes: 2 additions & 28 deletions controllers/consoleplugin/consoleplugin_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
config "github.com/netobserv/network-observability-operator/controllers/consoleplugin/config"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/ebpf"
"github.com/netobserv/network-observability-operator/pkg/helper"
"github.com/netobserv/network-observability-operator/pkg/loki"
"github.com/netobserv/network-observability-operator/pkg/volumes"
Expand Down Expand Up @@ -350,15 +349,6 @@ func (b *builder) setLokiConfig(lconf *config.LokiConfig) {
}

func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) error {
var err error
dedupJustMark, err := strconv.ParseBool(ebpf.DedupeJustMarkDefault)
if err != nil {
return err
}
dedupMerge, err := strconv.ParseBool(ebpf.DedupeMergeDefault)
if err != nil {
return err
}
if helper.UseEBPF(b.desired) {
if helper.IsPktDropEnabled(&b.desired.Agent.EBPF) {
fconf.Features = append(fconf.Features, "pktDrop")
Expand All @@ -371,31 +361,15 @@ func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) error {
if helper.IsFlowRTTEnabled(&b.desired.Agent.EBPF) {
fconf.Features = append(fconf.Features, "flowRTT")
}

if b.desired.Agent.EBPF.Advanced != nil {
if v, ok := b.desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeJustMark]; ok {
dedupJustMark, err = strconv.ParseBool(v)
if err != nil {
return err
}
}

if v, ok := b.desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeMerge]; ok {
dedupMerge, err = strconv.ParseBool(v)
if err != nil {
return err
}
}
}
}
fconf.RecordTypes = helper.GetRecordTypes(&b.desired.Processor)
fconf.PortNaming = b.desired.ConsolePlugin.PortNaming
fconf.QuickFilters = b.desired.ConsolePlugin.QuickFilters
fconf.AlertNamespaces = []string{b.namespace}
fconf.Sampling = helper.GetSampling(b.desired)
fconf.Deduper = config.Deduper{
Mark: dedupJustMark,
Merge: dedupMerge,
Mark: helper.UseDedupJustMark(b.desired),
Merge: helper.UseDedupMerge(b.desired),
}
if helper.IsMultiClusterEnabled(&b.desired.Processor) {
fconf.Features = append(fconf.Features, "multiCluster")
Expand Down
108 changes: 67 additions & 41 deletions controllers/ebpf/agent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"strings"

flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
metricslatest "github.com/netobserv/network-observability-operator/apis/flowmetrics/v1alpha1"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/ebpf/internal/permissions"
"github.com/netobserv/network-observability-operator/controllers/flp"
"github.com/netobserv/network-observability-operator/controllers/reconcilers"
"github.com/netobserv/network-observability-operator/pkg/helper"
"github.com/netobserv/network-observability-operator/pkg/volumes"
Expand All @@ -22,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -50,7 +53,8 @@ const (
envKafkaSASLSecretPath = "KAFKA_SASL_CLIENT_SECRET_PATH"
envLogLevel = "LOG_LEVEL"
envDedupe = "DEDUPER"
dedupeDefault = "firstCome"
envDedupeJustMark = "DEDUPER_JUST_MARK"
envFLPConfig = "FLP_CONFIG"
envGoMemLimit = "GOMEMLIMIT"
envEnablePktDrop = "ENABLE_PKT_DROPS"
envEnableDNSTracking = "ENABLE_DNS_TRACKING"
Expand All @@ -60,7 +64,7 @@ const (

const (
exportKafka = "kafka"
exportGRPC = "grpc"
exportFLP = "direct-flp"
kafkaCerts = "kafka-certs"
averageMessageSize = 100
bpfTraceMountName = "bpf-kernel-debug"
Expand All @@ -69,13 +73,6 @@ const (
bpfNetNSMountPath = "/var/run/netns"
)

const (
EnvDedupeJustMark = "DEDUPER_JUST_MARK"
EnvDedupeMerge = "DEDUPER_MERGE"
DedupeJustMarkDefault = "true"
DedupeMergeDefault = "false"
)

type reconcileAction int

const (
Expand Down Expand Up @@ -183,7 +180,16 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol
}
version := helper.ExtractVersion(c.Image)
annotations := make(map[string]string)
env, err := c.envConfig(ctx, coll, annotations)

fm := metricslatest.FlowMetricList{}
if !helper.UseKafka(&coll.Spec) {
// Direct-FLP mode => list custom metrics
if err := c.List(ctx, &fm, &client.ListOptions{Namespace: coll.Spec.Namespace}); err != nil {
return nil, c.Status.Error("CantListFlowMetrics", err)
}
}

env, err := c.envConfig(ctx, coll, annotations, &fm)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -272,7 +278,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol
}, nil
}

func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowCollector, annots map[string]string) ([]corev1.EnvVar, error) {
func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowCollector, annots map[string]string, metrics *metricslatest.FlowMetricList) ([]corev1.EnvVar, error) {
config := c.setEnvConfig(coll)

if helper.UseKafka(&coll.Spec) {
Expand Down Expand Up @@ -327,26 +333,51 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC
)
}
} else {
config = append(config, corev1.EnvVar{Name: envExport, Value: exportGRPC})
flpConfig, err := c.buildFLPConfig(&coll.Spec, metrics)
if err != nil {
return nil, err
}
debugConfig := helper.GetAdvancedProcessorConfig(coll.Spec.Processor.Advanced)
// When flowlogs-pipeline is deployed as a daemonset, each agent must send
// data to the pod that is deployed in the same host
config = append(config, corev1.EnvVar{
Name: envFlowsTargetHost,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "status.hostIP",
config = append(config,
corev1.EnvVar{
Name: envExport,
Value: exportFLP,
},
corev1.EnvVar{
Name: envFLPConfig,
Value: flpConfig,
},
corev1.EnvVar{
Name: envFlowsTargetHost,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "status.hostIP",
},
},
},
}, corev1.EnvVar{
Name: envFlowsTargetPort,
Value: strconv.Itoa(int(*debugConfig.Port)),
})
corev1.EnvVar{
Name: envFlowsTargetPort,
Value: strconv.Itoa(int(*debugConfig.Port)),
},
)
}
return config, nil
}

// buildFLPConfig renders the flowlogs-pipeline monolith configuration as a
// JSON string, to be handed to the agent through an environment variable when
// FLP runs in-process with the agent. It is built from the desired spec and
// the listed custom FlowMetrics.
func (c *AgentController) buildFLPConfig(desired *flowslatest.FlowCollectorSpec, metrics *metricslatest.FlowMetricList) (string, error) {
	builder, err := flp.NewBuilder(c.NewInstance(c.Image, c.Status), desired, metrics, flp.ConfMonolith)
	if err != nil {
		return "", err
	}
	if err := builder.NewInProcessPipeline().AddProcessorStages(); err != nil {
		return "", err
	}
	return builder.GetJSONConfig()
}

func requiredAction(current, desired *v1.DaemonSet) reconcileAction {
if desired == nil {
return actionNone
Expand Down Expand Up @@ -465,27 +496,23 @@ func (c *AgentController) setEnvConfig(coll *flowslatest.FlowCollector) []corev1
})
}

dedup := dedupeDefault
dedupJustMark := DedupeJustMarkDefault
dedupMerge := DedupeMergeDefault
// Init with defaults
envs := map[string]string{
envDedupe: "firstCome",
envDedupeJustMark: "true",
}
debugConfig := helper.GetAdvancedAgentConfig(coll.Spec.Agent.EBPF.Advanced)
// Merge configured
for k, v := range debugConfig.Env {
envs[k] = v
}
// we need to sort env map to keep idempotency,
// as equal maps could be iterated in different order
debugConfig := helper.GetAdvancedAgentConfig(coll.Spec.Agent.EBPF.Advanced)
for _, pair := range helper.KeySorted(debugConfig.Env) {
for _, pair := range helper.KeySorted(envs) {
k, v := pair[0], pair[1]
if k == envDedupe {
dedup = v
} else if k == EnvDedupeJustMark {
dedupJustMark = v
} else if k == EnvDedupeMerge {
dedupMerge = v
} else {
config = append(config, corev1.EnvVar{Name: k, Value: v})
}
config = append(config, corev1.EnvVar{Name: k, Value: v})
}

config = append(config, corev1.EnvVar{Name: envDedupe, Value: dedup})
config = append(config, corev1.EnvVar{Name: EnvDedupeJustMark, Value: dedupJustMark})
config = append(config, corev1.EnvVar{
Name: envAgentIP,
ValueFrom: &corev1.EnvVarSource{
Expand All @@ -496,7 +523,6 @@ func (c *AgentController) setEnvConfig(coll *flowslatest.FlowCollector) []corev1
},
},
)
config = append(config, corev1.EnvVar{Name: EnvDedupeMerge, Value: dedupMerge})

return config
}
28 changes: 28 additions & 0 deletions controllers/ebpf/internal/permissions/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (

flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/flp"
"github.com/netobserv/network-observability-operator/controllers/reconcilers"
"github.com/netobserv/network-observability-operator/pkg/helper"

osv1 "github.com/openshift/api/security/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -45,6 +47,9 @@ func (c *Reconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCol
if err := c.reconcileVendorPermissions(ctx, desired); err != nil {
return fmt.Errorf("reconciling vendor permissions: %w", err)
}
if err := c.reconcileRoles(ctx); err != nil {
return fmt.Errorf("reconciling roles: %w", err)
}
return nil
}

Expand Down Expand Up @@ -222,3 +227,26 @@ func (c *Reconciler) cleanupPreviousNamespace(ctx context.Context) error {
}
return nil
}

// reconcileRoles ensures the FLP transformer ClusterRole exists and is bound
// to the eBPF agent service account in the privileged namespace, so the agent
// can run the embedded flowlogs-pipeline with the permissions it needs.
func (c *Reconciler) reconcileRoles(ctx context.Context) error {
	role := flp.BuildClusterRoleTransformer()
	if err := c.ReconcileClusterRole(ctx, role); err != nil {
		return err
	}
	binding := &rbacv1.ClusterRoleBinding{
		ObjectMeta: metav1.ObjectMeta{Name: role.Name + "-agent"},
		RoleRef: rbacv1.RoleRef{
			APIGroup: "rbac.authorization.k8s.io",
			Kind:     "ClusterRole",
			Name:     role.Name,
		},
		Subjects: []rbacv1.Subject{{
			Kind:      "ServiceAccount",
			Name:      constants.EBPFServiceAccount,
			Namespace: c.PrivilegedNamespace(),
		}},
	}
	return c.ReconcileClusterRoleBinding(ctx, binding)
}
4 changes: 4 additions & 0 deletions controllers/flp/flp_common_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func (b *builder) NewKafkaPipeline() PipelineBuilder {
}))
}

// NewInProcessPipeline initializes the builder's pipeline from the preset
// ingester stage, for the mode where FLP runs in-process (merged with the
// eBPF agent rather than deployed separately).
func (b *builder) NewInProcessPipeline() PipelineBuilder {
	ingest := config.NewPresetIngesterPipeline()
	return b.initPipeline(ingest)
}

func (b *builder) initPipeline(ingest config.PipelineBuilderStage) PipelineBuilder {
pipeline := newPipelineBuilder(b.desired, b.flowMetrics, b.info.Loki, b.info.ClusterID, &b.volumes, &ingest)
b.pipeline = &pipeline
Expand Down
4 changes: 2 additions & 2 deletions controllers/flp/flp_monolith_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (r *monolithReconciler) reconcile(ctx context.Context, desired *flowslatest
return err
}

if helper.UseKafka(&desired.Spec) {
r.Status.SetUnused("Monolith only used without Kafka")
if helper.UseKafka(&desired.Spec) || helper.UseMergedAgentFLP(&desired.Spec) {
r.Status.SetUnused("Monolith only used with IPFIX and without Kafka")
r.Managed.TryDeleteAll(ctx)
return nil
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/helper/flowcollector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package helper

import (
"strconv"
"strings"

flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
Expand All @@ -20,6 +21,28 @@ func UseEBPF(spec *flowslatest.FlowCollectorSpec) bool {
return spec.Agent.Type == flowslatest.AgentEBPF
}

// UseDedupJustMark reports the effective value of the agent's
// DEDUPER_JUST_MARK setting: the advanced eBPF env override when present and
// parseable, otherwise true.
// An unparseable override falls back to the default rather than silently
// turning the setting off (the original `b, _ := ParseBool(v)` returned false
// on parse error, contradicting the default of true).
func UseDedupJustMark(spec *flowslatest.FlowCollectorSpec) bool {
	if spec.Agent.EBPF.Advanced != nil {
		if v, ok := spec.Agent.EBPF.Advanced.Env["DEDUPER_JUST_MARK"]; ok {
			if b, err := strconv.ParseBool(v); err == nil {
				return b
			}
		}
	}
	// default true
	return true
}

// UseDedupMerge reports the effective value of the agent's DEDUPER_MERGE
// setting: the advanced eBPF env override when present and parseable,
// otherwise false.
// The parse error is handled explicitly (falling back to the default) instead
// of being discarded with `_`, consistent with UseDedupJustMark; for this
// setting the default happens to equal ParseBool's zero value, so behavior is
// unchanged.
func UseDedupMerge(spec *flowslatest.FlowCollectorSpec) bool {
	if spec.Agent.EBPF.Advanced != nil {
		if v, ok := spec.Agent.EBPF.Advanced.Env["DEDUPER_MERGE"]; ok {
			if b, err := strconv.ParseBool(v); err == nil {
				return b
			}
		}
	}
	// default false
	return false
}

// UseIPFIX returns true when the configured flow agent type is IPFIX.
func UseIPFIX(spec *flowslatest.FlowCollectorSpec) bool {
	return spec.Agent.Type == flowslatest.AgentIPFIX
}
Expand All @@ -28,6 +51,10 @@ func UseKafka(spec *flowslatest.FlowCollectorSpec) bool {
return spec.DeploymentModel == flowslatest.DeploymentModelKafka
}

// UseMergedAgentFLP returns true when flowlogs-pipeline should run merged
// within the eBPF agent, i.e. direct (non-Kafka) deployment model combined
// with the eBPF agent type.
func UseMergedAgentFLP(spec *flowslatest.FlowCollectorSpec) bool {
	return spec.DeploymentModel == flowslatest.DeploymentModelDirect && spec.Agent.Type == flowslatest.AgentEBPF
}

func HasKafkaExporter(spec *flowslatest.FlowCollectorSpec) bool {
for _, ex := range spec.Exporters {
if ex.Type == flowslatest.KafkaExporter {
Expand Down
Loading

0 comments on commit 3531e28

Please sign in to comment.