From d39332478672b57617dafc315fc258e888067143 Mon Sep 17 00:00:00 2001 From: Mikayla Toffler <46911781+mtoffl01@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:49:22 -0500 Subject: [PATCH 1/3] ddtrace/tracer: Report datadog.tracer.queue.enqueued.traces as health metric (#3019) --- ddtrace/tracer/metrics_test.go | 26 +++++++++++++++++++++++++- ddtrace/tracer/writer.go | 5 +++++ ddtrace/tracer/writer_test.go | 10 ++++++---- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/ddtrace/tracer/metrics_test.go b/ddtrace/tracer/metrics_test.go index 4aad5e373c..64548fce86 100644 --- a/ddtrace/tracer/metrics_test.go +++ b/ddtrace/tracer/metrics_test.go @@ -55,12 +55,36 @@ func TestReportHealthMetrics(t *testing.T) { tracer.StartSpan("operation").Finish() flush(1) - tg.Wait(assert, 3, 10*time.Second) + tg.Wait(assert, 4, 10*time.Second) counts := tg.Counts() assert.Equal(int64(1), counts["datadog.tracer.spans_started"]) assert.Equal(int64(1), counts["datadog.tracer.spans_finished"]) assert.Equal(int64(0), counts["datadog.tracer.traces_dropped"]) + assert.Equal(int64(1), counts["datadog.tracer.queue.enqueued.traces"]) +} + +func TestEnqueuedTracesHealthMetric(t *testing.T) { + assert := assert.New(t) + var tg statsdtest.TestStatsdClient + + defer func(old time.Duration) { statsInterval = old }(statsInterval) + statsInterval = time.Nanosecond + + tracer, _, flush, stop := startTestTracer(t, withStatsdClient(&tg)) + defer stop() + + for i := 0; i < 3; i++ { + tracer.StartSpan("operation").Finish() + } + flush(3) + tg.Wait(assert, 1, 10*time.Second) + + counts := tg.Counts() + assert.Equal(int64(3), counts["datadog.tracer.queue.enqueued.traces"]) + w, ok := tracer.traceWriter.(*agentTraceWriter) + assert.True(ok) + assert.Equal(uint32(0), w.tracesQueued) } func TestTracerMetrics(t *testing.T) { diff --git a/ddtrace/tracer/writer.go b/ddtrace/tracer/writer.go index 5ceadcb2e6..cff333c8c4 100644 --- a/ddtrace/tracer/writer.go +++ b/ddtrace/tracer/writer.go @@ -14,6 +14,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal" @@ -50,6 +51,8 @@ type agentTraceWriter struct { // statsd is used to send metrics statsd globalinternal.StatsdClient + + tracesQueued uint32 } func newAgentTraceWriter(c *config, s *prioritySampler, statsdClient globalinternal.StatsdClient) *agentTraceWriter { @@ -67,6 +70,7 @@ func (h *agentTraceWriter) add(trace []*span) { h.statsd.Incr("datadog.tracer.traces_dropped", []string{"reason:encoding_error"}, 1) log.Error("Error encoding msgpack: %v", err) } + atomic.AddUint32(&h.tracesQueued, 1) // TODO: This does not differentiate between complete traces and partial chunks if h.payload.size() > payloadSizeLimit { h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:size"}, 1) h.flush() @@ -94,6 +98,7 @@ func (h *agentTraceWriter) flush() { // collection to avoid a memory leak when references to this object // may still be kept by faulty transport implementations or the // standard library. See dd-trace-go#976 + h.statsd.Count("datadog.tracer.queue.enqueued.traces", int64(atomic.SwapUint32(&h.tracesQueued, 0)), nil, 1) p.clear() <-h.climit diff --git a/ddtrace/tracer/writer_test.go b/ddtrace/tracer/writer_test.go index bcdf529a58..224ae2a1ca 100644 --- a/ddtrace/tracer/writer_test.go +++ b/ddtrace/tracer/writer_test.go @@ -386,12 +386,14 @@ func TestTraceWriterFlushRetries(t *testing.T) { } sentCounts := map[string]int64{ - "datadog.tracer.decode_error": 1, - "datadog.tracer.flush_bytes": 185, - "datadog.tracer.flush_traces": 1, + "datadog.tracer.decode_error": 1, + "datadog.tracer.flush_bytes": 185, + "datadog.tracer.flush_traces": 1, + "datadog.tracer.queue.enqueued.traces": 1, } droppedCounts := map[string]int64{ - "datadog.tracer.traces_dropped": 1, + "datadog.tracer.queue.enqueued.traces": 1, + "datadog.tracer.traces_dropped": 1, } ss := []*span{makeSpan(0)} From 3961f7fb1c2f2b1d170db287d092e8ef9ffe7271 Mon Sep 17 00:00:00 2001 From: Mikayla Toffler <46911781+mtoffl01@users.noreply.github.com> Date: Mon, 23 Dec 2024 14:49:50 -0500 Subject: [PATCH 2/3] chore: rename reportHealthMetrics function (#3040) --- ddtrace/tracer/metrics.go | 4 +++- ddtrace/tracer/metrics_test.go | 2 +- ddtrace/tracer/tracer.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ddtrace/tracer/metrics.go b/ddtrace/tracer/metrics.go index 409d8a439a..9cd17a968b 100644 --- a/ddtrace/tracer/metrics.go +++ b/ddtrace/tracer/metrics.go @@ -85,7 +85,9 @@ func (t *tracer) reportRuntimeMetrics(interval time.Duration) { } } -func (t *tracer) reportHealthMetrics(interval time.Duration) { +// reportHealthMetricsAtInterval reports noisy health metrics at the specified interval. +// The periodic reporting ensures metrics are delivered without overwhelming the system or logs. +func (t *tracer) reportHealthMetricsAtInterval(interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { diff --git a/ddtrace/tracer/metrics_test.go b/ddtrace/tracer/metrics_test.go index 64548fce86..16a40e8492 100644 --- a/ddtrace/tracer/metrics_test.go +++ b/ddtrace/tracer/metrics_test.go @@ -43,7 +43,7 @@ func TestReportRuntimeMetrics(t *testing.T) { assert.Contains(calls, "runtime.go.gc_stats.pause_quantiles.75p") } -func TestReportHealthMetrics(t *testing.T) { +func TestReportHealthMetricsAtInterval(t *testing.T) { assert := assert.New(t) var tg statsdtest.TestStatsdClient diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index a345fd6366..25e08b7cb3 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -376,7 +376,7 @@ func newTracer(opts ...StartOption) *tracer { t.wg.Add(1) go func() { defer t.wg.Done() - t.reportHealthMetrics(statsInterval) + t.reportHealthMetricsAtInterval(statsInterval) }() t.stats.Start() return t From 4f57a472cc1b5877db967ff5d157231dcfdd82ef Mon Sep 17 00:00:00 2001 From: Mikayla Toffler <46911781+mtoffl01@users.noreply.github.com> Date: Mon, 23 Dec 2024 14:58:04 -0500 Subject: [PATCH 3/3] Report datadog.tracer.abandoned_spans health metric (#3032) --- ddtrace/tracer/abandonedspans.go | 28 ++++++++++---- ddtrace/tracer/abandonedspans_test.go | 55 ++++++++++++++++++++++++++- internal/statsdtest/statsdtest.go | 41 +++++++++++++++++++- 3 files changed, 114 insertions(+), 10 deletions(-) diff --git a/ddtrace/tracer/abandonedspans.go b/ddtrace/tracer/abandonedspans.go index defad41831..9ccf1ca045 100644 --- a/ddtrace/tracer/abandonedspans.go +++ b/ddtrace/tracer/abandonedspans.go @@ -14,6 +14,8 @@ import ( "sync/atomic" "time" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" ) @@ -77,27 +79,36 @@ type abandonedSpanCandidate struct { TraceID, SpanID uint64 Start int64 Finished bool + Integration string } func newAbandonedSpanCandidate(s *span, finished bool) *abandonedSpanCandidate { + var component string + if v, ok := s.Meta[ext.Component]; ok { + component = v + } else { + component = "manual" + } // finished is explicit instead of implicit as s.finished may be not set // at the moment of calling this method. // Also, locking is not required as it's called while the span is already locked or it's // being initialized. - return &abandonedSpanCandidate{ - Name: s.Name, - TraceID: s.TraceID, - SpanID: s.SpanID, - Start: s.Start, - Finished: finished, + c := &abandonedSpanCandidate{ + Name: s.Name, + TraceID: s.TraceID, + SpanID: s.SpanID, + Start: s.Start, + Finished: finished, + Integration: component, } + return c } // String takes a span and returns a human-readable string representing that span. func (s *abandonedSpanCandidate) String() string { age := now() - s.Start a := fmt.Sprintf("%d sec", age/1e9) - return fmt.Sprintf("[name: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, s.SpanID, s.TraceID, a) + return fmt.Sprintf("[name: %s, integration: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, s.Integration, s.SpanID, s.TraceID, a) } type abandonedSpansDebugger struct { @@ -292,6 +303,9 @@ func formatAbandonedSpans(b *bucket[uint64, *abandonedSpanCandidate], interval * if interval != nil && curTime-s.Start < interval.Nanoseconds() { continue } + if t, ok := internal.GetGlobalTracer().(*tracer); ok { + t.statsd.Incr("datadog.tracer.abandoned_spans", []string{"name:" + s.Name, "integration:" + s.Integration}, 1) + } spanCount++ msg := s.String() sb.WriteString(msg) diff --git a/ddtrace/tracer/abandonedspans_test.go b/ddtrace/tracer/abandonedspans_test.go index 745f03f222..a929961df2 100644 --- a/ddtrace/tracer/abandonedspans_test.go +++ b/ddtrace/tracer/abandonedspans_test.go @@ -12,7 +12,9 @@ import ( "testing" "time" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/statsdtest" "gopkg.in/DataDog/dd-trace-go.v1/internal/version" "github.com/stretchr/testify/assert" @@ -58,11 +60,62 @@ func assertProcessedSpans(assert *assert.Assertions, t *tracer, startedSpans, fi func formatSpanString(s *span) string { s.Lock() - msg := fmt.Sprintf("[name: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, s.SpanID, s.TraceID, spanAge(s)) + var integration string + if v, ok := s.Meta[ext.Component]; ok { + integration = v + } else { + integration = "manual" + } + msg := fmt.Sprintf("[name: %s, integration: %s, span_id: %d, trace_id: %d, age: %s],", s.Name, integration, s.SpanID, s.TraceID, spanAge(s)) s.Unlock() return msg } +func TestAbandonedSpansMetric(t *testing.T) { + assert := assert.New(t) + var tg statsdtest.TestStatsdClient + tp := new(log.RecordLogger) + tickerInterval = 100 * time.Millisecond + t.Run("finished", func(t *testing.T) { + tp.Reset() + tg.Reset() + defer setTestTime()() + tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond), withStatsdClient(&tg)) + defer stop() + s := tracer.StartSpan("operation", StartTime(spanStart)).(*span) + s.Finish() + assertProcessedSpans(assert, tracer, 1, 1) + assert.Empty(tg.GetCallsByName("datadog.tracer.abandoned_spans")) + }) + t.Run("open", func(t *testing.T) { + tp.Reset() + tg.Reset() + defer setTestTime()() + tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond), withStatsdClient(&tg)) + defer stop() + tracer.StartSpan("operation", StartTime(spanStart), Tag(ext.Component, "some_integration_name")) + assertProcessedSpans(assert, tracer, 1, 0) + calls := tg.GetCallsByName("datadog.tracer.abandoned_spans") + assert.Len(calls, 1) + call := calls[0] + assert.Equal([]string{"name:operation", "integration:some_integration_name"}, call.Tags()) + }) + t.Run("both", func(t *testing.T) { + tp.Reset() + tg.Reset() + defer setTestTime()() + tracer, _, _, stop := startTestTracer(t, WithLogger(tp), WithDebugSpansMode(500*time.Millisecond), withStatsdClient(&tg)) + defer stop() + sf := tracer.StartSpan("op", StartTime(spanStart)).(*span) + sf.Finish() + s := tracer.StartSpan("op2", StartTime(spanStart)).(*span) + assertProcessedSpans(assert, tracer, 2, 1) + calls := tg.GetCallsByName("datadog.tracer.abandoned_spans") + assert.Len(calls, 1) + s.Finish() + }) +} + func TestReportAbandonedSpans(t *testing.T) { assert := assert.New(t) tp := new(log.RecordLogger) diff --git a/internal/statsdtest/statsdtest.go b/internal/statsdtest/statsdtest.go index bed3646fbc..e31cdb4a9f 100644 --- a/internal/statsdtest/statsdtest.go +++ b/internal/statsdtest/statsdtest.go @@ -49,8 +49,16 @@ type TestStatsdCall struct { rate float64 } -func (c *TestStatsdCall) Tags() []string { - return c.tags +func (t TestStatsdCall) Name() string { + return t.name +} + +func (t TestStatsdCall) Tags() []string { + return t.tags +} + +func (t TestStatsdCall) IntVal() int64 { + return t.intVal } func (tg *TestStatsdClient) addCount(name string, value int64) { @@ -225,6 +233,35 @@ func (tg *TestStatsdClient) CallsByName() map[string]int { return counts } +// GetCallsByName returns a slice of TestStatsdCalls with the provided name on the TestStatsdClient +// It's useful if you want to use any TestStatsdCall method calls on the result(s) +func (tg *TestStatsdClient) GetCallsByName(name string) (calls []TestStatsdCall) { + tg.mu.RLock() + defer tg.mu.RUnlock() + for _, c := range tg.gaugeCalls { + if c.Name() == name { + calls = append(calls, c) + } + } + for _, c := range tg.incrCalls { + if c.Name() == name { + calls = append(calls, c) + } + } + for _, c := range tg.countCalls { + if c.Name() == name { + calls = append(calls, c) + } + } + for _, c := range tg.timingCalls { + if c.Name() == name { + calls = append(calls, c) + } + } + return calls +} + +// FilterCallsByName returns a slice of TestStatsdCalls with the provided name, from the list of provided TestStatsdCalls func FilterCallsByName(calls []TestStatsdCall, name string) []TestStatsdCall { var matches []TestStatsdCall for _, c := range calls {