Skip to content

Commit

Permalink
stats/view: implement Flush
Browse files Browse the repository at this point in the history
Flush is a global function that immediately reports
all collected data by the worker, regardless of the
reporting duration or buffering.

Added tests but also ensured that calling (*worker).stop()
then Flush doesn't block and returns ASAP if the state
is "quit". The required protected w.quit from double close
but also using a select to detect cases of forever waiting
channel `w.quit` which was closed.

Fixes census-instrumentation#862
  • Loading branch information
odeke-em committed Sep 21, 2018
1 parent 572ae0b commit 25b8b8b
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 2 deletions.
2 changes: 2 additions & 0 deletions stats/view/view_measure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

func TestMeasureFloat64AndInt64(t *testing.T) {
restart()

// Recording through both a Float64Measure and Int64Measure with the
// same name should work.

Expand Down
24 changes: 22 additions & 2 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package view

import (
"fmt"
"sync"
"time"

"go.opencensus.io/stats"
Expand All @@ -43,6 +44,8 @@ type worker struct {
timer *time.Ticker
c chan command
quit, done chan bool
flushCh chan bool
quitOnce sync.Once
}

var defaultWorker *worker
Expand Down Expand Up @@ -142,6 +145,21 @@ func newWorker() *worker {
c: make(chan command, 1024),
quit: make(chan bool),
done: make(chan bool),
flushCh: make(chan bool),
}
}

// Flush reports all collected points regardless
// of the time reporting period or buffering.
func Flush() {
select {
case <-defaultWorker.quit:
// If this channel is closed, do nothing
return
default: // Otherwise we can proceed with flushing
req := &flushReq{c: make(chan bool)}
defaultWorker.c <- req
<-req.c // don't return until the flush is complete.
}
}

Expand All @@ -162,8 +180,10 @@ func (w *worker) start() {
}

func (w *worker) stop() {
w.quit <- true
<-w.done
w.quitOnce.Do(func() {
close(w.quit)
<-w.done
})
}

func (w *worker) getMeasureRef(name string) *measureRef {
Expand Down
11 changes: 11 additions & 0 deletions stats/view/worker_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ func (cmd *registerViewReq) handleCommand(w *worker) {
}
}

// flushReq is the command to flush all recorded
// data regardless of time period and buffering.
type flushReq struct {
c chan bool
}

func (fr *flushReq) handleCommand(w *worker) {
w.reportUsage(time.Now())
fr.c <- true
}

// unregisterFromViewReq is the command to unregister to a view. Has no
// impact on the data collection for client that are pulling data from the
// library.
Expand Down
62 changes: 62 additions & 0 deletions stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,68 @@ func TestUnregisterReportsUsage(t *testing.T) {
}
}

func TestFlush(t *testing.T) {
restart()
ctx := context.Background()

SetReportingPeriod(time.Hour)

m1 := stats.Int64("measure", "desc", "unit")
view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
m2 := stats.Int64("measure2", "desc", "unit")
view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}

if err := Register(view1, view2); err != nil {
t.Fatalf("cannot register: %v", err)
}

e := &countExporter{}
RegisterExporter(e)

// Irrespective of the reporting period, with Flush
// all the recorded points should be reported. Hence we'll
// set an arbitrarily large period of 1 hr.
SetReportingPeriod(time.Hour)

stats.Record(ctx, m1.M(1))
stats.Record(ctx, m2.M(3))
stats.Record(ctx, m2.M(1))

<-time.After(40 * time.Millisecond)
Flush()
<-time.After(40 * time.Millisecond)

e.Lock()
got := e.totalCount
e.Unlock()
want := int64(3) // Number of wanted data points
if got != want {
t.Errorf("Count data\nGot: %d\nWant: %v", got, want)
}
}

func TestFlush_afterStopDoesnotBlock(t *testing.T) {
restart()

doneCh := make(chan bool)
go func() {
defer close(doneCh)

for i := 0; i < 10; i++ {
Flush()
defaultWorker.stop()
Flush()
}
}()

select {
case <-time.After(300 * time.Microsecond): // Arbitrary duration that's considered "long"
t.Fatal("Flush + stop goroutine did not return on time")
case <-doneCh:
// returned ASAP so okay
}
}

type countExporter struct {
sync.Mutex
count int64
Expand Down

0 comments on commit 25b8b8b

Please sign in to comment.