Skip to content

Commit

Permalink
Merge branch 'main' into rachel.yang/http-net-error-codes
Browse files Browse the repository at this point in the history
  • Loading branch information
rachelyangdog authored Jan 2, 2025
2 parents 25c62bf + 4f57a47 commit 089eed1
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 18 deletions.
28 changes: 21 additions & 7 deletions ddtrace/tracer/abandonedspans.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"sync/atomic"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

Expand Down Expand Up @@ -77,27 +79,36 @@ type abandonedSpanCandidate struct {
TraceID, SpanID uint64
Start int64
Finished bool
Integration string
}

func newAbandonedSpanCandidate(s *span, finished bool) *abandonedSpanCandidate {
var component string
if v, ok := s.Meta[ext.Component]; ok {
component = v
} else {
component = "manual"
}
// finished is explicit instead of implicit as s.finished may be not set
// at the moment of calling this method.
// Also, locking is not required as it's called while the span is already locked or it's
// being initialized.
return &abandonedSpanCandidate{
Name: s.Name,
TraceID: s.TraceID,
SpanID: s.SpanID,
Start: s.Start,
Finished: finished,
c := &abandonedSpanCandidate{
Name: s.Name,
TraceID: s.TraceID,
SpanID: s.SpanID,
Start: s.Start,
Finished: finished,
Integration: component,
}
return c
}

// String takes a span and returns a human-readable string representing that span.
func (s *abandonedSpanCandidate) String() string {
age := now() - s.Start
a := fmt.Sprintf("%d sec", age/1e9)
return fmt.Sprintf("[name: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, s.SpanID, s.TraceID, a)
return fmt.Sprintf("[name: %s, integration: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, s.Integration, s.SpanID, s.TraceID, a)
}

type abandonedSpansDebugger struct {
Expand Down Expand Up @@ -292,6 +303,9 @@ func formatAbandonedSpans(b *bucket[uint64, *abandonedSpanCandidate], interval *
if interval != nil && curTime-s.Start < interval.Nanoseconds() {
continue
}
if t, ok := internal.GetGlobalTracer().(*tracer); ok {
t.statsd.Incr("datadog.tracer.abandoned_spans", []string{"name:" + s.Name, "integration:" + s.Integration}, 1)
}
spanCount++
msg := s.String()
sb.WriteString(msg)
Expand Down
55 changes: 54 additions & 1 deletion ddtrace/tracer/abandonedspans_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/statsdtest"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -58,11 +60,62 @@ func assertProcessedSpans(assert *assert.Assertions, t *tracer, startedSpans, fi

func formatSpanString(s *span) string {
s.Lock()
msg := fmt.Sprintf("[name: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, s.SpanID, s.TraceID, spanAge(s))
var integration string
if v, ok := s.Meta[ext.Component]; ok {
integration = v
} else {
integration = "manual"
}
msg := fmt.Sprintf("[name: %s, integration: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, integration, s.SpanID, s.TraceID, spanAge(s))
s.Unlock()
return msg
}

func TestAbandonedSpansMetric(t *testing.T) {
assert := assert.New(t)
var tg statsdtest.TestStatsdClient
tp := new(log.RecordLogger)
tickerInterval = 100 * time.Millisecond
t.Run("finished", func(t *testing.T) {
tp.Reset()
tg.Reset()
defer setTestTime()()
tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond), withStatsdClient(&tg))
defer stop()
s := tracer.StartSpan("operation", StartTime(spanStart)).(*span)
s.Finish()
assertProcessedSpans(assert, tracer, 1, 1)
assert.Empty(tg.GetCallsByName("datadog.tracer.abandoned_spans"))
})
t.Run("open", func(t *testing.T) {
tp.Reset()
tg.Reset()
defer setTestTime()()
tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond), withStatsdClient(&tg))
defer stop()
tracer.StartSpan("operation", StartTime(spanStart), Tag(ext.Component, "some_integration_name"))
assertProcessedSpans(assert, tracer, 1, 0)
calls := tg.GetCallsByName("datadog.tracer.abandoned_spans")
assert.Len(calls, 1)
call := calls[0]
assert.Equal([]string{"name:operation", "integration:some_integration_name"}, call.Tags())
})
t.Run("both", func(t *testing.T) {
tp.Reset()
tg.Reset()
defer setTestTime()()
tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond), withStatsdClient(&tg))
defer stop()
sf := tracer.StartSpan("op", StartTime(spanStart)).(*span)
sf.Finish()
s := tracer.StartSpan("op2", StartTime(spanStart)).(*span)
assertProcessedSpans(assert, tracer, 2, 1)
calls := tg.GetCallsByName("datadog.tracer.abandoned_spans")
assert.Len(calls, 1)
s.Finish()
})
}

func TestReportAbandonedSpans(t *testing.T) {
assert := assert.New(t)
tp := new(log.RecordLogger)
Expand Down
4 changes: 3 additions & 1 deletion ddtrace/tracer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func (t *tracer) reportRuntimeMetrics(interval time.Duration) {
}
}

func (t *tracer) reportHealthMetrics(interval time.Duration) {
// reportHealthMetricsAtInterval reports noisy health metrics at the specified interval.
// The periodic reporting ensures metrics are delivered without overwhelming the system or logs.
func (t *tracer) reportHealthMetricsAtInterval(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
Expand Down
28 changes: 26 additions & 2 deletions ddtrace/tracer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestReportRuntimeMetrics(t *testing.T) {
assert.Contains(calls, "runtime.go.gc_stats.pause_quantiles.75p")
}

func TestReportHealthMetrics(t *testing.T) {
func TestReportHealthMetricsAtInterval(t *testing.T) {
assert := assert.New(t)
var tg statsdtest.TestStatsdClient

Expand All @@ -55,12 +55,36 @@ func TestReportHealthMetrics(t *testing.T) {

tracer.StartSpan("operation").Finish()
flush(1)
tg.Wait(assert, 3, 10*time.Second)
tg.Wait(assert, 4, 10*time.Second)

counts := tg.Counts()
assert.Equal(int64(1), counts["datadog.tracer.spans_started"])
assert.Equal(int64(1), counts["datadog.tracer.spans_finished"])
assert.Equal(int64(0), counts["datadog.tracer.traces_dropped"])
assert.Equal(int64(1), counts["datadog.tracer.queue.enqueued.traces"])
}

func TestEnqueuedTracesHealthMetric(t *testing.T) {
assert := assert.New(t)
var tg statsdtest.TestStatsdClient

defer func(old time.Duration) { statsInterval = old }(statsInterval)
statsInterval = time.Nanosecond

tracer, _, flush, stop := startTestTracer(t, withStatsdClient(&tg))
defer stop()

for i := 0; i < 3; i++ {
tracer.StartSpan("operation").Finish()
}
flush(3)
tg.Wait(assert, 1, 10*time.Second)

counts := tg.Counts()
assert.Equal(int64(3), counts["datadog.tracer.queue.enqueued.traces"])
w, ok := tracer.traceWriter.(*agentTraceWriter)
assert.True(ok)
assert.Equal(uint32(0), w.tracesQueued)
}

func TestTracerMetrics(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func newTracer(opts ...StartOption) *tracer {
t.wg.Add(1)
go func() {
defer t.wg.Done()
t.reportHealthMetrics(statsInterval)
t.reportHealthMetricsAtInterval(statsInterval)
}()
t.stats.Start()
return t
Expand Down
5 changes: 5 additions & 0 deletions ddtrace/tracer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"

globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal"
Expand Down Expand Up @@ -50,6 +51,8 @@ type agentTraceWriter struct {

// statsd is used to send metrics
statsd globalinternal.StatsdClient

tracesQueued uint32
}

func newAgentTraceWriter(c *config, s *prioritySampler, statsdClient globalinternal.StatsdClient) *agentTraceWriter {
Expand All @@ -67,6 +70,7 @@ func (h *agentTraceWriter) add(trace []*span) {
h.statsd.Incr("datadog.tracer.traces_dropped", []string{"reason:encoding_error"}, 1)
log.Error("Error encoding msgpack: %v", err)
}
atomic.AddUint32(&h.tracesQueued, 1) // TODO: This does not differentiate between complete traces and partial chunks
if h.payload.size() > payloadSizeLimit {
h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:size"}, 1)
h.flush()
Expand Down Expand Up @@ -94,6 +98,7 @@ func (h *agentTraceWriter) flush() {
// collection to avoid a memory leak when references to this object
// may still be kept by faulty transport implementations or the
// standard library. See dd-trace-go#976
h.statsd.Count("datadog.tracer.queue.enqueued.traces", int64(atomic.SwapUint32(&h.tracesQueued, 0)), nil, 1)
p.clear()

<-h.climit
Expand Down
10 changes: 6 additions & 4 deletions ddtrace/tracer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,14 @@ func TestTraceWriterFlushRetries(t *testing.T) {
}

sentCounts := map[string]int64{
"datadog.tracer.decode_error": 1,
"datadog.tracer.flush_bytes": 185,
"datadog.tracer.flush_traces": 1,
"datadog.tracer.decode_error": 1,
"datadog.tracer.flush_bytes": 185,
"datadog.tracer.flush_traces": 1,
"datadog.tracer.queue.enqueued.traces": 1,
}
droppedCounts := map[string]int64{
"datadog.tracer.traces_dropped": 1,
"datadog.tracer.queue.enqueued.traces": 1,
"datadog.tracer.traces_dropped": 1,
}

ss := []*span{makeSpan(0)}
Expand Down
41 changes: 39 additions & 2 deletions internal/statsdtest/statsdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,16 @@ type TestStatsdCall struct {
rate float64
}

func (c *TestStatsdCall) Tags() []string {
return c.tags
func (t TestStatsdCall) Name() string {
return t.name
}

func (t TestStatsdCall) Tags() []string {
return t.tags
}

func (t TestStatsdCall) IntVal() int64 {
return t.intVal
}

func (tg *TestStatsdClient) addCount(name string, value int64) {
Expand Down Expand Up @@ -225,6 +233,35 @@ func (tg *TestStatsdClient) CallsByName() map[string]int {
return counts
}

// GetCallsByName returns a slice of TestStatsdCalls with the provided name on the TestStatsdClient
// It's useful if you want to use any TestStatsdCall method calls on the result(s)
func (tg *TestStatsdClient) GetCallsByName(name string) (calls []TestStatsdCall) {
tg.mu.RLock()
defer tg.mu.RUnlock()
for _, c := range tg.gaugeCalls {
if c.Name() == name {
calls = append(calls, c)
}
}
for _, c := range tg.incrCalls {
if c.Name() == name {
calls = append(calls, c)
}
}
for _, c := range tg.countCalls {
if c.Name() == name {
calls = append(calls, c)
}
}
for _, c := range tg.timingCalls {
if c.Name() == name {
calls = append(calls, c)
}
}
return calls
}

// FilterCallsByName returns a slice of TestStatsdCalls with the provided name, from the list of provided TestStatsdCalls
func FilterCallsByName(calls []TestStatsdCall, name string) []TestStatsdCall {
var matches []TestStatsdCall
for _, c := range calls {
Expand Down

0 comments on commit 089eed1

Please sign in to comment.