Skip to content

Commit

Permalink
fixed data race
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Nov 8, 2018
1 parent 56e0ecf commit 14cefba
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 16 deletions.
42 changes: 26 additions & 16 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Measurer interface {
// and a counter.
type Metric struct {
sync.Mutex
sample sample // The sample used to build a histogram
data sample // The sample used to build a histogram
count int32 // The number of samples observed
create int64 // The first updated time
name string // The name of the metric
Expand All @@ -52,7 +52,7 @@ const (
func NewMetric(name string) *Metric {
return &Metric{
name: name,
sample: make([]int32, 0, reservoirSize),
data: make([]int32, reservoirSize, reservoirSize),
create: time.Now().Unix(),
}
}
Expand All @@ -62,8 +62,7 @@ func (m *Metric) Reset() {
m.Lock()
defer m.Unlock()

m.count = 0
m.sample = m.sample[:0]
atomic.StoreInt32(&m.count, 0)
m.create = time.Now().Unix()
}

Expand All @@ -84,6 +83,15 @@ func (m *Metric) Window() (time.Time, time.Time) {
return time.Unix(m.create, 0), time.Now()
}

// sample returns the usable sample
func (m *Metric) sample() sample {
count := m.count
if count > reservoirSize {
count = reservoirSize
}
return m.data[:count]
}

// Count returns the number of samples recorded, which may exceed the
// reservoir size.
func (m *Metric) Count() int {
Expand All @@ -97,38 +105,40 @@ func (m *Metric) Count() int {
func (m *Metric) Max() int {
m.Lock()
defer m.Unlock()
return m.sample.Max()
return m.sample().Max()
}

// Mean returns the mean of the values in the sample.
func (m *Metric) Mean() float64 {
m.Lock()
defer m.Unlock()
return m.sample.Mean()
return m.sample().Mean()
}

// Min returns the minimum value in the sample, which may not be the minimum
// value ever to be part of the sample.
func (m *Metric) Min() int {
m.Lock()
defer m.Unlock()
return m.sample.Min()
return m.sample().Min()
}

// Quantile returns a slice of arbitrary quantiles of the sample.
func (m *Metric) Quantile(quantiles ...float64) []float64 {
m.Lock()
defer m.Unlock()
return m.sample.Quantile(quantiles...)
return m.sample().Quantile(quantiles...)
}

// Snapshot returns a read-only copy of the sample.
func (m *Metric) Snapshot() *Snapshot {
m.Lock()
defer m.Unlock()

dest := make([]int32, len(m.sample))
copy(dest, m.sample)
// Snapshot the data
sample := m.sample()
dest := make([]int32, len(sample))
copy(dest, sample)
return &Snapshot{
Metric: m.name,
Label: m.tag,
Expand All @@ -143,14 +153,14 @@ func (m *Metric) Snapshot() *Snapshot {
func (m *Metric) StdDev() float64 {
m.Lock()
defer m.Unlock()
return m.sample.StdDev()
return m.sample().StdDev()
}

// Variance returns the variance of the values in the sample.
func (m *Metric) Variance() float64 {
m.Lock()
defer m.Unlock()
return m.sample.Variance()
return m.sample().Variance()
}

// Rate returns a operation per second rate since the creation of the metric.
Expand All @@ -162,16 +172,16 @@ func (m *Metric) Rate() float64 {
// Update samples a new value into the metric.
func (m *Metric) Update(v int32) {
count := atomic.AddInt32(&m.count, 1)
if count < reservoirSize {
if count <= reservoirSize {
m.Lock()
m.sample = append(m.sample, v)
m.data[count-1] = v
m.Unlock()
return
}

if r := int(rand.Int31n(count)); r < (reservoirSize - 1) {
if r := int(rand.Int31n(count)); r < reservoirSize {
m.Lock()
m.sample[r] = v
m.data[r] = v
m.Unlock()
return
}
Expand Down
34 changes: 34 additions & 0 deletions metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package stats

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -46,6 +47,29 @@ func BenchmarkMetricUpdate(b *testing.B) {
}
}

func TestMetricConcurrency(t *testing.T) {
h := NewMetric("x")

var wg sync.WaitGroup
wg.Add(2)

go func() {
for i := int32(0); i < 50000; i++ {
h.Update(i)
}
wg.Done()
}()

go func() {
for i := int32(0); i < 50000; i++ {
h.Reset()
}
wg.Done()
}()

wg.Wait()
}

func TestMetric(t *testing.T) {
h := NewMetric("x")
for i := int32(0); i < 100; i++ {
Expand Down Expand Up @@ -74,3 +98,13 @@ func TestMetric(t *testing.T) {
assert.Equal(t, 0, h.Count())
assert.Equal(t, 0, h.Max())
}

func TestSampleClamp(t *testing.T) {
h := NewMetric("x")
for i := int32(0); i < 2000; i++ {
h.Update(i)
}

sample := h.sample()
assert.Equal(t, reservoirSize, len(sample))
}

0 comments on commit 14cefba

Please sign in to comment.