From 1caa80c37dad1868e56325f601934e79ee167924 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 4 Jun 2019 16:56:51 -0700 Subject: [PATCH] WAL Watcher needs to take in and pass a Registerer to LiveReader. Signed-off-by: Callum Styan --- wal/wal_watcher.go | 16 +++++++++++++--- wal/wal_watcher_test.go | 14 +++++++------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/wal/wal_watcher.go b/wal/wal_watcher.go index 51702320..be57975f 100644 --- a/wal/wal_watcher.go +++ b/wal/wal_watcher.go @@ -105,6 +105,7 @@ type WALWatcher struct { logger log.Logger walDir string lastCheckpoint string + reg prometheus.Registerer startTime int64 @@ -121,12 +122,21 @@ type WALWatcher struct { } // NewWALWatcher creates a new WAL watcher for a given WriteTo. -func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string) *WALWatcher { +func NewWALWatcher(logger log.Logger, reg prometheus.Registerer, name string, writer writeTo, walDir string) *WALWatcher { if logger == nil { logger = log.NewNopLogger() } + if reg != nil { + // We can't use MustRegister because WALWatcher's are recreated on config changes within Prometheus. + reg.Register(watcherRecordsRead) + reg.Register(watcherRecordDecodeFails) + reg.Register(watcherSamplesSentPreTailing) + reg.Register(watcherCurrentSegment) + } + return &WALWatcher{ logger: logger, + reg: reg, writer: writer, walDir: path.Join(walDir, "wal"), name: name, @@ -298,7 +308,7 @@ func (w *WALWatcher) watch(segmentNum int, tail bool) error { } defer segment.Close() - reader := NewLiveReader(w.logger, segment) + reader := NewLiveReader(w.logger, w.reg, segment) readTicker := time.NewTicker(readPeriod) defer readTicker.Stop() @@ -513,7 +523,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error { } defer sr.Close() - r := NewLiveReader(w.logger, sr) + r := NewLiveReader(w.logger, w.reg, sr) if err := w.readSegment(r, index, false); err != io.EOF && err != nil { return errors.Wrap(err, "readSegment") } diff --git a/wal/wal_watcher_test.go b/wal/wal_watcher_test.go index da8ae0a0..af80936c 100644 --- a/wal/wal_watcher_test.go +++ b/wal/wal_watcher_test.go @@ -134,7 +134,7 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) + watcher := NewWALWatcher(nil, nil, "", wt, dir) watcher.startTime = now.UnixNano() // Set the Watcher's metrics so they're not nil pointers. @@ -144,7 +144,7 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) defer segment.Close() - reader := NewLiveReader(nil, segment) + reader := NewLiveReader(nil, nil, segment) // Use tail true so we can ensure we got the right number of samples. watcher.readSegment(reader, i, true) } @@ -209,7 +209,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) + watcher := NewWALWatcher(nil, nil, "", wt, dir) go watcher.Start() expected := seriesCount @@ -291,7 +291,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { _, _, err = w.Segments() testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) + watcher := NewWALWatcher(nil, nil, "", wt, dir) go watcher.Start() expected := seriesCount * 2 @@ -352,7 +352,7 @@ func TestReadCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) + watcher := NewWALWatcher(nil, nil, "", wt, dir) // watcher. go watcher.Start() @@ -419,7 +419,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { } wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) + watcher := NewWALWatcher(nil, nil, "", wt, dir) watcher.maxSegment = -1 // Set the Watcher's metrics so they're not nil pointers. @@ -479,7 +479,7 @@ func TestCheckpointSeriesReset(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWALWatcher(nil, "", wt, dir) + watcher := NewWALWatcher(nil, nil, "", wt, dir) watcher.maxSegment = -1 go watcher.Start()