Skip to content

Commit

Permalink
Avoid send on closed channel when emitting values after closing an M3 reporter (#46)

Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Jun 23, 2017
1 parent e9b6018 commit 88e9c75
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 1 deletion.
13 changes: 13 additions & 0 deletions m3/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,14 @@ func (r *reporter) reportCopyMetric(
copy.MetricValue.Timer = t
}

// NB(r): This is to avoid sending on a closed channel,
// it's faster to actually defer/recover than acquire
// a read lock here to ensure we aren't closed: benchmarked
// this with BenchmarkTimer in reporter_benchmark_test.go
defer func() {
recover()
}()

select {
case r.metCh <- sizedMetric{copy, size}:
default:
Expand All @@ -435,6 +443,11 @@ func (r *reporter) reportCopyMetric(

// Flush implements tally.CachedStatsReporter.
func (r *reporter) Flush() {
	// Sending on r.metCh after the reporter is closed would panic with
	// "send on closed channel"; swallow that panic so Flush stays safe
	// to call after close.
	defer func() {
		_ = recover()
	}()

	r.metCh <- sizedMetric{}
}

Expand Down
46 changes: 46 additions & 0 deletions m3/reporter_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ package m3

import (
"testing"
"time"

customtransport "github.com/uber-go/tally/m3/customtransports"
m3thrift "github.com/uber-go/tally/m3/thrift"

"github.com/apache/thrift/lib/go/thrift"
)
Expand All @@ -41,6 +45,8 @@ func BenchmarkNewMetric(b *testing.B) {
resourcePool := newResourcePool(protocolFactory)
benchReporter := &reporter{resourcePool: resourcePool}

b.ResetTimer()

for n := 0; n < b.N; n++ {
benchReporter.newMetric("foo", nil, counterType)
}
Expand All @@ -55,7 +61,47 @@ func BenchmarkCalulateSize(b *testing.B) {
met := benchReporter.newMetric("foo", nil, counterType)
met.MetricValue.Count.I64Value = &val

b.ResetTimer()

for n := 0; n < b.N; n++ {
benchReporter.calculateSize(met)
}
}

// BenchmarkTimer measures the cost of emitting a timer value through
// the reporter with a consumer goroutine draining the metrics channel.
func BenchmarkTimer(b *testing.B) {
	protoFactory := thrift.NewTCompactProtocolFactory()
	pool := newResourcePool(protoFactory)

	// Write an empty batch through the calc transport so it has a
	// baseline size, then reset its counter before benchmarking.
	commonTags := pool.getTagList()
	batch := pool.getBatch()
	batch.CommonTags = commonTags
	batch.Metrics = []*m3thrift.Metric{}
	proto := pool.getProto()
	batch.Write(proto)
	calcTransport := proto.Transport().(*customtransport.TCalcTransport)
	calcTransport.ResetCount()

	benchReporter := &reporter{
		calc:         calcTransport,
		calcProto:    proto,
		resourcePool: pool,
		metCh:        make(chan sizedMetric, DefaultMaxQueueSize),
	}
	// Closing the channel ends the consumer goroutine below.
	defer close(benchReporter.metCh)

	// Blindly drain metrics as they arrive so sends never block.
	go func() {
		for met := range benchReporter.metCh {
			pool.releaseShallowMetric(met.m)
		}
	}()

	timer := benchReporter.AllocateTimer("foo", nil)

	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		timer.ReportTimer(time.Duration(n) * time.Millisecond)
	}

	b.StopTimer()
}
48 changes: 47 additions & 1 deletion m3/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestReporter(t *testing.T) {
}
}

//Validate metrics
// Validate metrics
emittedCounters := batches[0].GetMetrics()
require.Equal(t, 1, len(emittedCounters))
emittedTimers := batches[1].GetMetrics()
Expand Down Expand Up @@ -222,6 +222,52 @@ func TestReporterFinalFlush(t *testing.T) {
require.Equal(t, 1, len(server.Service.getBatches()[0].GetMetrics()))
}

// TestReporterNoPanicOnTimerAfterClose ensures that emitting a timer
// value after the reporter has been closed does not panic.
func TestReporterNoPanicOnTimerAfterClose(t *testing.T) {
	server := newFakeM3Server(t, &sync.WaitGroup{}, true, Compact)
	go server.Serve()
	defer server.Close()

	rep, err := NewReporter(Options{
		HostPorts:          []string{server.Addr},
		Service:            "test-service",
		CommonTags:         defaultCommonTags,
		MaxQueueSize:       queueSize,
		MaxPacketSizeBytes: maxPacketSize,
	})
	require.NoError(t, err)

	// Allocate the timer first, then close the reporter underneath it.
	timer := rep.AllocateTimer("my-timer", nil)
	rep.Close()

	assert.NotPanics(t, func() {
		timer.ReportTimer(time.Millisecond)
	})
}

// TestReporterNoPanicOnFlushAfterClose ensures that calling Flush after
// the reporter has been closed does not panic.
func TestReporterNoPanicOnFlushAfterClose(t *testing.T) {
	server := newFakeM3Server(t, &sync.WaitGroup{}, true, Compact)
	go server.Serve()
	defer server.Close()

	rep, err := NewReporter(Options{
		HostPorts:          []string{server.Addr},
		Service:            "test-service",
		CommonTags:         defaultCommonTags,
		MaxQueueSize:       queueSize,
		MaxPacketSizeBytes: maxPacketSize,
	})
	require.NoError(t, err)

	// Close first; a subsequent Flush must be a no-op, not a panic.
	rep.Close()

	assert.NotPanics(t, func() {
		rep.Flush()
	})
}

func TestReporterHistogram(t *testing.T) {
var wg sync.WaitGroup
server := newFakeM3Server(t, &wg, true, Compact)
Expand Down

0 comments on commit 88e9c75

Please sign in to comment.