From 8c0a7a81e1baf9d3faeaffb37328ac7768106baf Mon Sep 17 00:00:00 2001
From: Mohamed Mahmoud
Date: Mon, 20 Jan 2025 09:50:48 -0500
Subject: [PATCH] move sample decoding logic out of protobuf logic

Signed-off-by: Mohamed Mahmoud
---
 pkg/agent/agent.go                 | 42 +++++++--------
 pkg/decode/decode_protobuf.go      |  7 ++-
 pkg/decode/decode_protobuf_test.go |  4 +-
 pkg/exporter/converters_test.go    | 32 +++++------
 pkg/exporter/grpc_proto.go         |  7 +--
 pkg/exporter/grpc_proto_test.go    |  8 +--
 pkg/exporter/kafka_proto.go        |  8 ++-
 pkg/exporter/kafka_proto_test.go   |  4 +-
 pkg/flow/account.go                |  6 ++-
 pkg/flow/account_test.go           | 37 +++++--------
 pkg/flow/limiter_test.go           |  4 +-
 pkg/flow/tracer_map.go             |  7 ++-
 pkg/model/record.go                | 85 ++++++++++++++++++++++++++----
 pkg/pbflow/proto.go                | 81 +++++++--------------------
 14 files changed, 168 insertions(+), 164 deletions(-)

diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index 8b318bdc0..fd9397904 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -185,7 +185,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
     }
 
     // configure selected exporter
-    exportFunc, err := buildFlowExporter(cfg, m, s)
+    exportFunc, err := buildFlowExporter(cfg, m)
     if err != nil {
         return nil, err
     }
@@ -295,23 +295,22 @@ func flowsAgent(cfg *Config, m *metrics.Metrics,
     samplingGauge := m.CreateSamplingRate()
     samplingGauge.Set(float64(cfg.Sampling))
 
-    mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m)
+    mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m, s)
     rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m)
-    accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m)
+    accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m, s)
     limiter := flow.NewCapacityLimiter(m)
 
     return &Flows{
-        ebpf:          fetcher,
-        exporter:      exporter,
-        interfaces:    registerer,
-        filter:        filter,
-        cfg:           cfg,
-        mapTracer:     mapTracer,
-        rbTracer:      rbTracer,
-        accounter:     accounter,
-        limiter:       limiter,
-        promoServer:   promoServer,
-        sampleDecoder: s,
+        ebpf:        fetcher,
+        exporter:    exporter,
+        interfaces:  registerer,
+        filter:      filter,
+        cfg:         cfg,
+        mapTracer:   mapTracer,
+        rbTracer:    rbTracer,
+        accounter:   accounter,
+        limiter:     limiter,
+        promoServer: promoServer,
     }, nil
 }
 
@@ -329,12 +328,12 @@ func flowDirections(cfg *Config) (ingress, egress bool) {
     }
 }
 
-func buildFlowExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*model.Record], error) {
+func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*model.Record], error) {
     switch cfg.Export {
     case "grpc":
-        return buildGRPCExporter(cfg, m, s)
+        return buildGRPCExporter(cfg, m)
     case "kafka":
-        return buildKafkaExporter(cfg, m, s)
+        return buildKafkaExporter(cfg, m)
     case "ipfix+udp":
         return buildIPFIXExporter(cfg, "udp")
     case "ipfix+tcp":
@@ -346,12 +345,12 @@ func buildFlowExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecod
     }
 }
 
-func buildGRPCExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*model.Record], error) {
+func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*model.Record], error) {
     if cfg.TargetHost == "" || cfg.TargetPort == 0 {
         return nil, fmt.Errorf("missing target host or port: %s:%d", cfg.TargetHost, cfg.TargetPort)
     }
-    grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m, s)
+    grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m)
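
The agent.go changes above invert the dependency: the OVN SampleDecoder "s" is now handed to the
stages that create records (NewMapTracer, NewAccounter) instead of the exporters, so
buildFlowExporter and friends drop their decoder parameter. A minimal sketch of the new
NewAccounter call, with illustrative cache size and eviction period (the decoder may be nil when
OVS sampling is disabled):

package main

import (
	"time"

	"github.com/gavv/monotime"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
	ovnobserv "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/sampledecoder"
)

func main() {
	m := metrics.NewMetrics(&metrics.Settings{})
	// In the agent, s comes from the OVS sampling setup; nil simply
	// disables UDN and network-event enrichment.
	var s *ovnobserv.SampleDecoder

	// 1000 flows and 5s are illustrative values; the decoder now rides
	// with the record-producing stage, not the exporter.
	acc := flow.NewAccounter(1000, 5*time.Second, time.Now, monotime.Now, m, s)
	_ = acc
}
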
+    grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m)
     if err != nil {
         return nil, err
     }
@@ -366,7 +365,7 @@ func buildFlowDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*model.Record]
     return flpExporter.ExportFlows, nil
 }
 
-func buildKafkaExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*model.Record], error) {
+func buildKafkaExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*model.Record], error) {
     if len(cfg.KafkaBrokers) == 0 {
         return nil, errors.New("at least one Kafka broker is needed")
     }
@@ -412,8 +411,7 @@ func buildKafkaExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDeco
             Transport: &transport,
             Balancer:  &kafkago.Hash{},
         },
-        Metrics:       m,
-        SampleDecoder: s,
+        Metrics: m,
     }).ExportFlows, nil
 }
 
diff --git a/pkg/decode/decode_protobuf.go b/pkg/decode/decode_protobuf.go
index cb324419a..efb723f1b 100644
--- a/pkg/decode/decode_protobuf.go
+++ b/pkg/decode/decode_protobuf.go
@@ -69,15 +69,18 @@ func RecordToMap(fr *model.Record) config.GenericMap {
 
     var directions []int
     var interfaces []string
+    var udns []string
     for _, intf := range fr.Interfaces {
         directions = append(directions, intf.Direction)
         interfaces = append(interfaces, intf.Interface)
+        udns = append(udns, intf.Udn)
     }
     out["IfDirections"] = directions
     out["Interfaces"] = interfaces
-    if len(fr.UdnList) != 0 {
-        out["Udns"] = fr.UdnList
+    if len(udns) != 0 {
+        out["Udns"] = udns
     }
+
     if fr.Metrics.Bytes != 0 {
         out["Bytes"] = fr.Metrics.Bytes
     }
diff --git a/pkg/decode/decode_protobuf_test.go b/pkg/decode/decode_protobuf_test.go
index d50b44586..391b42ec7 100644
--- a/pkg/decode/decode_protobuf_test.go
+++ b/pkg/decode/decode_protobuf_test.go
@@ -22,12 +22,10 @@ func TestPBFlowToMap(t *testing.T) {
             {
                 Interface: "5e6e92caa1d51cf",
                 Direction: pbflow.Direction_INGRESS,
-                Udn:       "",
             },
             {
                 Interface: "eth0",
                 Direction: pbflow.Direction_EGRESS,
-                Udn:       "",
             },
         },
         EthProtocol: 2048,
@@ -132,7 +130,7 @@ func TestPBFlowToMap(t *testing.T) {
         "DnsFlags":             uint16(0x80),
         "DnsFlagsResponseCode": "NoError",
         "TimeFlowRttNs":        someDuration.Nanoseconds(),
-        "NetworkEvents": []config.GenericMap{
+        "NetworkEvents": []map[string]string{
             {
                 "Name": "test1",
                 "Type": "NetworkPolicy",
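
Deriving "Udns" from the per-interface Udn field keeps the Interfaces, IfDirections and Udns
output slices index-aligned by construction, instead of relying on a parallel UdnList. A sketch
of the resulting map keys; it assumes the unshown parts of RecordToMap tolerate an otherwise
zero-valued record, as the converter tests below suggest:

package main

import (
	"fmt"
	"net"

	"github.com/netobserv/netobserv-ebpf-agent/pkg/decode"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
)

func main() {
	rec := &model.Record{
		Metrics: model.NewBpfFlowContent(ebpf.BpfFlowMetrics{}),
		// nil decoder: the UDN stays "", as in the updated tests.
		Interfaces: []model.IntfDirUdn{
			model.NewIntfDirUdn("eth0", model.DirectionEgress, nil),
		},
		AgentIP: net.IPv4(10, 11, 12, 13),
	}
	out := decode.RecordToMap(rec)
	// One entry per interface in each of the three slices.
	fmt.Println(out["Interfaces"], out["IfDirections"], out["Udns"])
}
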
diff --git a/pkg/exporter/converters_test.go b/pkg/exporter/converters_test.go
index 0cbbe6610..cc72779d8 100644
--- a/pkg/exporter/converters_test.go
+++ b/pkg/exporter/converters_test.go
@@ -55,8 +55,7 @@ func TestConversions(t *testing.T) {
                 },
             },
         },
-        Interfaces:    []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
-        UdnList:       []string{""},
+        Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
         TimeFlowStart: someTime,
         TimeFlowEnd:   someTime,
         AgentIP:       net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -104,8 +103,7 @@ func TestConversions(t *testing.T) {
                 Sampling: 2,
             },
         },
-        Interfaces:    []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
-        UdnList:       []string{""},
+        Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
         TimeFlowStart: someTime,
         TimeFlowEnd:   someTime,
         AgentIP:       net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -151,8 +149,7 @@ func TestConversions(t *testing.T) {
                 Dscp: 64,
             },
         },
-        Interfaces:    []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
-        UdnList:       []string{""},
+        Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
         TimeFlowStart: someTime,
         TimeFlowEnd:   someTime,
         AgentIP:       net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -197,8 +194,7 @@ func TestConversions(t *testing.T) {
                 Dscp: 64,
             },
         },
-        Interfaces:    []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
-        UdnList:       []string{""},
+        Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
         TimeFlowStart: someTime,
         TimeFlowEnd:   someTime,
         AgentIP:       net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -244,8 +240,7 @@ func TestConversions(t *testing.T) {
                 Packets: 128,
             },
         },
-        Interfaces:    []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
-        UdnList:       []string{""},
+        Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
         TimeFlowStart: someTime,
         TimeFlowEnd:   someTime,
         AgentIP:       net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -292,8 +287,7 @@ func TestConversions(t *testing.T) {
                 },
             },
         },
-        Interfaces:    []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
-        UdnList:       []string{""},
+        Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
         TimeFlowStart: someTime,
         TimeFlowEnd:   someTime,
         AgentIP:       net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -353,8 +347,7 @@ func TestConversions(t *testing.T) {
                 },
             },
         },
-        Interfaces:    []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
-        UdnList:       []string{""},
+        Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
         TimeFlowStart: someTime,
         TimeFlowEnd:   someTime,
         AgentIP:       net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -418,11 +411,10 @@ func TestConversions(t *testing.T) {
                 },
             },
         },
-        Interfaces: []model.IntfDir{
-            model.NewIntfDir("5e6e92caa1d51cf", model.DirectionIngress),
-            model.NewIntfDir("eth0", model.DirectionEgress),
+        Interfaces: []model.IntfDirUdn{
+            model.NewIntfDirUdn("5e6e92caa1d51cf", model.DirectionIngress, nil),
+            model.NewIntfDirUdn("eth0", model.DirectionEgress, nil),
         },
-        UdnList:       []string{"", ""},
         TimeFlowStart: someTime,
         TimeFlowEnd:   someTime,
         AgentIP:       net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -444,8 +436,8 @@ func TestConversions(t *testing.T) {
             "TimeFlowStartMs": someTime.UnixMilli(),
             "TimeFlowEndMs":   someTime.UnixMilli(),
             "Interfaces":      []string{"5e6e92caa1d51cf", "eth0"},
-            "AgentIP":         "10.11.12.13",
             "Udns":            []string{"", ""},
+            "AgentIP":         "10.11.12.13",
         },
     },
 }
@@ -457,7 +449,7 @@ func TestConversions(t *testing.T) {
             delete(outDirect, "TimeReceived")
 
             // Generate the same using protobuf
-            tmpPB := pbflow.FlowToPB(tt.flow, nil)
+            tmpPB := pbflow.FlowToPB(tt.flow)
             rawPB, err := proto.Marshal(tmpPB)
             require.NoError(t, err, tt.name)
             outPB, err := decoder.Decode(rawPB)
diff --git a/pkg/exporter/grpc_proto.go b/pkg/exporter/grpc_proto.go
index 761f0053c..9680a95ba 100644
--- a/pkg/exporter/grpc_proto.go
+++ b/pkg/exporter/grpc_proto.go
@@ -9,7 +9,6 @@ import (
     "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
     "github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
 
-    ovnobserv "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/sampledecoder"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/sirupsen/logrus"
 )
@@ -31,10 +30,9 @@ type GRPCProto struct {
     maxFlowsPerMessage int
     metrics            *metrics.Metrics
     batchCounter       prometheus.Counter
-    sampler            *ovnobserv.SampleDecoder
 }
 
-func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (*GRPCProto, error) {
+func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
     clientConn, err := grpc.ConnectClient(hostIP, hostPort)
     if err != nil {
         return nil, err
@@ -46,7 +44,6 @@ func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metr
         maxFlowsPerMessage: maxFlowsPerMessage,
         metrics:            m,
         batchCounter:       m.CreateBatchCounter(componentGRPC),
-        sampler:            s,
     }, nil
 }
 
@@ -57,7 +54,7 @@ func (g *GRPCProto) ExportFlows(input <-chan []*model.Record) {
     log := glog.WithField("collector", socket)
     for inputRecords := range input {
         g.metrics.EvictionCounter.WithSource(componentGRPC).Inc()
-        for _, pbRecords := range pbflow.FlowsToPB(inputRecords, g.maxFlowsPerMessage, g.sampler) {
+        for _, pbRecords := range pbflow.FlowsToPB(inputRecords, g.maxFlowsPerMessage) {
             log.Debugf("sending %d records", len(pbRecords.Entries))
             if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
                 g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage").Inc()
diff --git a/pkg/exporter/grpc_proto_test.go b/pkg/exporter/grpc_proto_test.go
index 39f0fe42e..7f4448dbf 100644
--- a/pkg/exporter/grpc_proto_test.go
+++ b/pkg/exporter/grpc_proto_test.go
@@ -29,7 +29,7 @@ func TestIPv4GRPCProto_ExportFlows_AgentIP(t *testing.T) {
     defer coll.Close()
 
     // Start GRPCProto exporter stage
-    exporter, err := StartGRPCProto("127.0.0.1", port, 1000, metrics.NewMetrics(&metrics.Settings{}), nil)
+    exporter, err := StartGRPCProto("127.0.0.1", port, 1000, metrics.NewMetrics(&metrics.Settings{}))
     require.NoError(t, err)
 
     // Send some flows to the input of the exporter stage
@@ -71,7 +71,7 @@ func TestIPv6GRPCProto_ExportFlows_AgentIP(t *testing.T) {
     defer coll.Close()
 
     // Start GRPCProto exporter stage
-    exporter, err := StartGRPCProto("::1", port, 1000, metrics.NewMetrics(&metrics.Settings{}), nil)
+    exporter, err := StartGRPCProto("::1", port, 1000, metrics.NewMetrics(&metrics.Settings{}))
     require.NoError(t, err)
 
     // Send some flows to the input of the exporter stage
@@ -114,7 +114,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
     const msgMaxLen = 10000
 
     // Start GRPCProto exporter stage
-    exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen, metrics.NewMetrics(&metrics.Settings{}), nil)
+    exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen, metrics.NewMetrics(&metrics.Settings{}))
     require.NoError(t, err)
 
     // Send a message much longer than the limit length
@@ -123,7 +123,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
     for i := 0; i < 25000; i++ {
         input = append(input, &model.Record{Metrics: model.BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{
             EthProtocol: model.IPv6Type,
-        }}, AgentIP: net.ParseIP("1111::1111"), Interfaces: []model.IntfDir{model.NewIntfDir("12345678", 0)}})
+        }}, AgentIP: net.ParseIP("1111::1111"), Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("12345678", 0, nil)}})
     }
     flows <- input
     go exporter.ExportFlows(flows)
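
With the sampler parameter gone, starting the gRPC exporter needs only the collector address,
the per-message flow cap, and metrics, exactly as the tests above exercise. A usage sketch with
placeholder host and port:

package main

import (
	"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
)

func main() {
	m := metrics.NewMetrics(&metrics.Settings{})
	// "127.0.0.1:9999" is a placeholder collector; 10000 caps the flows
	// packed into a single gRPC message.
	grpcExp, err := exporter.StartGRPCProto("127.0.0.1", 9999, 10000, m)
	if err != nil {
		panic(err)
	}
	input := make(chan []*model.Record)
	go grpcExp.ExportFlows(input) // records arrive already UDN-enriched
	close(input)
}
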
diff --git a/pkg/exporter/kafka_proto.go b/pkg/exporter/kafka_proto.go
index 50b5ee3f1..169562c57 100644
--- a/pkg/exporter/kafka_proto.go
+++ b/pkg/exporter/kafka_proto.go
@@ -7,7 +7,6 @@ import (
     "github.com/netobserv/netobserv-ebpf-agent/pkg/model"
     "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
 
-    ovnobserv "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/sampledecoder"
     kafkago "github.com/segmentio/kafka-go"
     "github.com/sirupsen/logrus"
     "google.golang.org/protobuf/proto"
@@ -24,9 +23,8 @@ type kafkaWriter interface {
 // KafkaProto exports flows over Kafka, encoded as a protobuf that is understandable by the
 // Flowlogs-Pipeline collector
 type KafkaProto struct {
-    Writer        kafkaWriter
-    Metrics       *metrics.Metrics
-    SampleDecoder *ovnobserv.SampleDecoder
+    Writer  kafkaWriter
+    Metrics *metrics.Metrics
 }
 
 func (kp *KafkaProto) ExportFlows(input <-chan []*model.Record) {
@@ -52,7 +50,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*model.Record) {
     klog.Debugf("sending %d records", len(records))
     msgs := make([]kafkago.Message, 0, len(records))
     for _, record := range records {
-        pbBytes, err := proto.Marshal(pbflow.FlowToPB(record, kp.SampleDecoder))
+        pbBytes, err := proto.Marshal(pbflow.FlowToPB(record))
         if err != nil {
             klog.WithError(err).Debug("can't encode protobuf message. Ignoring")
             kp.Metrics.Errors.WithErrorName(componentKafka, "CannotEncodeMessage").Inc()
diff --git a/pkg/exporter/kafka_proto_test.go b/pkg/exporter/kafka_proto_test.go
index 9a0f38997..3e3f2f790 100644
--- a/pkg/exporter/kafka_proto_test.go
+++ b/pkg/exporter/kafka_proto_test.go
@@ -51,7 +51,7 @@ func TestProtoConversion(t *testing.T) {
             Flags: uint16(1),
         },
     },
-    Interfaces: []model.IntfDir{model.NewIntfDir("veth0", 0), model.NewIntfDir("abcde", 1)},
+    Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil), model.NewIntfDirUdn("abcde", 1, nil)},
 }
 input <- []*model.Record{&record}
 
@@ -108,7 +108,7 @@ func TestIdenticalKeys(t *testing.T) {
             Flags: uint16(1),
         },
     },
-    Interfaces: []model.IntfDir{model.NewIntfDir("veth0", 0), model.NewIntfDir("abcde", 1)},
+    Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil), model.NewIntfDirUdn("abcde", 1, nil)},
 }
 
 key1 := getFlowKey(&record)
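
The Kafka exporter sheds its OVN dependency the same way: KafkaProto is now just a writer plus
metrics. A sketch with placeholder broker and topic:

package main

import (
	"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
	kafkago "github.com/segmentio/kafka-go"
)

func main() {
	kp := exporter.KafkaProto{
		Writer: &kafkago.Writer{
			Addr:  kafkago.TCP("localhost:9092"), // placeholder broker
			Topic: "network-flows",               // placeholder topic
		},
		Metrics: metrics.NewMetrics(&metrics.Settings{}),
	}
	_ = kp.ExportFlows // consumes a <-chan []*model.Record
}
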
diff --git a/pkg/flow/account.go b/pkg/flow/account.go
index 9431b3b1b..6d87806bb 100644
--- a/pkg/flow/account.go
+++ b/pkg/flow/account.go
@@ -7,6 +7,7 @@ import (
     "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
     "github.com/netobserv/netobserv-ebpf-agent/pkg/model"
 
+    ovnobserv "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/sampledecoder"
     "github.com/sirupsen/logrus"
 )
 
@@ -21,6 +22,7 @@ type Accounter struct {
     clock     func() time.Time
     monoClock func() time.Duration
     metrics   *metrics.Metrics
+    s         *ovnobserv.SampleDecoder
 }
 
 var alog = logrus.WithField("component", "flow/Accounter")
@@ -32,6 +34,7 @@ func NewAccounter(
     clock func() time.Time,
     monoClock func() time.Duration,
     m *metrics.Metrics,
+    s *ovnobserv.SampleDecoder,
 ) *Accounter {
     acc := Accounter{
         maxEntries: maxEntries,
@@ -40,6 +43,7 @@ func NewAccounter(
         clock:      clock,
         monoClock:  monoClock,
         metrics:    m,
+        s:          s,
     }
     return &acc
 }
@@ -97,7 +101,7 @@ func (c *Accounter) evict(entries map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics, evict
     records := make([]*model.Record, 0, len(entries))
     for key, metrics := range entries {
         flowContent := model.NewBpfFlowContent(*metrics)
-        records = append(records, model.NewRecord(key, &flowContent, now, monotonicNow))
+        records = append(records, model.NewRecord(key, &flowContent, now, monotonicNow, c.s))
     }
     c.metrics.EvictionCounter.WithSourceAndReason("accounter", reason).Inc()
     c.metrics.EvictedFlowsCounter.WithSourceAndReason("accounter", reason).Add(float64(len(records)))
diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go
index 6e2641069..13f1dfe96 100644
--- a/pkg/flow/account_test.go
+++ b/pkg/flow/account_test.go
@@ -4,7 +4,6 @@ import (
     "testing"
     "time"
 
-    "github.com/netobserv/flowlogs-pipeline/pkg/config"
     "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
     "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
     "github.com/netobserv/netobserv-ebpf-agent/pkg/model"
@@ -52,7 +51,7 @@ func TestEvict_MaxEntries(t *testing.T) {
         return now
     }, func() time.Duration {
         return 1000
-    }, metrics.NewMetrics(&metrics.Settings{}))
+    }, metrics.NewMetrics(&metrics.Settings{}), nil)
 
     // WHEN it starts accounting new records
     inputs := make(chan *model.RawRecord, 20)
@@ -110,11 +109,9 @@ func TestEvict_MaxEntries(t *testing.T) {
                 Bytes: 444, Packets: 2, StartMonoTimeTs: 123, EndMonoTimeTs: 789, Flags: 1,
             },
         },
-        TimeFlowStart:          now.Add(-(1000 - 123) * time.Nanosecond),
-        TimeFlowEnd:            now.Add(-(1000 - 789) * time.Nanosecond),
-        NetworkMonitorEventsMD: make([]config.GenericMap, 0),
-        Interfaces:             []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)},
-        UdnList:                make([]string, 0),
+        TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
+        TimeFlowEnd:   now.Add(-(1000 - 789) * time.Nanosecond),
+        Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)},
     },
     k2: {
         ID: k2,
@@ -123,11 +120,9 @@ func TestEvict_MaxEntries(t *testing.T) {
                 Bytes: 456, Packets: 1, StartMonoTimeTs: 456, EndMonoTimeTs: 456, Flags: 1,
             },
         },
-        TimeFlowStart:          now.Add(-(1000 - 456) * time.Nanosecond),
-        TimeFlowEnd:            now.Add(-(1000 - 456) * time.Nanosecond),
-        NetworkMonitorEventsMD: make([]config.GenericMap, 0),
-        Interfaces:             []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)},
-        UdnList:                make([]string, 0),
+        TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond),
+        TimeFlowEnd:   now.Add(-(1000 - 456) * time.Nanosecond),
+        Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)},
     },
 }, received)
 }
@@ -139,7 +134,7 @@ func TestEvict_Period(t *testing.T) {
         return now
     }, func() time.Duration {
         return 1000
-    }, metrics.NewMetrics(&metrics.Settings{}))
+    }, metrics.NewMetrics(&metrics.Settings{}), nil)
 
     // WHEN it starts accounting new records
     inputs := make(chan *model.RawRecord, 20)
@@ -194,11 +189,9 @@ func TestEvict_Period(t *testing.T) {
             Flags: 1,
         },
     },
-    TimeFlowStart:          now.Add(-1000 + 123),
-    TimeFlowEnd:            now.Add(-1000 + 789),
-    NetworkMonitorEventsMD: make([]config.GenericMap, 0),
-    Interfaces:             []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)},
-    UdnList:                make([]string, 0),
+    TimeFlowStart: now.Add(-1000 + 123),
+    TimeFlowEnd:   now.Add(-1000 + 789),
+    Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)},
 }, *records[0])
 records = receiveTimeout(t, evictor)
 require.Len(t, records, 1)
@@ -213,11 +206,9 @@ func TestEvict_Period(t *testing.T) {
             Flags: 1,
         },
     },
-    TimeFlowStart:          now.Add(-1000 + 1123),
-    TimeFlowEnd:            now.Add(-1000 + 1456),
-    NetworkMonitorEventsMD: make([]config.GenericMap, 0),
-    Interfaces:             []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)},
-    UdnList:                make([]string, 0),
+    TimeFlowStart: now.Add(-1000 + 1123),
+    TimeFlowEnd:   now.Add(-1000 + 1456),
+    Interfaces:    []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)},
 }, *records[0])
 
 // no more flows are evicted
diff --git a/pkg/flow/limiter_test.go b/pkg/flow/limiter_test.go
index 1abee0a1d..2f4a419d7 100644
--- a/pkg/flow/limiter_test.go
+++ b/pkg/flow/limiter_test.go
@@ -19,7 +19,7 @@ func TestCapacityLimiter_NoDrop(t *testing.T) {
 
     // WHEN it buffers less elements than it's maximum capacity
     for i := 0; i < 33; i++ {
-        pipeIn <- []*model.Record{{Interfaces: []model.IntfDir{model.NewIntfDir(strconv.Itoa(i), 0)}}}
+        pipeIn <- []*model.Record{{Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn(strconv.Itoa(i), 0, nil)}}}
     }
 
     // THEN it is able to retrieve all the buffered elements
@@ -45,7 +45,7 @@ func TestCapacityLimiter_Drop(t *testing.T) {
     // WHEN it receives more elements than its maximum capacity
     // (it's not blocking)
     for i := 0; i < limiterLen*2; i++ {
-        pipeIn <- []*model.Record{{Interfaces: []model.IntfDir{model.NewIntfDir(strconv.Itoa(i), 0)}}}
+        pipeIn <- []*model.Record{{Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn(strconv.Itoa(i), 0, nil)}}}
     }
 
     // THEN it is only able to retrieve all the nth first buffered elements
diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go
index fe9d711ec..7fa12bd07 100644
--- a/pkg/flow/tracer_map.go
+++ b/pkg/flow/tracer_map.go
@@ -12,6 +12,7 @@ import (
 
     "github.com/gavv/monotime"
     "github.com/netobserv/gopipes/pkg/node"
+    ovnobserv "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/sampledecoder"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/sirupsen/logrus"
 )
@@ -28,6 +29,7 @@ type MapTracer struct {
     evictionCond               *sync.Cond
     metrics                    *metrics.Metrics
     timeSpentinLookupAndDelete prometheus.Histogram
+    s                          *ovnobserv.SampleDecoder
 }
 
 type mapFetcher interface {
@@ -35,7 +37,8 @@ type mapFetcher interface {
     DeleteMapsStaleEntries(timeOut time.Duration)
 }
 
-func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout time.Duration, m *metrics.Metrics) *MapTracer {
+func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout time.Duration, m *metrics.Metrics,
+    s *ovnobserv.SampleDecoder) *MapTracer {
     return &MapTracer{
         mapFetcher:                 fetcher,
         evictionTimeout:            evictionTimeout,
@@ -43,6 +46,7 @@ func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout
         staleEntriesEvictTimeout:   staleEntriesEvictTimeout,
         metrics:                    m,
         timeSpentinLookupAndDelete: m.CreateTimeSpendInLookupAndDelete(),
+        s:                          s,
     }
 }
 
@@ -107,6 +111,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, forceGC bool, forwardFlows c
             &flowMetrics,
             currentTime,
             uint64(monotonicTimeNow),
+            m.s,
         ))
     }
     m.mapFetcher.DeleteMapsStaleEntries(m.staleEntriesEvictTimeout)
diff --git a/pkg/model/record.go b/pkg/model/record.go
index d4e57c428..9bf54d1a5 100644
--- a/pkg/model/record.go
+++ b/pkg/model/record.go
@@ -8,8 +8,10 @@ import (
     "reflect"
     "time"
 
-    "github.com/netobserv/flowlogs-pipeline/pkg/config"
     "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
+
+    ovnmodel "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/model"
+    ovnobserv "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/sampledecoder"
 )
 
 // Values according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
@@ -58,21 +60,24 @@ type Record struct {
     TimeFlowStart time.Time
     TimeFlowEnd   time.Time
     DNSLatency    time.Duration
-    Interfaces    []IntfDir
+    Interfaces    []IntfDirUdn
     // AgentIP provides information about the source of the flow (the Agent that traced it)
     AgentIP net.IP
     // Calculated RTT which is set when record is created by calling NewRecord
     TimeFlowRtt            time.Duration
-    NetworkMonitorEventsMD []config.GenericMap
-    UdnList                []string
+    NetworkMonitorEventsMD []map[string]string
 }
 
+var udnsCache map[string]string
+
 func NewRecord(
     key ebpf.BpfFlowId,
     metrics *BpfFlowContent,
     currentTime time.Time,
     monotonicCurrentTime uint64,
+    s *ovnobserv.SampleDecoder,
 ) *Record {
+    udnsCache = make(map[string]string)
     startDelta := time.Duration(monotonicCurrentTime - metrics.StartMonoTimeTs)
     endDelta := time.Duration(monotonicCurrentTime - metrics.EndMonoTimeTs)
 
@@ -82,13 +87,17 @@ func NewRecord(
         TimeFlowStart: currentTime.Add(-startDelta),
         TimeFlowEnd:   currentTime.Add(-endDelta),
         AgentIP:       agentIP,
-        Interfaces:    []IntfDir{NewIntfDir(interfaceNamer(int(metrics.IfIndexFirstSeen)), int(metrics.DirectionFirstSeen))},
+        Interfaces: []IntfDirUdn{NewIntfDirUdn(
+            interfaceNamer(int(metrics.IfIndexFirstSeen)),
+            int(metrics.DirectionFirstSeen),
+            s)},
     }
     if metrics.AdditionalMetrics != nil {
         for i := uint8(0); i < record.Metrics.AdditionalMetrics.NbObservedIntf; i++ {
-            record.Interfaces = append(record.Interfaces, NewIntfDir(
+            record.Interfaces = append(record.Interfaces, NewIntfDirUdn(
                 interfaceNamer(int(metrics.AdditionalMetrics.ObservedIntf[i].IfIndex)),
                 int(metrics.AdditionalMetrics.ObservedIntf[i].Direction),
+                s,
             ))
         }
         if metrics.AdditionalMetrics.FlowRtt != 0 {
@@ -98,18 +107,72 @@ func NewRecord(
             record.DNSLatency = time.Duration(metrics.AdditionalMetrics.DnsRecord.Latency)
         }
     }
-    record.NetworkMonitorEventsMD = make([]config.GenericMap, 0)
-    record.UdnList = make([]string, 0)
-
+    if s != nil && metrics.AdditionalMetrics != nil {
+        seen := make(map[string]bool)
+        record.NetworkMonitorEventsMD = make([]map[string]string, 0)
+        for _, metadata := range metrics.AdditionalMetrics.NetworkEvents {
+            if !AllZerosMetaData(metadata) {
+                var cm map[string]string
+                if md, err := s.DecodeCookie8Bytes(metadata); err == nil {
+                    acl, ok := md.(*ovnmodel.ACLEvent)
+                    mdStr := md.String()
+                    if !seen[mdStr] {
+                        if ok {
+                            cm = map[string]string{
+                                "Action":    acl.Action,
+                                "Type":      acl.Actor,
+                                "Feature":   "acl",
+                                "Name":      acl.Name,
+                                "Namespace": acl.Namespace,
+                                "Direction": acl.Direction,
+                            }
+                        } else {
+                            cm = map[string]string{
+                                "Message": mdStr,
+                            }
+                        }
+                        record.NetworkMonitorEventsMD = append(record.NetworkMonitorEventsMD, cm)
+                        seen[mdStr] = true
+                    }
+                }
+            }
+        }
+    }
     return &record
 }
 
-type IntfDir struct {
+type IntfDirUdn struct {
     Interface string
     Direction int
+    Udn       string
 }
 
-func NewIntfDir(intf string, dir int) IntfDir { return IntfDir{Interface: intf, Direction: dir} }
+func NewIntfDirUdn(intf string, dir int, s *ovnobserv.SampleDecoder) IntfDirUdn {
+    var udn string
+    if s == nil {
+        return IntfDirUdn{Interface: intf, Direction: dir, Udn: ""}
+    }
+
+    // Load UDN cache if empty
+    if len(udnsCache) == 0 {
+        m, err := s.GetInterfaceUDNs()
+        if err != nil {
+            return IntfDirUdn{Interface: intf, Direction: dir, Udn: ""}
+        }
+        udnsCache = m
+    }
+
+    // Look up the interface in the cache
+    if v, ok := udnsCache[intf]; ok {
+        if v != "" {
+            udn = v
+        } else {
+            udn = "default"
+        }
+    }
+
+    return IntfDirUdn{Interface: intf, Direction: dir, Udn: udn}
+}
 
 func networkEventsMDExist(events [MaxNetworkEvents][NetworkEventsMaxEventsMD]uint8, md [NetworkEventsMaxEventsMD]uint8) bool {
     for _, e := range events {
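
All sample decoding now happens once, at record-creation time. Two behaviors worth pinning down
from the code above: a nil decoder leaves Udn empty and NetworkMonitorEventsMD nil, and an
interface whose UDN value from OVS is the empty string is normalized to "default". A minimal
sketch of the nil-decoder path (it assumes the package-level interface namer is left unset,
which yields the "[namer unset] ..." names seen in the tests):

package main

import (
	"fmt"
	"time"

	"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
)

func main() {
	flowContent := model.NewBpfFlowContent(ebpf.BpfFlowMetrics{Bytes: 64, Packets: 1})
	// nil SampleDecoder: no UDN lookup, no network-event decoding.
	rec := model.NewRecord(ebpf.BpfFlowId{}, &flowContent, time.Now(), 0, nil)
	fmt.Printf("iface=%q udn=%q events=%d\n",
		rec.Interfaces[0].Interface, rec.Interfaces[0].Udn,
		len(rec.NetworkMonitorEventsMD))
}
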
diff --git a/pkg/pbflow/proto.go b/pkg/pbflow/proto.go
index 48ac5d917..781881ebd 100644
--- a/pkg/pbflow/proto.go
+++ b/pkg/pbflow/proto.go
@@ -4,11 +4,8 @@ import (
     "encoding/binary"
     "net"
 
-    "github.com/netobserv/flowlogs-pipeline/pkg/config"
     "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
     "github.com/netobserv/netobserv-ebpf-agent/pkg/model"
-    ovnmodel "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/model"
-    ovnobserv "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/sampledecoder"
     "github.com/sirupsen/logrus"
     "google.golang.org/protobuf/types/known/durationpb"
     "google.golang.org/protobuf/types/known/timestamppb"
@@ -18,10 +15,10 @@ var protoLog = logrus.WithField("component", "pbflow")
 
 // FlowsToPB is an auxiliary function to convert flow records, as returned by the eBPF agent,
 // into protobuf-encoded messages ready to be sent to the collector via GRPC
-func FlowsToPB(inputRecords []*model.Record, maxLen int, s *ovnobserv.SampleDecoder) []*Records {
+func FlowsToPB(inputRecords []*model.Record, maxLen int) []*Records {
     entries := make([]*Record, 0, len(inputRecords))
     for _, record := range inputRecords {
-        entries = append(entries, FlowToPB(record, s))
+        entries = append(entries, FlowToPB(record))
     }
     var records []*Records
     for len(entries) > 0 {
@@ -37,7 +34,7 @@ func FlowsToPB(inputRecords []*model.Record, maxLen int, s *ovnobserv.SampleDeco
 
 // FlowToPB is an auxiliary function to convert a single flow record, as returned by the eBPF agent,
 // into a protobuf-encoded message ready to be sent to the collector via kafka
-func FlowToPB(fr *model.Record, s *ovnobserv.SampleDecoder) *Record {
+func FlowToPB(fr *model.Record) *Record {
     var pbflowRecord = Record{
         EthProtocol: uint32(fr.Metrics.EthProtocol),
         Direction:   Direction(fr.Metrics.DirectionFirstSeen),
@@ -90,27 +87,11 @@ func FlowToPB(fr *model.Record, s *ovnobserv.SampleDecoder) *Record {
     }
     pbflowRecord.DupList = make([]*DupMapEntry, 0)
     for _, intf := range fr.Interfaces {
-        entry := DupMapEntry{
+        pbflowRecord.DupList = append(pbflowRecord.DupList, &DupMapEntry{
             Interface: intf.Interface,
             Direction: Direction(intf.Direction),
-            Udn:       "",
-        }
-        if s != nil {
-            m, err := s.GetInterfaceUDNs()
-            if err == nil {
-                if v, ok := m[entry.Interface]; ok {
-                    if v != "" {
-                        entry.Udn = v
-                    } else {
-                        entry.Udn = "default"
-                    }
-                }
-            } else {
-                protoLog.Debugf("Failed to convert interface %s to UDN, err %s", entry.Interface, err)
-            }
-        }
-
-        pbflowRecord.DupList = append(pbflowRecord.DupList, &entry)
+            Udn:       intf.Udn,
+        })
     }
     if fr.Metrics.EthProtocol == model.IPv6Type {
         pbflowRecord.Network.SrcAddr = &IP{IpFamily: &IP_Ipv6{Ipv6: fr.ID.SrcIp[:]}}
@@ -127,42 +108,13 @@ func FlowToPB(fr *model.Record, s *ovnobserv.SampleDecoder) *Record {
         pbflowRecord.Xlat.DstAddr = &IP{IpFamily: &IP_Ipv4{Ipv4: model.IntEncodeV4(fr.Metrics.AdditionalMetrics.TranslatedFlow.Daddr)}}
         }
     }
-    if s != nil && fr.Metrics.AdditionalMetrics != nil {
-        seen := make(map[string]bool)
+
+    if len(fr.NetworkMonitorEventsMD) != 0 {
         pbflowRecord.NetworkEventsMetadata = make([]*NetworkEvent, 0)
-        for _, metadata := range fr.Metrics.AdditionalMetrics.NetworkEvents {
-            var pbEvent NetworkEvent
-            if !model.AllZerosMetaData(metadata) {
-                if md, err := s.DecodeCookie8Bytes(metadata); err == nil {
-                    acl, ok := md.(*ovnmodel.ACLEvent)
-                    mdStr := md.String()
-                    protoLog.Debugf("Network Events Metadata %v decoded Cookie: %v decoded string: %s", metadata, md, mdStr)
-                    if !seen[mdStr] {
-                        if ok {
-                            pbEvent = NetworkEvent{
-                                Events: map[string]string{
-                                    "Action":    acl.Action,
-                                    "Type":      acl.Actor,
-                                    "Feature":   "acl",
-                                    "Name":      acl.Name,
-                                    "Namespace": acl.Namespace,
-                                    "Direction": acl.Direction,
-                                },
-                            }
-                        } else {
-                            pbEvent = NetworkEvent{
-                                Events: map[string]string{
-                                    "Message": mdStr,
-                                },
-                            }
-                        }
-                        pbflowRecord.NetworkEventsMetadata = append(pbflowRecord.NetworkEventsMetadata, &pbEvent)
-                        seen[mdStr] = true
-                    }
-                } else {
-                    protoLog.Errorf("unable to decode Network events cookie: %v", err)
-                }
-            }
-        }
+        for _, networkEvent := range fr.NetworkMonitorEventsMD {
+            pbflowRecord.NetworkEventsMetadata = append(pbflowRecord.NetworkEventsMetadata, &NetworkEvent{
+                Events: networkEvent,
+            })
         }
     }
     return &pbflowRecord
@@ -225,14 +177,17 @@ func PBToFlow(pb *Record) *model.Record {
 
     if len(pb.GetDupList()) != 0 {
         for _, entry := range pb.GetDupList() {
-            out.Interfaces = append(out.Interfaces, model.NewIntfDir(entry.Interface, int(entry.Direction)))
-            out.UdnList = append(out.UdnList, entry.Udn)
+            out.Interfaces = append(out.Interfaces, model.IntfDirUdn{
+                Interface: entry.Interface,
+                Direction: int(entry.Direction),
+                Udn:       entry.Udn,
+            })
         }
     }
     if len(pb.GetNetworkEventsMetadata()) != 0 {
         for _, e := range pb.GetNetworkEventsMetadata() {
-            m := config.GenericMap{}
+            m := map[string]string{}
             for k, v := range e.Events {
                 m[k] = v
             }
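
End to end, the protobuf layer is now a plain copy: FlowToPB forwards intf.Udn and
fr.NetworkMonitorEventsMD verbatim, and PBToFlow restores them on the far side. A round-trip
sketch with a hand-built record standing in for one produced by NewRecord (the event map is
illustrative):

package main

import (
	"fmt"
	"net"

	"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
)

func main() {
	rec := &model.Record{
		Metrics: model.NewBpfFlowContent(ebpf.BpfFlowMetrics{}),
		Interfaces: []model.IntfDirUdn{
			{Interface: "eth0", Direction: model.DirectionEgress, Udn: "default"},
		},
		NetworkMonitorEventsMD: []map[string]string{
			{"Feature": "acl", "Action": "allow"}, // illustrative event
		},
		AgentIP: net.IPv4(10, 11, 12, 13),
	}
	back := pbflow.PBToFlow(pbflow.FlowToPB(rec))
	fmt.Println(back.Interfaces[0].Udn, back.NetworkMonitorEventsMD)
}
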