Skip to content

Commit

Permalink
Run tests using go-ipfix without -race flag
Browse files Browse the repository at this point in the history
The lib is not thread safe as it uses a shared global registry without
proper locks, e.g:

```
 WARNING: DATA RACE
Read at 0x000001fbbde8 by goroutine 28:
  github.com/vmware/go-ipfix/pkg/registry.GetInfoElementFromID()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/vendor/github.com/vmware/go-ipfix/pkg/registry/registry.go:112 +0x3e
  github.com/vmware/go-ipfix/pkg/collector.(*CollectingProcess).decodeTemplateSet()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/vendor/github.com/vmware/go-ipfix/pkg/collector/process.go:248 +0x3e8
  github.com/vmware/go-ipfix/pkg/collector.(*CollectingProcess).decodePacket()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/vendor/github.com/vmware/go-ipfix/pkg/collector/process.go:203 +0x9a4
  github.com/vmware/go-ipfix/pkg/collector.(*CollectingProcess).handleUDPClient.func1()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/vendor/github.com/vmware/go-ipfix/pkg/collector/udp.go:138 +0x28d

Previous write at 0x000001fbbde8 by goroutine 23:
  github.com/vmware/go-ipfix/pkg/registry.LoadRegistry()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/vendor/github.com/vmware/go-ipfix/pkg/registry/registry.go:97 +0x44
  github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write.NewWriteIpfix()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/pkg/pipeline/write/write_ipfix.go:472 +0xaca
  github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write.TestEnrichedIPFIXFlow()
      /home/runner/work/flowlogs-pipeline/flowlogs-pipeline/pkg/pipeline/write/write_ipfix_test.go:93 +0x4e4
```
  • Loading branch information
jotak committed Feb 22, 2024
1 parent 1ca2a78 commit 1fc980c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ clean: ## Clean
TEST_OPTS := -race -coverpkg=./... -covermode=atomic -coverprofile=/tmp/coverage.out
.PHONY: tests-unit
tests-unit: validate_go ## Unit tests
# tests may rely on non-thread safe libs such as go-ipfix => no -race flag
go test $$(go list ./... | grep /testnorace)
# enabling CGO is required for -race flag
CGO_ENABLED=1 go test -p 1 $(TEST_OPTS) $$(go list ./... | grep -v /e2e)
CGO_ENABLED=1 go test -p 1 $(TEST_OPTS) $$(go list ./... | grep -v /e2e | grep -v /testnorace)

.PHONY: tests-fast
tests-fast: TEST_OPTS=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package write
package testnorace

import (
"context"
Expand All @@ -10,6 +10,7 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write"
"github.com/netobserv/netobserv-ebpf-agent/pkg/decode"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -90,7 +91,7 @@ func TestEnrichedIPFIXFlow(t *testing.T) {
flow["DstK8S_Namespace"] = "ns2"
flow["DstK8S_HostName"] = "node2"

writer, err := NewWriteIpfix(config.StageParam{
writer, err := write.NewWriteIpfix(config.StageParam{
Write: &config.Write{
Ipfix: &api.WriteIpfix{
TargetHost: addr.IP.String(),
Expand All @@ -117,8 +118,8 @@ func TestEnrichedIPFIXFlow(t *testing.T) {
dataSet := dataMsg.GetSet()
record := dataSet.GetRecords()[0]

expectedFields := append(ipv4IANAFields, kubeFields...)
expectedFields = append(expectedFields, customNetworkFields...)
expectedFields := append(write.IPv4IANAFields, write.KubeFields...)
expectedFields = append(expectedFields, write.CustomNetworkFields...)

for _, name := range expectedFields {
element, _, exist := record.GetInfoElementWithValue(name)
Expand Down
22 changes: 11 additions & 11 deletions pkg/pipeline/write/write_ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const IPv6Type = 0x86DD

var (
ilog = logrus.WithField("component", "write.Ipfix")
ianaFields = []string{
IANAFields = []string{
"ethernetType",
"flowDirection",
"sourceMacAddress",
Expand All @@ -59,24 +59,24 @@ var (
"packetDeltaCount",
"interfaceName",
}
ipv4IANAFields = append([]string{
IPv4IANAFields = append([]string{
"sourceIPv4Address",
"destinationIPv4Address",
}, ianaFields...)
ipv6IANAFields = append([]string{
}, IANAFields...)
IPv6IANAFields = append([]string{
"sourceIPv6Address",
"destinationIPv6Address",
"nextHeaderIPv6",
}, ianaFields...)
kubeFields = []string{
}, IANAFields...)
KubeFields = []string{
"sourcePodNamespace",
"sourcePodName",
"destinationPodNamespace",
"destinationPodName",
"sourceNodeName",
"destinationNodeName",
}
customNetworkFields = []string{
CustomNetworkFields = []string{
// TODO
}
)
Expand All @@ -97,7 +97,7 @@ func addElementToTemplate(elementName string, value []byte, elements *[]entities
}

func addNetworkEnrichmentToTemplate(elements *[]entities.InfoElementWithValue, registryID uint32) error {
for _, field := range customNetworkFields {
for _, field := range CustomNetworkFields {
if err := addElementToTemplate(field, nil, elements, registryID); err != nil {
return err
}
Expand All @@ -106,7 +106,7 @@ func addNetworkEnrichmentToTemplate(elements *[]entities.InfoElementWithValue, r
}

func addKubeContextToTemplate(elements *[]entities.InfoElementWithValue, registryID uint32) error {
for _, field := range kubeFields {
for _, field := range KubeFields {
if err := addElementToTemplate(field, nil, elements, registryID); err != nil {
return err
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func SendTemplateRecordv4(exporter *ipfixExporter.ExportingProcess, enrichEnterp
}
elements := make([]entities.InfoElementWithValue, 0)

for _, field := range ipv4IANAFields {
for _, field := range IPv4IANAFields {
err = addElementToTemplate(field, nil, &elements, registry.IANAEnterpriseID)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -202,7 +202,7 @@ func SendTemplateRecordv6(exporter *ipfixExporter.ExportingProcess, enrichEnterp
}
elements := make([]entities.InfoElementWithValue, 0)

for _, field := range ipv6IANAFields {
for _, field := range IPv6IANAFields {
err = addElementToTemplate(field, nil, &elements, registry.IANAEnterpriseID)
if err != nil {
return 0, nil, err
Expand Down

0 comments on commit 1fc980c

Please sign in to comment.