From 14cefba52453ba56630eeb8b9d41ba763c69aa8b Mon Sep 17 00:00:00 2001 From: Roman Date: Thu, 8 Nov 2018 21:13:50 +0800 Subject: [PATCH] fixed data race --- metric.go | 42 ++++++++++++++++++++++++++---------------- metric_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 16 deletions(-) diff --git a/metric.go b/metric.go index 51d2c32..7222f34 100644 --- a/metric.go +++ b/metric.go @@ -37,7 +37,7 @@ type Measurer interface { // and a counter. type Metric struct { sync.Mutex - sample sample // The sample used to build a histogram + data sample // The sample used to build a histogram count int32 // The number of samples observed create int64 // The first updated time name string // The name of the metric @@ -52,7 +52,7 @@ const ( func NewMetric(name string) *Metric { return &Metric{ name: name, - sample: make([]int32, 0, reservoirSize), + data: make([]int32, reservoirSize, reservoirSize), create: time.Now().Unix(), } } @@ -62,8 +62,7 @@ func (m *Metric) Reset() { m.Lock() defer m.Unlock() - m.count = 0 - m.sample = m.sample[:0] + atomic.StoreInt32(&m.count, 0) m.create = time.Now().Unix() } @@ -84,6 +83,15 @@ func (m *Metric) Window() (time.Time, time.Time) { return time.Unix(m.create, 0), time.Now() } +// sample returns the usable sample +func (m *Metric) sample() sample { + count := m.count + if count > reservoirSize { + count = reservoirSize + } + return m.data[:count] +} + // Count returns the number of samples recorded, which may exceed the // reservoir size. func (m *Metric) Count() int { @@ -97,14 +105,14 @@ func (m *Metric) Count() int { func (m *Metric) Max() int { m.Lock() defer m.Unlock() - return m.sample.Max() + return m.sample().Max() } // Mean returns the mean of the values in the sample. func (m *Metric) Mean() float64 { m.Lock() defer m.Unlock() - return m.sample.Mean() + return m.sample().Mean() } // Min returns the minimum value in the sample, which may not be the minimum @@ -112,14 +120,14 @@ func (m *Metric) Mean() float64 { func (m *Metric) Min() int { m.Lock() defer m.Unlock() - return m.sample.Min() + return m.sample().Min() } // Quantile returns a slice of arbitrary quantiles of the sample. func (m *Metric) Quantile(quantiles ...float64) []float64 { m.Lock() defer m.Unlock() - return m.sample.Quantile(quantiles...) + return m.sample().Quantile(quantiles...) } // Snapshot returns a read-only copy of the sample. @@ -127,8 +135,10 @@ func (m *Metric) Snapshot() *Snapshot { m.Lock() defer m.Unlock() - dest := make([]int32, len(m.sample)) - copy(dest, m.sample) + // Snapshot the data + sample := m.sample() + dest := make([]int32, len(sample)) + copy(dest, sample) return &Snapshot{ Metric: m.name, Label: m.tag, @@ -143,14 +153,14 @@ func (m *Metric) Snapshot() *Snapshot { func (m *Metric) StdDev() float64 { m.Lock() defer m.Unlock() - return m.sample.StdDev() + return m.sample().StdDev() } // Variance returns the variance of the values in the sample. func (m *Metric) Variance() float64 { m.Lock() defer m.Unlock() - return m.sample.Variance() + return m.sample().Variance() } // Rate returns a operation per second rate since the creation of the metric. @@ -162,16 +172,16 @@ func (m *Metric) Rate() float64 { // Update samples a new value into the metric. func (m *Metric) Update(v int32) { count := atomic.AddInt32(&m.count, 1) - if count < reservoirSize { + if count <= reservoirSize { m.Lock() - m.sample = append(m.sample, v) + m.data[count-1] = v m.Unlock() return } - if r := int(rand.Int31n(count)); r < (reservoirSize - 1) { + if r := int(rand.Int31n(count)); r < reservoirSize { m.Lock() - m.sample[r] = v + m.data[r] = v m.Unlock() return } diff --git a/metric_test.go b/metric_test.go index 3f2a575..c07eaba 100644 --- a/metric_test.go +++ b/metric_test.go @@ -17,6 +17,7 @@ package stats import ( + "sync" "testing" "time" @@ -46,6 +47,29 @@ func BenchmarkMetricUpdate(b *testing.B) { } } +func TestMetricConcurrency(t *testing.T) { + h := NewMetric("x") + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + for i := int32(0); i < 50000; i++ { + h.Update(i) + } + wg.Done() + }() + + go func() { + for i := int32(0); i < 50000; i++ { + h.Reset() + } + wg.Done() + }() + + wg.Wait() +} + func TestMetric(t *testing.T) { h := NewMetric("x") for i := int32(0); i < 100; i++ { @@ -74,3 +98,13 @@ func TestMetric(t *testing.T) { assert.Equal(t, 0, h.Count()) assert.Equal(t, 0, h.Max()) } + +func TestSampleClamp(t *testing.T) { + h := NewMetric("x") + for i := int32(0); i < 2000; i++ { + h.Update(i) + } + + sample := h.sample() + assert.Equal(t, reservoirSize, len(sample)) +}