Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
WAL Watcher needs to take in and pass a Registerer to LiveReader.
Browse files Browse the repository at this point in the history
Signed-off-by: Callum Styan <[email protected]>
  • Loading branch information
cstyan committed Jun 4, 2019
1 parent 3a1fae4 commit 1caa80c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
16 changes: 13 additions & 3 deletions wal/wal_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type WALWatcher struct {
logger log.Logger
walDir string
lastCheckpoint string
reg prometheus.Registerer

startTime int64

Expand All @@ -121,12 +122,21 @@ type WALWatcher struct {
}

// NewWALWatcher creates a new WAL watcher for a given WriteTo.
func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string) *WALWatcher {
func NewWALWatcher(logger log.Logger, reg prometheus.Registerer, name string, writer writeTo, walDir string) *WALWatcher {
if logger == nil {
logger = log.NewNopLogger()
}
if reg != nil {
// We can't use MustRegister because WALWatcher's are recreated on config changes within Prometheus.
reg.Register(watcherRecordsRead)
reg.Register(watcherRecordDecodeFails)
reg.Register(watcherSamplesSentPreTailing)
reg.Register(watcherCurrentSegment)
}

return &WALWatcher{
logger: logger,
reg: reg,
writer: writer,
walDir: path.Join(walDir, "wal"),
name: name,
Expand Down Expand Up @@ -298,7 +308,7 @@ func (w *WALWatcher) watch(segmentNum int, tail bool) error {
}
defer segment.Close()

reader := NewLiveReader(w.logger, segment)
reader := NewLiveReader(w.logger, w.reg, segment)

readTicker := time.NewTicker(readPeriod)
defer readTicker.Stop()
Expand Down Expand Up @@ -513,7 +523,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
}
defer sr.Close()

r := NewLiveReader(w.logger, sr)
r := NewLiveReader(w.logger, w.reg, sr)
if err := w.readSegment(r, index, false); err != io.EOF && err != nil {
return errors.Wrap(err, "readSegment")
}
Expand Down
14 changes: 7 additions & 7 deletions wal/wal_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestTailSamples(t *testing.T) {
testutil.Ok(t, err)

wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
watcher := NewWALWatcher(nil, nil, "", wt, dir)
watcher.startTime = now.UnixNano()

// Set the Watcher's metrics so they're not nil pointers.
Expand All @@ -144,7 +144,7 @@ func TestTailSamples(t *testing.T) {
testutil.Ok(t, err)
defer segment.Close()

reader := NewLiveReader(nil, segment)
reader := NewLiveReader(nil, nil, segment)
// Use tail true so we can ensure we got the right number of samples.
watcher.readSegment(reader, i, true)
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
testutil.Ok(t, err)

wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
watcher := NewWALWatcher(nil, nil, "", wt, dir)
go watcher.Start()

expected := seriesCount
Expand Down Expand Up @@ -291,7 +291,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
watcher := NewWALWatcher(nil, nil, "", wt, dir)
go watcher.Start()

expected := seriesCount * 2
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestReadCheckpoint(t *testing.T) {
testutil.Ok(t, err)

wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
watcher := NewWALWatcher(nil, nil, "", wt, dir)
// watcher.
go watcher.Start()

Expand Down Expand Up @@ -419,7 +419,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
}

wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
watcher := NewWALWatcher(nil, nil, "", wt, dir)
watcher.maxSegment = -1

// Set the Watcher's metrics so they're not nil pointers.
Expand Down Expand Up @@ -479,7 +479,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
testutil.Ok(t, err)

wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir)
watcher := NewWALWatcher(nil, nil, "", wt, dir)
watcher.maxSegment = -1
go watcher.Start()

Expand Down

0 comments on commit 1caa80c

Please sign in to comment.