diff --git a/Makefile b/Makefile index f11213915..5bc157f49 100644 --- a/Makefile +++ b/Makefile @@ -132,8 +132,10 @@ clean: ## Clean TEST_OPTS := -race -coverpkg=./... -covermode=atomic -coverprofile=/tmp/coverage.out .PHONY: tests-unit tests-unit: validate_go ## Unit tests + # tests may rely on non-thread safe libs such as go-ipfix => no -race flag + go test $$(go list ./... | grep /testnorace) # enabling CGO is required for -race flag - CGO_ENABLED=1 go test -p 1 $(TEST_OPTS) $$(go list ./... | grep -v /e2e) + CGO_ENABLED=1 go test -p 1 $(TEST_OPTS) $$(go list ./... | grep -v /e2e | grep -v /testnorace) .PHONY: tests-fast tests-fast: TEST_OPTS= diff --git a/pkg/pipeline/write/testnorace/write_ipfix_test.go b/pkg/pipeline/write/testnorace/write_ipfix_test.go new file mode 100644 index 000000000..f67ca993b --- /dev/null +++ b/pkg/pipeline/write/testnorace/write_ipfix_test.go @@ -0,0 +1,220 @@ +package testnorace + +import ( + "context" + "fmt" + "net" + "strings" + "testing" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write" + "github.com/netobserv/netobserv-ebpf-agent/pkg/decode" + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vmware/go-ipfix/pkg/collector" + "github.com/vmware/go-ipfix/pkg/entities" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" + "k8s.io/apimachinery/pkg/util/wait" +) + +var ( + startTime = time.Now() + endTime = startTime.Add(7 * time.Second) + FullPBFlow = pbflow.Record{ + Direction: pbflow.Direction_EGRESS, + Bytes: 1024, + DataLink: &pbflow.DataLink{ + DstMac: 0x112233445566, + SrcMac: 0x010203040506, + }, + Network: &pbflow.Network{ + SrcAddr: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304}, + }, + DstAddr: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708}, + }, + Dscp: 1, + }, + Duplicate: false, + EthProtocol: 2048, + Packets: 3, + Transport: &pbflow.Transport{ + Protocol: 17, + SrcPort: 23000, + DstPort: 443, + }, + TimeFlowStart: timestamppb.New(startTime), + TimeFlowEnd: timestamppb.New(endTime), + Interface: "eth0", + AgentIp: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x0a090807}, + }, + PktDropBytes: 15, + PktDropPackets: 1, + PktDropLatestFlags: 1, + PktDropLatestState: 1, + PktDropLatestDropCause: 5, + Flags: 0x110, + DnsId: 123, + DnsFlags: 0x80, + DnsErrno: 0, + DnsLatency: durationpb.New(150 * time.Millisecond), + TimeFlowRtt: durationpb.New(20 * time.Millisecond), + IcmpCode: 0, + IcmpType: 0, + DupList: []*pbflow.DupMapEntry{ + { + Interface: "eth0", + Direction: pbflow.Direction_EGRESS, + }, + }, + } +) + +func TestEnrichedIPFIXFlow(t *testing.T) { + cp := startCollector(t) + addr := cp.GetAddress().(*net.UDPAddr) + + flow := decode.PBFlowToMap(&FullPBFlow) + + // Add enrichment + flow["SrcK8S_Name"] = "pod A" + flow["SrcK8S_Namespace"] = "ns1" + flow["SrcK8S_HostName"] = "node1" + flow["DstK8S_Name"] = "pod B" + flow["DstK8S_Namespace"] = "ns2" + flow["DstK8S_HostName"] = "node2" + + writer, err := write.NewWriteIpfix(config.StageParam{ + Write: &config.Write{ + Ipfix: &api.WriteIpfix{ + TargetHost: addr.IP.String(), + TargetPort: addr.Port, + Transport: addr.Network(), + EnterpriseID: 9999, + }, + }, + }) + require.NoError(t, err) + + writer.Write(flow) + + // Read collector + // 1st = IPv4 template + tplv4Msg := <-cp.GetMsgChan() + // 2nd = IPv6 template (ignore) + <-cp.GetMsgChan() + // 3rd = data record + dataMsg := <-cp.GetMsgChan() + cp.CloseMsgChan() + cp.Stop() + + // Check template + assert.Equal(t, uint16(10), tplv4Msg.GetVersion()) + templateSet := tplv4Msg.GetSet() + templateElements := templateSet.GetRecords()[0].GetOrderedElementList() + assert.Len(t, templateElements, 20) + assert.Equal(t, uint32(0), templateElements[0].GetInfoElement().EnterpriseId) + + // Check data + assert.Equal(t, uint16(10), dataMsg.GetVersion()) + dataSet := dataMsg.GetSet() + record := dataSet.GetRecords()[0] + + expectedFields := append(write.IPv4IANAFields, write.KubeFields...) + expectedFields = append(expectedFields, write.CustomNetworkFields...) + + for _, name := range expectedFields { + element, _, exist := record.GetInfoElementWithValue(name) + assert.Truef(t, exist, "element with name %s should exist in the record", name) + assert.NotNil(t, element) + matchElement(t, element, flow) + } +} + +func matchElement(t *testing.T, element entities.InfoElementWithValue, flow config.GenericMap) { + switch element.GetName() { + case "sourceIPv4Address": + assert.Equal(t, flow["SrcAddr"], element.GetIPAddressValue().String()) + case "destinationIPv4Address": + assert.Equal(t, flow["DstAddr"], element.GetIPAddressValue().String()) + case "ethernetType": + assert.Equal(t, flow["Etype"], uint32(element.GetUnsigned16Value())) + case "flowDirection": + assert.Equal(t, flow["FlowDirection"], int(element.GetUnsigned8Value())) + case "protocolIdentifier": + assert.Equal(t, flow["Proto"], uint32(element.GetUnsigned8Value())) + case "sourceTransportPort": + assert.Equal(t, flow["SrcPort"], uint32(element.GetUnsigned16Value())) + case "destinationTransportPort": + assert.Equal(t, flow["DstPort"], uint32(element.GetUnsigned16Value())) + case "octetDeltaCount": + assert.Equal(t, flow["Bytes"], element.GetUnsigned64Value()) + case "flowStartMilliseconds": + assert.Equal(t, flow["TimeFlowStartMs"], int64(element.GetUnsigned64Value())) + case "flowEndMilliseconds": + assert.Equal(t, flow["TimeFlowEndMs"], int64(element.GetUnsigned64Value())) + case "packetDeltaCount": + assert.Equal(t, flow["Packets"], element.GetUnsigned64Value()) + case "interfaceName": + assert.Equal(t, flow["Interface"], element.GetStringValue()) + case "sourcePodNamespace": + assert.Equal(t, flow["SrcK8S_Namespace"], element.GetStringValue()) + case "sourcePodName": + assert.Equal(t, flow["SrcK8S_Name"], element.GetStringValue()) + case "destinationPodNamespace": + assert.Equal(t, flow["DstK8S_Namespace"], element.GetStringValue()) + case "destinationPodName": + assert.Equal(t, flow["DstK8S_Name"], element.GetStringValue()) + case "sourceNodeName": + assert.Equal(t, flow["SrcK8S_HostName"], element.GetStringValue()) + case "destinationNodeName": + assert.Equal(t, flow["DstK8S_HostName"], element.GetStringValue()) + case "sourceMacAddress": + case "destinationMacAddress": + // Getting some discrepancies here, need to figure out why + default: + assert.Fail(t, "missing check on element", element.GetName()) + } +} + +func startCollector(t *testing.T) *collector.CollectingProcess { + address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + require.NoError(t, err) + + cp, err := collector.InitCollectingProcess(collector.CollectorInput{ + Address: address.String(), + Protocol: address.Network(), + MaxBufferSize: 2048, + TemplateTTL: 0, + ServerCert: nil, + ServerKey: nil, + }) + require.NoError(t, err) + + go cp.Start() + + // Wait for collector to be ready + checkConn := func(ctx context.Context) (bool, error) { + addr := cp.GetAddress() + if addr == nil || strings.HasSuffix(addr.String(), ":0") { + return false, fmt.Errorf("random port is not resolved") + } + conn, err := net.Dial(cp.GetAddress().Network(), cp.GetAddress().String()) + if err != nil { + return false, err + } + conn.Close() + return true, nil + } + err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 2*time.Second, false, checkConn) + require.NoError(t, err, "Connection timeout in collector setup") + + return cp +} diff --git a/pkg/pipeline/write/write_ipfix.go b/pkg/pipeline/write/write_ipfix.go index 724eda370..5cf78f8b5 100644 --- a/pkg/pipeline/write/write_ipfix.go +++ b/pkg/pipeline/write/write_ipfix.go @@ -43,7 +43,43 @@ type writeIpfix struct { // IPv6Type value as defined in IEEE 802: https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml const IPv6Type = 0x86DD -var ilog = logrus.WithField("component", "write.Ipfix") +var ( + ilog = logrus.WithField("component", "write.Ipfix") + IANAFields = []string{ + "ethernetType", + "flowDirection", + "sourceMacAddress", + "destinationMacAddress", + "protocolIdentifier", + "sourceTransportPort", + "destinationTransportPort", + "octetDeltaCount", + "flowStartMilliseconds", + "flowEndMilliseconds", + "packetDeltaCount", + "interfaceName", + } + IPv4IANAFields = append([]string{ + "sourceIPv4Address", + "destinationIPv4Address", + }, IANAFields...) + IPv6IANAFields = append([]string{ + "sourceIPv6Address", + "destinationIPv6Address", + "nextHeaderIPv6", + }, IANAFields...) + KubeFields = []string{ + "sourcePodNamespace", + "sourcePodName", + "destinationPodNamespace", + "destinationPodName", + "sourceNodeName", + "destinationNodeName", + } + CustomNetworkFields = []string{ + // TODO + } +) func addElementToTemplate(elementName string, value []byte, elements *[]entities.InfoElementWithValue, registryID uint32) error { element, err := registry.GetInfoElement(elementName, registryID) @@ -60,30 +96,20 @@ func addElementToTemplate(elementName string, value []byte, elements *[]entities return nil } -func addKubeContextToTemplate(elements *[]entities.InfoElementWithValue, registryID uint32) error { - err := addElementToTemplate("sourcePodNamespace", nil, elements, registryID) - if err != nil { - return err - } - err = addElementToTemplate("sourcePodName", nil, elements, registryID) - if err != nil { - return err - } - err = addElementToTemplate("destinationPodNamespace", nil, elements, registryID) - if err != nil { - return err - } - err = addElementToTemplate("destinationPodName", nil, elements, registryID) - if err != nil { - return err - } - err = addElementToTemplate("sourceNodeName", nil, elements, registryID) - if err != nil { - return err +func addNetworkEnrichmentToTemplate(elements *[]entities.InfoElementWithValue, registryID uint32) error { + for _, field := range CustomNetworkFields { + if err := addElementToTemplate(field, nil, elements, registryID); err != nil { + return err + } } - err = addElementToTemplate("destinationNodeName", nil, elements, registryID) - if err != nil { - return err + return nil +} + +func addKubeContextToTemplate(elements *[]entities.InfoElementWithValue, registryID uint32) error { + for _, field := range KubeFields { + if err := addElementToTemplate(field, nil, elements, registryID); err != nil { + return err + } } return nil } @@ -94,32 +120,32 @@ func loadCustomRegistry(EnterpriseID uint32) error { ilog.WithError(err).Errorf("Failed to initialize registry") return err } - err = registry.PutInfoElement((*entities.NewInfoElement("sourcePodNamespace", 7733, 13, EnterpriseID, 65535)), EnterpriseID) + err = registry.PutInfoElement((*entities.NewInfoElement("sourcePodNamespace", 7733, entities.String, EnterpriseID, 65535)), EnterpriseID) if err != nil { ilog.WithError(err).Errorf("Failed to register element") return err } - err = registry.PutInfoElement((*entities.NewInfoElement("sourcePodName", 7734, 13, EnterpriseID, 65535)), EnterpriseID) + err = registry.PutInfoElement((*entities.NewInfoElement("sourcePodName", 7734, entities.String, EnterpriseID, 65535)), EnterpriseID) if err != nil { ilog.WithError(err).Errorf("Failed to register element") return err } - err = registry.PutInfoElement((*entities.NewInfoElement("destinationPodNamespace", 7735, 13, EnterpriseID, 65535)), EnterpriseID) + err = registry.PutInfoElement((*entities.NewInfoElement("destinationPodNamespace", 7735, entities.String, EnterpriseID, 65535)), EnterpriseID) if err != nil { ilog.WithError(err).Errorf("Failed to register element") return err } - err = registry.PutInfoElement((*entities.NewInfoElement("destinationPodName", 7736, 13, EnterpriseID, 65535)), EnterpriseID) + err = registry.PutInfoElement((*entities.NewInfoElement("destinationPodName", 7736, entities.String, EnterpriseID, 65535)), EnterpriseID) if err != nil { ilog.WithError(err).Errorf("Failed to register element") return err } - err = registry.PutInfoElement((*entities.NewInfoElement("sourceNodeName", 7737, 13, EnterpriseID, 65535)), EnterpriseID) + err = registry.PutInfoElement((*entities.NewInfoElement("sourceNodeName", 7737, entities.String, EnterpriseID, 65535)), EnterpriseID) if err != nil { ilog.WithError(err).Errorf("Failed to register element") return err } - err = registry.PutInfoElement((*entities.NewInfoElement("destinationNodeName", 7738, 13, EnterpriseID, 65535)), EnterpriseID) + err = registry.PutInfoElement((*entities.NewInfoElement("destinationNodeName", 7738, entities.String, EnterpriseID, 65535)), EnterpriseID) if err != nil { ilog.WithError(err).Errorf("Failed to register element") return err @@ -137,67 +163,21 @@ func SendTemplateRecordv4(exporter *ipfixExporter.ExportingProcess, enrichEnterp } elements := make([]entities.InfoElementWithValue, 0) - err = addElementToTemplate("ethernetType", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("flowDirection", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("sourceMacAddress", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("destinationMacAddress", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("sourceIPv4Address", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("destinationIPv4Address", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("protocolIdentifier", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("sourceTransportPort", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("destinationTransportPort", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("octetDeltaCount", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("flowStartMilliseconds", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("flowEndMilliseconds", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("packetDeltaCount", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("interfaceName", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err + for _, field := range IPv4IANAFields { + err = addElementToTemplate(field, nil, &elements, registry.IANAEnterpriseID) + if err != nil { + return 0, nil, err + } } if enrichEnterpriseID != 0 { err = addKubeContextToTemplate(&elements, enrichEnterpriseID) if err != nil { return 0, nil, err } + err = addNetworkEnrichmentToTemplate(&elements, enrichEnterpriseID) + if err != nil { + return 0, nil, err + } } err = templateSet.AddRecord(elements, templateID) if err != nil { @@ -222,67 +202,21 @@ func SendTemplateRecordv6(exporter *ipfixExporter.ExportingProcess, enrichEnterp } elements := make([]entities.InfoElementWithValue, 0) - err = addElementToTemplate("ethernetType", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("flowDirection", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("sourceMacAddress", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("destinationMacAddress", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("sourceIPv6Address", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("destinationIPv6Address", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("nextHeaderIPv6", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("sourceTransportPort", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("destinationTransportPort", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("octetDeltaCount", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("flowStartMilliseconds", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("flowEndMilliseconds", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("packetDeltaCount", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate("interfaceName", nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err + for _, field := range IPv6IANAFields { + err = addElementToTemplate(field, nil, &elements, registry.IANAEnterpriseID) + if err != nil { + return 0, nil, err + } } if enrichEnterpriseID != 0 { err = addKubeContextToTemplate(&elements, enrichEnterpriseID) if err != nil { return 0, nil, err } + err = addNetworkEnrichmentToTemplate(&elements, enrichEnterpriseID) + if err != nil { + return 0, nil, err + } } err = templateSet.AddRecord(elements, templateID)