Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Move WAL Watcher from Prometheus to TSDB WAL package.
Browse files Browse the repository at this point in the history
Signed-off-by: Callum Styan <[email protected]>
  • Loading branch information
cstyan committed Jun 4, 2019
1 parent 882162d commit e679638
Show file tree
Hide file tree
Showing 22 changed files with 1,976 additions and 794 deletions.
25 changes: 13 additions & 12 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
)

// IndexWriter serializes the index for a block of series data.
Expand Down Expand Up @@ -136,7 +137,7 @@ type BlockReader interface {
Chunks() (ChunkReader, error)

// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() (TombstoneReader, error)
Tombstones() (record.TombstoneReader, error)

// MinTime returns the min time of the block.
MinTime() int64
Expand Down Expand Up @@ -282,7 +283,7 @@ type Block struct {

chunkr ChunkReader
indexr IndexReader
tombstones TombstoneReader
tombstones record.TombstoneReader

logger log.Logger
}
Expand Down Expand Up @@ -319,7 +320,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
}
closers = append(closers, ir)

tr, tsr, err := readTombstones(dir)
tr, tsr, err := record.ReadTombstones(dir)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -423,7 +424,7 @@ func (pb *Block) Chunks() (ChunkReader, error) {
}

// Tombstones returns a new TombstoneReader against the block data.
func (pb *Block) Tombstones() (TombstoneReader, error) {
func (pb *Block) Tombstones() (record.TombstoneReader, error) {
if err := pb.startRead(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -487,7 +488,7 @@ func (r blockIndexReader) Close() error {
}

type blockTombstoneReader struct {
TombstoneReader
record.TombstoneReader
b *Block
}

Expand Down Expand Up @@ -523,7 +524,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := pb.indexr

// Choose only valid postings which have chunks in the time-range.
stones := newMemTombstones()
stones := record.NewMemTombstones()

var lset labels.Labels
var chks []chunks.Meta
Expand All @@ -539,7 +540,7 @@ Outer:
if chk.OverlapsClosedInterval(mint, maxt) {
// Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones.addInterval(p.At(), Interval{tmin, tmax})
stones.AddInterval(p.At(), record.Interval{tmin, tmax})
continue Outer
}
}
Expand All @@ -549,9 +550,9 @@ Outer:
return p.Err()
}

err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
err = pb.tombstones.Iter(func(id uint64, ivs record.Intervals) error {
for _, iv := range ivs {
stones.addInterval(id, iv)
stones.AddInterval(id, iv)
}
return nil
})
Expand All @@ -561,7 +562,7 @@ Outer:
pb.tombstones = stones
pb.meta.Stats.NumTombstones = pb.tombstones.Total()

if err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil {
if err := record.WriteTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil {
return err
}
return writeMetaFile(pb.logger, pb.dir, &pb.meta)
Expand All @@ -572,7 +573,7 @@ Outer:
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
numStones := 0

if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
if err := pb.tombstones.Iter(func(id uint64, ivs record.Intervals) error {
numStones += len(ivs)
return nil
}); err != nil {
Expand Down Expand Up @@ -607,7 +608,7 @@ func (pb *Block) Snapshot(dir string) error {
for _, fname := range []string{
metaFilename,
indexFilename,
tombstoneFilename,
record.TombstoneFilename,
} {
if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
return errors.Wrapf(err, "create snapshot %s", fname)
Expand Down
3 changes: 2 additions & 1 deletion cmd/tsdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -265,7 +266,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in
s.ref = &ref
} else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {

if errors.Cause(err) != tsdb.ErrNotFound {
if errors.Cause(err) != record.ErrNotFound {
panic(err)
}

Expand Down
19 changes: 10 additions & 9 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
)

// ExponentialBlockRanges returns the time ranges based on the stepSize.
Expand Down Expand Up @@ -605,7 +606,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}

// Create an empty tombstones file.
if err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
if err := record.WriteTombstoneFile(c.logger, tmp, record.NewMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}

Expand Down Expand Up @@ -848,15 +849,15 @@ type compactionSeriesSet struct {
p index.Postings
index IndexReader
chunks ChunkReader
tombstones TombstoneReader
tombstones record.TombstoneReader

l labels.Labels
c []chunks.Meta
intervals Intervals
intervals record.Intervals
err error
}

func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p index.Postings) *compactionSeriesSet {
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t record.TombstoneReader, p index.Postings) *compactionSeriesSet {
return &compactionSeriesSet{
index: i,
chunks: c,
Expand Down Expand Up @@ -886,7 +887,7 @@ func (c *compactionSeriesSet) Next() bool {
if len(c.intervals) > 0 {
chks := make([]chunks.Meta, 0, len(c.c))
for _, chk := range c.c {
if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
if !(record.Interval{chk.MinTime, chk.MaxTime}.IsSubrange(c.intervals)) {
chks = append(chks, chk)
}
}
Expand Down Expand Up @@ -914,7 +915,7 @@ func (c *compactionSeriesSet) Err() error {
return c.p.Err()
}

func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) {
func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, record.Intervals) {
return c.l, c.c, c.intervals
}

Expand All @@ -924,7 +925,7 @@ type compactionMerger struct {
aok, bok bool
l labels.Labels
c []chunks.Meta
intervals Intervals
intervals record.Intervals
}

func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
Expand Down Expand Up @@ -980,7 +981,7 @@ func (c *compactionMerger) Next() bool {
_, cb, rb := c.b.At()

for _, r := range rb {
ra = ra.add(r)
ra = ra.Add(r)
}

c.l = append(c.l[:0], l...)
Expand All @@ -1001,6 +1002,6 @@ func (c *compactionMerger) Err() error {
return c.b.Err()
}

func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) {
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, record.Intervals) {
return c.l, c.c, c.intervals
}
13 changes: 8 additions & 5 deletions compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
"github.com/prometheus/tsdb/testutil"
)

Expand Down Expand Up @@ -455,11 +456,13 @@ func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {

type erringBReader struct{}

func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }
func (erringBReader) MinTime() int64 { return 0 }
func (erringBReader) MaxTime() int64 { return 0 }
func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
func (erringBReader) Tombstones() (record.TombstoneReader, error) {
return nil, errors.New("tombstones")
}
func (erringBReader) MinTime() int64 { return 0 }
func (erringBReader) MaxTime() int64 { return 0 }

type nopChunkWriter struct{}

Expand Down
47 changes: 24 additions & 23 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
"github.com/prometheus/tsdb/wal"
Expand Down Expand Up @@ -199,7 +200,7 @@ func TestDBAppenderAddRef(t *testing.T) {
testutil.Ok(t, err)

err = app2.AddFast(9999999, 1, 1)
testutil.Equals(t, ErrNotFound, errors.Cause(err))
testutil.Equals(t, record.ErrNotFound, errors.Cause(err))

testutil.Ok(t, app2.Commit())

Expand Down Expand Up @@ -246,27 +247,27 @@ func TestDeleteSimple(t *testing.T) {
numSamples := int64(10)

cases := []struct {
intervals Intervals
intervals record.Intervals
remaint []int64
}{
{
intervals: Intervals{{0, 3}},
intervals: record.Intervals{{0, 3}},
remaint: []int64{4, 5, 6, 7, 8, 9},
},
{
intervals: Intervals{{1, 3}},
intervals: record.Intervals{{1, 3}},
remaint: []int64{0, 4, 5, 6, 7, 8, 9},
},
{
intervals: Intervals{{1, 3}, {4, 7}},
intervals: record.Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
{
intervals: Intervals{{1, 3}, {4, 700}},
intervals: record.Intervals{{1, 3}, {4, 700}},
remaint: []int64{0},
},
{ // This case is to ensure that labels and symbols are deleted.
intervals: Intervals{{0, 9}},
intervals: record.Intervals{{0, 9}},
remaint: []int64{},
},
}
Expand Down Expand Up @@ -362,7 +363,7 @@ func TestAmendDatapointCausesError(t *testing.T) {

app = db.Appender()
_, err = app.Add(labels.Labels{}, 0, 1)
testutil.Equals(t, ErrAmendSample, err)
testutil.Equals(t, record.ErrAmendSample, err)
testutil.Ok(t, app.Rollback())
}

Expand Down Expand Up @@ -396,7 +397,7 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {

app = db.Appender()
_, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000002))
testutil.Equals(t, ErrAmendSample, err)
testutil.Equals(t, record.ErrAmendSample, err)
}

func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
Expand Down Expand Up @@ -508,11 +509,11 @@ func TestDB_SnapshotWithDelete(t *testing.T) {

testutil.Ok(t, app.Commit())
cases := []struct {
intervals Intervals
intervals record.Intervals
remaint []int64
}{
{
intervals: Intervals{{1, 3}, {4, 7}},
intervals: record.Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
Expand Down Expand Up @@ -835,11 +836,11 @@ func TestTombstoneClean(t *testing.T) {

testutil.Ok(t, app.Commit())
cases := []struct {
intervals Intervals
intervals record.Intervals
remaint []int64
}{
{
intervals: Intervals{{1, 3}, {4, 7}},
intervals: record.Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
Expand Down Expand Up @@ -911,7 +912,7 @@ func TestTombstoneClean(t *testing.T) {
}

for _, b := range db.Blocks() {
testutil.Equals(t, newMemTombstones(), b.tombstones)
testutil.Equals(t, record.NewMemTombstones(), b.tombstones)
}
}
}
Expand All @@ -937,8 +938,8 @@ func TestTombstoneCleanFail(t *testing.T) {
block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
// Add some some fake tombstones to trigger the compaction.
tomb := newMemTombstones()
tomb.addInterval(0, Interval{0, 1})
tomb := record.NewMemTombstones()
tomb.AddInterval(0, record.Interval{0, 1})
block.tombstones = tomb

db.blocks = append(db.blocks, block)
Expand Down Expand Up @@ -1091,7 +1092,7 @@ func dbDiskSize(dir string) int64 {
// Include only index,tombstone and chunks.
if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) ||
info.Name() == indexFilename ||
info.Name() == tombstoneFilename {
info.Name() == record.TombstoneFilename {
statSize += info.Size()
}
return nil
Expand Down Expand Up @@ -1407,13 +1408,13 @@ func TestInitializeHeadTimestamp(t *testing.T) {
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)

var enc RecordEncoder
var enc record.RecordEncoder
err = w.Log(
enc.Series([]RefSeries{
enc.Series([]record.RefSeries{
{Ref: 123, Labels: labels.FromStrings("a", "1")},
{Ref: 124, Labels: labels.FromStrings("a", "2")},
}, nil),
enc.Samples([]RefSample{
enc.Samples([]record.RefSample{
{Ref: 123, T: 5000, V: 1},
{Ref: 124, T: 15000, V: 1},
}, nil),
Expand Down Expand Up @@ -1457,13 +1458,13 @@ func TestInitializeHeadTimestamp(t *testing.T) {
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)

var enc RecordEncoder
var enc record.RecordEncoder
err = w.Log(
enc.Series([]RefSeries{
enc.Series([]record.RefSeries{
{Ref: 123, Labels: labels.FromStrings("a", "1")},
{Ref: 124, Labels: labels.FromStrings("a", "2")},
}, nil),
enc.Samples([]RefSample{
enc.Samples([]record.RefSample{
{Ref: 123, T: 5000, V: 1},
{Ref: 124, T: 15000, V: 1},
}, nil),
Expand Down
Loading

0 comments on commit e679638

Please sign in to comment.