From 88e9c75b0cfc84139ad1bae3b7f123786cfd0770 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 23 Jun 2017 09:27:35 -0400 Subject: [PATCH] Avoid send on closed channel when emitting values after closing an M3 reporter (#46) --- m3/reporter.go | 13 ++++++++++ m3/reporter_benchmark_test.go | 46 +++++++++++++++++++++++++++++++++ m3/reporter_test.go | 48 ++++++++++++++++++++++++++++++++++- 3 files changed, 106 insertions(+), 1 deletion(-) diff --git a/m3/reporter.go b/m3/reporter.go index 0c72e21a..fc513c40 100644 --- a/m3/reporter.go +++ b/m3/reporter.go @@ -427,6 +427,14 @@ func (r *reporter) reportCopyMetric( copy.MetricValue.Timer = t } + // NB(r): This is to avoid sending on a closed channel, + // it's faster to actually defer/recover than acquire + // a read lock here to ensure we aren't closed: benchmarked + // this with BenchmarkTimer in reporter_benchmark_test.go + defer func() { + recover() + }() + select { case r.metCh <- sizedMetric{copy, size}: default: @@ -435,6 +443,11 @@ func (r *reporter) reportCopyMetric( // Flush implements tally.CachedStatsReporter. func (r *reporter) Flush() { + // Avoid send on a closed channel + defer func() { + recover() + }() + r.metCh <- sizedMetric{} } diff --git a/m3/reporter_benchmark_test.go b/m3/reporter_benchmark_test.go index 89dbeffe..a19b6689 100644 --- a/m3/reporter_benchmark_test.go +++ b/m3/reporter_benchmark_test.go @@ -22,6 +22,10 @@ package m3 import ( "testing" + "time" + + customtransport "github.com/uber-go/tally/m3/customtransports" + m3thrift "github.com/uber-go/tally/m3/thrift" "github.com/apache/thrift/lib/go/thrift" ) @@ -41,6 +45,8 @@ func BenchmarkNewMetric(b *testing.B) { resourcePool := newResourcePool(protocolFactory) benchReporter := &reporter{resourcePool: resourcePool} + b.ResetTimer() + for n := 0; n < b.N; n++ { benchReporter.newMetric("foo", nil, counterType) } @@ -55,7 +61,47 @@ func BenchmarkCalulateSize(b *testing.B) { met := benchReporter.newMetric("foo", nil, counterType) met.MetricValue.Count.I64Value = &val + b.ResetTimer() + for n := 0; n < b.N; n++ { benchReporter.calculateSize(met) } } + +func BenchmarkTimer(b *testing.B) { + protocolFactory := thrift.NewTCompactProtocolFactory() + resourcePool := newResourcePool(protocolFactory) + tags := resourcePool.getTagList() + batch := resourcePool.getBatch() + batch.CommonTags = tags + batch.Metrics = []*m3thrift.Metric{} + proto := resourcePool.getProto() + batch.Write(proto) + calc := proto.Transport().(*customtransport.TCalcTransport) + calc.ResetCount() + benchReporter := &reporter{ + calc: calc, + calcProto: proto, + resourcePool: resourcePool, + metCh: make(chan sizedMetric, DefaultMaxQueueSize), + } + // Close the met ch to end consume metrics loop + defer close(benchReporter.metCh) + + go func() { + // Blindly consume metrics + for met := range benchReporter.metCh { + resourcePool.releaseShallowMetric(met.m) + } + }() + + timer := benchReporter.AllocateTimer("foo", nil) + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + timer.ReportTimer(time.Duration(n) * time.Millisecond) + } + + b.StopTimer() +} diff --git a/m3/reporter_test.go b/m3/reporter_test.go index e5042400..d421cfb5 100644 --- a/m3/reporter_test.go +++ b/m3/reporter_test.go @@ -112,7 +112,7 @@ func TestReporter(t *testing.T) { } } - //Validate metrics + // Validate metrics emittedCounters := batches[0].GetMetrics() require.Equal(t, 1, len(emittedCounters)) emittedTimers := batches[1].GetMetrics() @@ -222,6 +222,52 @@ func TestReporterFinalFlush(t *testing.T) { require.Equal(t, 1, len(server.Service.getBatches()[0].GetMetrics())) } +// TestReporterNoPanicOnTimerAfterClose ensure the reporter avoids panic +// after close of the reporter when emitting a timer value +func TestReporterNoPanicOnTimerAfterClose(t *testing.T) { + server := newFakeM3Server(t, &sync.WaitGroup{}, true, Compact) + go server.Serve() + defer server.Close() + + r, err := NewReporter(Options{ + HostPorts: []string{server.Addr}, + Service: "test-service", + CommonTags: defaultCommonTags, + MaxQueueSize: queueSize, + MaxPacketSizeBytes: maxPacketSize, + }) + require.NoError(t, err) + + timer := r.AllocateTimer("my-timer", nil) + r.Close() + + assert.NotPanics(t, func() { + timer.ReportTimer(time.Millisecond) + }) +} + +// TestReporterNoPanicOnFlushAfterClose ensure the reporter avoids panic +// after close of the reporter when calling flush +func TestReporterNoPanicOnFlushAfterClose(t *testing.T) { + server := newFakeM3Server(t, &sync.WaitGroup{}, true, Compact) + go server.Serve() + defer server.Close() + + r, err := NewReporter(Options{ + HostPorts: []string{server.Addr}, + Service: "test-service", + CommonTags: defaultCommonTags, + MaxQueueSize: queueSize, + MaxPacketSizeBytes: maxPacketSize, + }) + require.NoError(t, err) + r.Close() + + assert.NotPanics(t, func() { + r.Flush() + }) +} + func TestReporterHistogram(t *testing.T) { var wg sync.WaitGroup server := newFakeM3Server(t, &wg, true, Compact)