Skip to content

Commit

Permalink
Add test on IPFIX writer (#606)
Browse files Browse the repository at this point in the history
* Add test on IPFIX writer

* Run tests using go-ipfix without -race flag

The lib is not thread safe as it uses a shared global registry without
proper locks, e.g:

```
 WARNING: DATA RACE
Read at 0x000001fbbde8 by goroutine 28:
  github.com/vmware/go-ipfix/pkg/registry.GetInfoElementFromID()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/vendor/github.com/vmware/go-ipfix/pkg/registry/registry.go:112 +0x3e
  github.com/vmware/go-ipfix/pkg/collector.(*CollectingProcess).decodeTemplateSet()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/vendor/github.com/vmware/go-ipfix/pkg/collector/process.go:248 +0x3e8
  github.com/vmware/go-ipfix/pkg/collector.(*CollectingProcess).decodePacket()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/vendor/github.com/vmware/go-ipfix/pkg/collector/process.go:203 +0x9a4
  github.com/vmware/go-ipfix/pkg/collector.(*CollectingProcess).handleUDPClient.func1()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/vendor/github.com/vmware/go-ipfix/pkg/collector/udp.go:138 +0x28d

Previous write at 0x000001fbbde8 by goroutine 23:
  github.com/vmware/go-ipfix/pkg/registry.LoadRegistry()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/vendor/github.com/vmware/go-ipfix/pkg/registry/registry.go:97 +0x44
  github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write.NewWriteIpfix()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/pkg/pipeline/write/write_ipfix.go:472 +0xaca
  github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write.TestEnrichedIPFIXFlow()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/pkg/pipeline/write/write_ipfix_test.go:93 +0x4e4
```

* simplify reading collector
  • Loading branch information
jotak authored Feb 26, 2024
1 parent e8567bb commit ba7ad1f
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 141 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ clean: ## Clean
TEST_OPTS := -race -coverpkg=./... -covermode=atomic -coverprofile=/tmp/coverage.out
.PHONY: tests-unit
tests-unit: validate_go ## Unit tests
# tests may rely on non-thread safe libs such as go-ipfix => no -race flag
go test $$(go list ./... | grep /testnorace)
# enabling CGO is required for -race flag
CGO_ENABLED=1 go test -p 1 $(TEST_OPTS) $$(go list ./... | grep -v /e2e)
CGO_ENABLED=1 go test -p 1 $(TEST_OPTS) $$(go list ./... | grep -v /e2e | grep -v /testnorace)

.PHONY: tests-fast
tests-fast: TEST_OPTS=
Expand Down
220 changes: 220 additions & 0 deletions pkg/pipeline/write/testnorace/write_ipfix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package testnorace

import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write"
"github.com/netobserv/netobserv-ebpf-agent/pkg/decode"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vmware/go-ipfix/pkg/collector"
"github.com/vmware/go-ipfix/pkg/entities"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/apimachinery/pkg/util/wait"
)

var (
startTime = time.Now()
endTime = startTime.Add(7 * time.Second)
FullPBFlow = pbflow.Record{
Direction: pbflow.Direction_EGRESS,
Bytes: 1024,
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
SrcMac: 0x010203040506,
},
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304},
},
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 1,
},
Duplicate: false,
EthProtocol: 2048,
Packets: 3,
Transport: &pbflow.Transport{
Protocol: 17,
SrcPort: 23000,
DstPort: 443,
},
TimeFlowStart: timestamppb.New(startTime),
TimeFlowEnd: timestamppb.New(endTime),
Interface: "eth0",
AgentIp: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x0a090807},
},
PktDropBytes: 15,
PktDropPackets: 1,
PktDropLatestFlags: 1,
PktDropLatestState: 1,
PktDropLatestDropCause: 5,
Flags: 0x110,
DnsId: 123,
DnsFlags: 0x80,
DnsErrno: 0,
DnsLatency: durationpb.New(150 * time.Millisecond),
TimeFlowRtt: durationpb.New(20 * time.Millisecond),
IcmpCode: 0,
IcmpType: 0,
DupList: []*pbflow.DupMapEntry{
{
Interface: "eth0",
Direction: pbflow.Direction_EGRESS,
},
},
}
)

func TestEnrichedIPFIXFlow(t *testing.T) {
cp := startCollector(t)
addr := cp.GetAddress().(*net.UDPAddr)

flow := decode.PBFlowToMap(&FullPBFlow)

// Add enrichment
flow["SrcK8S_Name"] = "pod A"
flow["SrcK8S_Namespace"] = "ns1"
flow["SrcK8S_HostName"] = "node1"
flow["DstK8S_Name"] = "pod B"
flow["DstK8S_Namespace"] = "ns2"
flow["DstK8S_HostName"] = "node2"

writer, err := write.NewWriteIpfix(config.StageParam{
Write: &config.Write{
Ipfix: &api.WriteIpfix{
TargetHost: addr.IP.String(),
TargetPort: addr.Port,
Transport: addr.Network(),
EnterpriseID: 9999,
},
},
})
require.NoError(t, err)

writer.Write(flow)

// Read collector
// 1st = IPv4 template
tplv4Msg := <-cp.GetMsgChan()
// 2nd = IPv6 template (ignore)
<-cp.GetMsgChan()
// 3rd = data record
dataMsg := <-cp.GetMsgChan()
cp.CloseMsgChan()
cp.Stop()

// Check template
assert.Equal(t, uint16(10), tplv4Msg.GetVersion())
templateSet := tplv4Msg.GetSet()
templateElements := templateSet.GetRecords()[0].GetOrderedElementList()
assert.Len(t, templateElements, 20)
assert.Equal(t, uint32(0), templateElements[0].GetInfoElement().EnterpriseId)

// Check data
assert.Equal(t, uint16(10), dataMsg.GetVersion())
dataSet := dataMsg.GetSet()
record := dataSet.GetRecords()[0]

expectedFields := append(write.IPv4IANAFields, write.KubeFields...)
expectedFields = append(expectedFields, write.CustomNetworkFields...)

for _, name := range expectedFields {
element, _, exist := record.GetInfoElementWithValue(name)
assert.Truef(t, exist, "element with name %s should exist in the record", name)
assert.NotNil(t, element)
matchElement(t, element, flow)
}
}

func matchElement(t *testing.T, element entities.InfoElementWithValue, flow config.GenericMap) {
switch element.GetName() {
case "sourceIPv4Address":
assert.Equal(t, flow["SrcAddr"], element.GetIPAddressValue().String())
case "destinationIPv4Address":
assert.Equal(t, flow["DstAddr"], element.GetIPAddressValue().String())
case "ethernetType":
assert.Equal(t, flow["Etype"], uint32(element.GetUnsigned16Value()))
case "flowDirection":
assert.Equal(t, flow["FlowDirection"], int(element.GetUnsigned8Value()))
case "protocolIdentifier":
assert.Equal(t, flow["Proto"], uint32(element.GetUnsigned8Value()))
case "sourceTransportPort":
assert.Equal(t, flow["SrcPort"], uint32(element.GetUnsigned16Value()))
case "destinationTransportPort":
assert.Equal(t, flow["DstPort"], uint32(element.GetUnsigned16Value()))
case "octetDeltaCount":
assert.Equal(t, flow["Bytes"], element.GetUnsigned64Value())
case "flowStartMilliseconds":
assert.Equal(t, flow["TimeFlowStartMs"], int64(element.GetUnsigned64Value()))
case "flowEndMilliseconds":
assert.Equal(t, flow["TimeFlowEndMs"], int64(element.GetUnsigned64Value()))
case "packetDeltaCount":
assert.Equal(t, flow["Packets"], element.GetUnsigned64Value())
case "interfaceName":
assert.Equal(t, flow["Interface"], element.GetStringValue())
case "sourcePodNamespace":
assert.Equal(t, flow["SrcK8S_Namespace"], element.GetStringValue())
case "sourcePodName":
assert.Equal(t, flow["SrcK8S_Name"], element.GetStringValue())
case "destinationPodNamespace":
assert.Equal(t, flow["DstK8S_Namespace"], element.GetStringValue())
case "destinationPodName":
assert.Equal(t, flow["DstK8S_Name"], element.GetStringValue())
case "sourceNodeName":
assert.Equal(t, flow["SrcK8S_HostName"], element.GetStringValue())
case "destinationNodeName":
assert.Equal(t, flow["DstK8S_HostName"], element.GetStringValue())
case "sourceMacAddress":
case "destinationMacAddress":
// Getting some discrepancies here, need to figure out why
default:
assert.Fail(t, "missing check on element", element.GetName())
}
}

func startCollector(t *testing.T) *collector.CollectingProcess {
address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
require.NoError(t, err)

cp, err := collector.InitCollectingProcess(collector.CollectorInput{
Address: address.String(),
Protocol: address.Network(),
MaxBufferSize: 2048,
TemplateTTL: 0,
ServerCert: nil,
ServerKey: nil,
})
require.NoError(t, err)

go cp.Start()

// Wait for collector to be ready
checkConn := func(ctx context.Context) (bool, error) {
addr := cp.GetAddress()
if addr == nil || strings.HasSuffix(addr.String(), ":0") {
return false, fmt.Errorf("random port is not resolved")
}
conn, err := net.Dial(cp.GetAddress().Network(), cp.GetAddress().String())
if err != nil {
return false, err
}
conn.Close()
return true, nil
}
err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 2*time.Second, false, checkConn)
require.NoError(t, err, "Connection timeout in collector setup")

return cp
}
Loading

0 comments on commit ba7ad1f

Please sign in to comment.