Skip to content

Commit

Permalink
move sample decoding logic out protobuf logic
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Jan 21, 2025
1 parent df87217 commit 8c0a7a8
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 164 deletions.
42 changes: 20 additions & 22 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
}

// configure selected exporter
exportFunc, err := buildFlowExporter(cfg, m, s)
exportFunc, err := buildFlowExporter(cfg, m)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -295,23 +295,22 @@ func flowsAgent(cfg *Config, m *metrics.Metrics,
samplingGauge := m.CreateSamplingRate()
samplingGauge.Set(float64(cfg.Sampling))

mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m)
mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m, s)
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m)
accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m)
accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m, s)
limiter := flow.NewCapacityLimiter(m)

return &Flows{
ebpf: fetcher,
exporter: exporter,
interfaces: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
limiter: limiter,
promoServer: promoServer,
sampleDecoder: s,
ebpf: fetcher,
exporter: exporter,
interfaces: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
limiter: limiter,
promoServer: promoServer,
}, nil
}

Expand All @@ -329,12 +328,12 @@ func flowDirections(cfg *Config) (ingress, egress bool) {
}
}

func buildFlowExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*model.Record], error) {
func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*model.Record], error) {
switch cfg.Export {
case "grpc":
return buildGRPCExporter(cfg, m, s)
return buildGRPCExporter(cfg, m)
case "kafka":
return buildKafkaExporter(cfg, m, s)
return buildKafkaExporter(cfg, m)
case "ipfix+udp":
return buildIPFIXExporter(cfg, "udp")
case "ipfix+tcp":
Expand All @@ -346,12 +345,12 @@ func buildFlowExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecod
}
}

func buildGRPCExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*model.Record], error) {
func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*model.Record], error) {
if cfg.TargetHost == "" || cfg.TargetPort == 0 {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
}
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m, s)
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m)
if err != nil {
return nil, err
}
Expand All @@ -366,7 +365,7 @@ func buildFlowDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*model.Record]
return flpExporter.ExportFlows, nil
}

func buildKafkaExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*model.Record], error) {
func buildKafkaExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*model.Record], error) {
if len(cfg.KafkaBrokers) == 0 {
return nil, errors.New("at least one Kafka broker is needed")
}
Expand Down Expand Up @@ -412,8 +411,7 @@ func buildKafkaExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDeco
Transport: &transport,
Balancer: &kafkago.Hash{},
},
Metrics: m,
SampleDecoder: s,
Metrics: m,
}).ExportFlows, nil
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,18 @@ func RecordToMap(fr *model.Record) config.GenericMap {

var directions []int
var interfaces []string
var udns []string
for _, intf := range fr.Interfaces {
directions = append(directions, intf.Direction)
interfaces = append(interfaces, intf.Interface)
udns = append(udns, intf.Udn)
}
out["IfDirections"] = directions
out["Interfaces"] = interfaces
if len(fr.UdnList) != 0 {
out["Udns"] = fr.UdnList
if len(udns) != 0 {
out["Udns"] = udns
}

if fr.Metrics.Bytes != 0 {
out["Bytes"] = fr.Metrics.Bytes
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/decode/decode_protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ func TestPBFlowToMap(t *testing.T) {
{
Interface: "5e6e92caa1d51cf",
Direction: pbflow.Direction_INGRESS,
Udn: "",
},
{
Interface: "eth0",
Direction: pbflow.Direction_EGRESS,
Udn: "",
},
},
EthProtocol: 2048,
Expand Down Expand Up @@ -132,7 +130,7 @@ func TestPBFlowToMap(t *testing.T) {
"DnsFlags": uint16(0x80),
"DnsFlagsResponseCode": "NoError",
"TimeFlowRttNs": someDuration.Nanoseconds(),
"NetworkEvents": []config.GenericMap{
"NetworkEvents": []map[string]string{
{
"Name": "test1",
"Type": "NetworkPolicy",
Expand Down
32 changes: 12 additions & 20 deletions pkg/exporter/converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func TestConversions(t *testing.T) {
},
},
},
Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
UdnList: []string{""},
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
Expand Down Expand Up @@ -104,8 +103,7 @@ func TestConversions(t *testing.T) {
Sampling: 2,
},
},
Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
UdnList: []string{""},
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
Expand Down Expand Up @@ -151,8 +149,7 @@ func TestConversions(t *testing.T) {
Dscp: 64,
},
},
Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
UdnList: []string{""},
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
Expand Down Expand Up @@ -197,8 +194,7 @@ func TestConversions(t *testing.T) {
Dscp: 64,
},
},
Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
UdnList: []string{""},
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
Expand Down Expand Up @@ -244,8 +240,7 @@ func TestConversions(t *testing.T) {
Packets: 128,
},
},
Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
UdnList: []string{""},
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
Expand Down Expand Up @@ -292,8 +287,7 @@ func TestConversions(t *testing.T) {
},
},
},
Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
UdnList: []string{""},
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
Expand Down Expand Up @@ -353,8 +347,7 @@ func TestConversions(t *testing.T) {
},
},
},
Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
UdnList: []string{""},
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
Expand Down Expand Up @@ -418,11 +411,10 @@ func TestConversions(t *testing.T) {
},
},
},
Interfaces: []model.IntfDir{
model.NewIntfDir("5e6e92caa1d51cf", model.DirectionIngress),
model.NewIntfDir("eth0", model.DirectionEgress),
Interfaces: []model.IntfDirUdn{
model.NewIntfDirUdn("5e6e92caa1d51cf", model.DirectionIngress, nil),
model.NewIntfDirUdn("eth0", model.DirectionEgress, nil),
},
UdnList: []string{"", ""},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
Expand All @@ -444,8 +436,8 @@ func TestConversions(t *testing.T) {
"TimeFlowStartMs": someTime.UnixMilli(),
"TimeFlowEndMs": someTime.UnixMilli(),
"Interfaces": []string{"5e6e92caa1d51cf", "eth0"},
"AgentIP": "10.11.12.13",
"Udns": []string{"", ""},
"AgentIP": "10.11.12.13",
},
},
}
Expand All @@ -457,7 +449,7 @@ func TestConversions(t *testing.T) {
delete(outDirect, "TimeReceived")

// Generate the same using protobuf
tmpPB := pbflow.FlowToPB(tt.flow, nil)
tmpPB := pbflow.FlowToPB(tt.flow)
rawPB, err := proto.Marshal(tmpPB)
require.NoError(t, err, tt.name)
outPB, err := decoder.Decode(rawPB)
Expand Down
7 changes: 2 additions & 5 deletions pkg/exporter/grpc_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"

ovnobserv "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/sampledecoder"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
Expand All @@ -31,10 +30,9 @@ type GRPCProto struct {
maxFlowsPerMessage int
metrics *metrics.Metrics
batchCounter prometheus.Counter
sampler *ovnobserv.SampleDecoder
}

func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (*GRPCProto, error) {
func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostIP, hostPort)
if err != nil {
return nil, err
Expand All @@ -46,7 +44,6 @@ func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metr
maxFlowsPerMessage: maxFlowsPerMessage,
metrics: m,
batchCounter: m.CreateBatchCounter(componentGRPC),
sampler: s,
}, nil
}

Expand All @@ -57,7 +54,7 @@ func (g *GRPCProto) ExportFlows(input <-chan []*model.Record) {
log := glog.WithField("collector", socket)
for inputRecords := range input {
g.metrics.EvictionCounter.WithSource(componentGRPC).Inc()
for _, pbRecords := range pbflow.FlowsToPB(inputRecords, g.maxFlowsPerMessage, g.sampler) {
for _, pbRecords := range pbflow.FlowsToPB(inputRecords, g.maxFlowsPerMessage) {
log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage").Inc()
Expand Down
8 changes: 4 additions & 4 deletions pkg/exporter/grpc_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestIPv4GRPCProto_ExportFlows_AgentIP(t *testing.T) {
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("127.0.0.1", port, 1000, metrics.NewMetrics(&metrics.Settings{}), nil)
exporter, err := StartGRPCProto("127.0.0.1", port, 1000, metrics.NewMetrics(&metrics.Settings{}))
require.NoError(t, err)

// Send some flows to the input of the exporter stage
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestIPv6GRPCProto_ExportFlows_AgentIP(t *testing.T) {
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("::1", port, 1000, metrics.NewMetrics(&metrics.Settings{}), nil)
exporter, err := StartGRPCProto("::1", port, 1000, metrics.NewMetrics(&metrics.Settings{}))
require.NoError(t, err)

// Send some flows to the input of the exporter stage
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {

const msgMaxLen = 10000
// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen, metrics.NewMetrics(&metrics.Settings{}), nil)
exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen, metrics.NewMetrics(&metrics.Settings{}))
require.NoError(t, err)

// Send a message much longer than the limit length
Expand All @@ -123,7 +123,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
for i := 0; i < 25000; i++ {
input = append(input, &model.Record{Metrics: model.BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{
EthProtocol: model.IPv6Type,
}}, AgentIP: net.ParseIP("1111::1111"), Interfaces: []model.IntfDir{model.NewIntfDir("12345678", 0)}})
}}, AgentIP: net.ParseIP("1111::1111"), Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("12345678", 0, nil)}})
}
flows <- input
go exporter.ExportFlows(flows)
Expand Down
8 changes: 3 additions & 5 deletions pkg/exporter/kafka_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"

ovnobserv "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/sampledecoder"
kafkago "github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
Expand All @@ -24,9 +23,8 @@ type kafkaWriter interface {
// KafkaProto exports flows over Kafka, encoded as a protobuf that is understandable by the
// Flowlogs-Pipeline collector
type KafkaProto struct {
Writer kafkaWriter
Metrics *metrics.Metrics
SampleDecoder *ovnobserv.SampleDecoder
Writer kafkaWriter
Metrics *metrics.Metrics
}

func (kp *KafkaProto) ExportFlows(input <-chan []*model.Record) {
Expand All @@ -52,7 +50,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*model.Record) {
klog.Debugf("sending %d records", len(records))
msgs := make([]kafkago.Message, 0, len(records))
for _, record := range records {
pbBytes, err := proto.Marshal(pbflow.FlowToPB(record, kp.SampleDecoder))
pbBytes, err := proto.Marshal(pbflow.FlowToPB(record))
if err != nil {
klog.WithError(err).Debug("can't encode protobuf message. Ignoring")
kp.Metrics.Errors.WithErrorName(componentKafka, "CannotEncodeMessage").Inc()
Expand Down
4 changes: 2 additions & 2 deletions pkg/exporter/kafka_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestProtoConversion(t *testing.T) {
Flags: uint16(1),
},
},
Interfaces: []model.IntfDir{model.NewIntfDir("veth0", 0), model.NewIntfDir("abcde", 1)},
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil), model.NewIntfDirUdn("abcde", 1, nil)},
}

input <- []*model.Record{&record}
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestIdenticalKeys(t *testing.T) {
Flags: uint16(1),
},
},
Interfaces: []model.IntfDir{model.NewIntfDir("veth0", 0), model.NewIntfDir("abcde", 1)},
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil), model.NewIntfDirUdn("abcde", 1, nil)},
}
key1 := getFlowKey(&record)

Expand Down
Loading

0 comments on commit 8c0a7a8

Please sign in to comment.