Skip to content

Commit

Permalink
[connector/datadog] Fix data race (open-telemetry#31921)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
Fixes data race issue introduced in the prior PR for container Tags. 

open-telemetry#31642

**Link to tracking Issue:** <Issue number if applicable>

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>
  • Loading branch information
dineshg13 authored Mar 25, 2024
1 parent 2eff581 commit 4900c42
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 16 deletions.
29 changes: 18 additions & 11 deletions connector/datadogconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package datadogconnector // import "github.com/open-telemetry/opentelemetry-coll
import (
"context"
"fmt"
"sync"
"time"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
Expand Down Expand Up @@ -156,13 +157,17 @@ func (c *traceToMetricConnector) populateContainerTagsCache(traces ptrace.Traces
ddContainerTags := attributes.ContainerTagsFromResourceAttributes(attrs)
for attr := range c.resourceAttrs {
if val, ok := ddContainerTags[attr]; ok {
var cacheVal map[string]struct{}
key := fmt.Sprintf("%s:%s", attr, val)
if v, ok := c.containerTagCache.Get(containerID.AsString()); ok {
cacheVal = v.(map[string]struct{})
cacheVal[fmt.Sprintf("%s:%s", attr, val)] = struct{}{}
cacheVal := v.(*sync.Map)
// check if the key already exists in the cache
if _, ok := cacheVal.Load(key); ok {
continue
}
cacheVal.Store(key, struct{}{})
} else {
cacheVal = make(map[string]struct{})
cacheVal[fmt.Sprintf("%s:%s", attr, val)] = struct{}{}
cacheVal := &sync.Map{}
cacheVal.Store(key, struct{}{})
c.containerTagCache.Set(containerID.AsString(), cacheVal, cache.DefaultExpiration)
}

Expand All @@ -181,14 +186,16 @@ func (c *traceToMetricConnector) enrichStatsPayload(stats *pb.StatsPayload) {
for _, stat := range stats.Stats {
if stat.ContainerID != "" {
if tags, ok := c.containerTagCache.Get(stat.ContainerID); ok {
tagList := tags.(map[string]struct{})

tagList := tags.(*sync.Map)
for _, tag := range stat.Tags {
tagList[tag] = struct{}{}
}
stat.Tags = make([]string, 0, len(tagList))
for tag := range tagList {
stat.Tags = append(stat.Tags, tag)
tagList.Store(tag, struct{}{})
}
stat.Tags = make([]string, 0)
tagList.Range(func(key, value any) bool {
stat.Tags = append(stat.Tags, key.(string))
return true
})
}
}
}
Expand Down
63 changes: 58 additions & 5 deletions connector/datadogconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package datadogconnector

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -19,7 +20,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.17.0"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -97,19 +98,26 @@ func fillSpanOne(span ptrace.Span) {
status.SetMessage("status-cancelled")
}

func TestContainerTags(t *testing.T) {
func creteConnector(t *testing.T) (*traceToMetricConnector, *consumertest.MetricsSink) {
factory := NewFactory()

creationParams := connectortest.NewNopCreateSettings()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Traces.ResourceAttributesAsContainerTags = []string{semconv.AttributeCloudAvailabilityZone, semconv.AttributeCloudRegion}

metricsSink := &consumertest.MetricsSink{}

traceToMetricsConnector, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, metricsSink)
assert.NoError(t, err)

connector, ok := traceToMetricsConnector.(*traceToMetricConnector)
err = connector.Start(context.Background(), componenttest.NewNopHost())
require.True(t, ok)
return connector, metricsSink
}

func TestContainerTags(t *testing.T) {
connector, metricsSink := creteConnector(t)
err := connector.Start(context.Background(), componenttest.NewNopHost())
if err != nil {
t.Errorf("Error starting connector: %v", err)
return
Expand All @@ -118,7 +126,6 @@ func TestContainerTags(t *testing.T) {
_ = connector.Shutdown(context.Background())
}()

assert.True(t, ok) // checks if the created connector implements the connectorImp struct
trace1 := generateTrace()

err = connector.ConsumeTraces(context.Background(), trace1)
Expand All @@ -130,7 +137,12 @@ func TestContainerTags(t *testing.T) {
assert.NoError(t, err)
// check if the container tags are added to the cache
assert.Equal(t, 1, len(connector.containerTagCache.Items()))
assert.Equal(t, 2, len(connector.containerTagCache.Items()["my-container-id"].Object.(map[string]struct{})))
count := 0
connector.containerTagCache.Items()["my-container-id"].Object.(*sync.Map).Range(func(key, value any) bool {
count++
return true
})
assert.Equal(t, 2, count)

for {
if len(metricsSink.AllMetrics()) > 0 {
Expand Down Expand Up @@ -181,3 +193,44 @@ func newTranslatorWithStatsChannel(t *testing.T, logger *zap.Logger, ch chan []b
require.NoError(t, err)
return tr
}

func TestDataRace(t *testing.T) {
connector, _ := creteConnector(t)
trace1 := generateTrace()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
connector.populateContainerTagsCache(trace1)
}
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
sp := &pb.StatsPayload{
Stats: []*pb.ClientStatsPayload{
{
ContainerID: "my-container-id",
},
},
}
connector.enrichStatsPayload(sp)
}
}
}()
wg.Wait()
}

0 comments on commit 4900c42

Please sign in to comment.