Skip to content

Commit

Permalink
Merge pull request #1 from iqoption/parallel-reporters
Browse files Browse the repository at this point in the history
Added parallels working of cached and stats reporters
  • Loading branch information
METALmasterKS authored Mar 30, 2020
2 parents 798e659 + 5a1f587 commit 007e493
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 80 deletions.
34 changes: 5 additions & 29 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,28 +215,6 @@ func (s *scope) report(r StatsReporter) {
s.hm.RUnlock()
}

func (s *scope) cachedReport() {
s.cm.RLock()
for _, counter := range s.countersSlice {
counter.cachedReport()
}
s.cm.RUnlock()

s.gm.RLock()
for _, gauge := range s.gaugesSlice {
gauge.cachedReport()
}
s.gm.RUnlock()

// we do nothing for timers here because timers report directly to ths StatsReporter without buffering

s.hm.RLock()
for _, histogram := range s.histogramsSlice {
histogram.cachedReport()
}
s.hm.RUnlock()
}

// reportLoop is used by the root scope for periodic reporting
func (s *scope) reportLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
Expand Down Expand Up @@ -270,15 +248,13 @@ func (s *scope) reportRegistryWithLock() {
s.registry.RLock()
defer s.registry.RUnlock()

for _, ss := range s.registry.subscopes {
ss.report(s.reporter)
}
if s.reporter != nil {
for _, ss := range s.registry.subscopes {
ss.report(s.reporter)
}
s.reporter.Flush()
} else if s.cachedReporter != nil {
for _, ss := range s.registry.subscopes {
ss.cachedReport()
}
}
if s.cachedReporter != nil {
s.cachedReporter.Flush()
}
}
Expand Down
2 changes: 1 addition & 1 deletion scope_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func benchmarkScopeReportingN(b *testing.B, numElems int) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
s.cachedReport()
s.report(NullStatsReporter)
}
}

Expand Down
16 changes: 6 additions & 10 deletions scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,6 @@ func testReportLoopFlushOnce(t *testing.T, cached bool) {
assert.Equal(t, int32(1), v)
}

func TestCachedReporterFlushOnce(t *testing.T) {
testReportLoopFlushOnce(t, true)
}

func TestReporterFlushOnce(t *testing.T) {
testReportLoopFlushOnce(t, false)
}
Expand Down Expand Up @@ -458,26 +454,26 @@ func TestCachedReporter(t *testing.T) {

s := root.(*scope)

r.cg.Add(1)
r.cg.Add(2)
s.Counter("bar").Inc(1)
r.gg.Add(1)
r.gg.Add(2)
s.Gauge("zed").Update(1)
r.tg.Add(1)
s.Timer("ticky").Record(time.Millisecond * 175)
r.hg.Add(2)
r.hg.Add(4)
s.Histogram("baz", MustMakeLinearValueBuckets(0, 10, 10)).
RecordValue(42.42)
s.Histogram("qux", MustMakeLinearDurationBuckets(0, 10*time.Millisecond, 10)).
RecordDuration(42 * time.Millisecond)
RecordDuration(42 * time.Nanosecond)

s.cachedReport()
s.report(r)
r.WaitAll()

assert.EqualValues(t, 1, r.counters["bar"].val)
assert.EqualValues(t, 1, r.gauges["zed"].val)
assert.EqualValues(t, time.Millisecond*175, r.timers["ticky"].val)
assert.EqualValues(t, 1, r.histograms["baz"].valueSamples[50.0])
assert.EqualValues(t, 1, r.histograms["qux"].durationSamples[50*time.Millisecond])
assert.EqualValues(t, 1, r.histograms["qux"].durationSamples[10*time.Millisecond])
}

func TestRootScopeWithoutPrefix(t *testing.T) {
Expand Down
69 changes: 29 additions & 40 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,12 @@ func (c *counter) report(name string, tags map[string]string, r StatsReporter) {
if delta == 0 {
return
}

r.ReportCounter(name, tags, delta)
}

func (c *counter) cachedReport() {
delta := c.value()
if delta == 0 {
return
if r != nil {
r.ReportCounter(name, tags, delta)
}
if c.cachedCount != nil {
c.cachedCount.ReportCount(delta)
}

c.cachedCount.ReportCount(delta)
}

func (c *counter) snapshot() int64 {
Expand Down Expand Up @@ -124,13 +119,13 @@ func (g *gauge) value() float64 {

func (g *gauge) report(name string, tags map[string]string, r StatsReporter) {
if atomic.SwapUint64(&g.updated, 0) == 1 {
r.ReportGauge(name, tags, g.value())
}
}

func (g *gauge) cachedReport() {
if atomic.SwapUint64(&g.updated, 0) == 1 {
g.cachedGauge.ReportGauge(g.value())
value := g.value()
if r != nil {
r.ReportGauge(name, tags, value)
}
if g.cachedGauge != nil {
g.cachedGauge.ReportGauge(value)
}
}
}

Expand Down Expand Up @@ -175,9 +170,8 @@ func newTimer(
func (t *timer) Record(interval time.Duration) {
if t.cachedTimer != nil {
t.cachedTimer.ReportTimer(interval)
} else {
t.reporter.ReportTimer(t.name, t.tags, interval)
}
t.reporter.ReportTimer(t.name, t.tags, interval)
}

func (t *timer) Start() Stopwatch {
Expand Down Expand Up @@ -322,28 +316,23 @@ func (h *histogram) report(name string, tags map[string]string, r StatsReporter)
}
switch h.htype {
case valueHistogramType:
r.ReportHistogramValueSamples(name, tags, h.specification,
h.buckets[i].valueLowerBound, h.buckets[i].valueUpperBound,
samples)
case durationHistogramType:
r.ReportHistogramDurationSamples(name, tags, h.specification,
h.buckets[i].durationLowerBound, h.buckets[i].durationUpperBound,
samples)
}
}
}

func (h *histogram) cachedReport() {
for i := range h.buckets {
samples := h.buckets[i].samples.value()
if samples == 0 {
continue
}
switch h.htype {
case valueHistogramType:
h.buckets[i].cachedValueBucket.ReportSamples(samples)
if r != nil {
r.ReportHistogramValueSamples(name, tags, h.specification,
h.buckets[i].valueLowerBound, h.buckets[i].valueUpperBound,
samples)
}
if h.buckets[i].cachedValueBucket != nil {
h.buckets[i].cachedValueBucket.ReportSamples(samples)
}
case durationHistogramType:
h.buckets[i].cachedDurationBucket.ReportSamples(samples)
if r != nil {
r.ReportHistogramDurationSamples(name, tags, h.specification,
h.buckets[i].durationLowerBound, h.buckets[i].durationUpperBound,
samples)
}
if h.buckets[i].cachedValueBucket != nil {
h.buckets[i].cachedDurationBucket.ReportSamples(samples)
}
}
}
}
Expand Down

0 comments on commit 007e493

Please sign in to comment.