From d6ba13b4ce82212ab0c351dccf05cce22ffdcfbb Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 20 Jun 2019 14:21:05 -0700 Subject: [PATCH] Fix some stuff I broke during rebase. Signed-off-by: Callum Styan --- compact.go | 4 ++-- go.mod | 1 - go.sum | 2 -- head.go | 27 ++++---------------------- head_test.go | 42 ++++++++++++++++++++-------------------- record/internal.go | 12 ++++++------ tombstones/tombstones.go | 12 ++++++------ wal/wal_watcher.go | 5 +++-- 8 files changed, 42 insertions(+), 63 deletions(-) diff --git a/compact.go b/compact.go index e6e748a3..7904ebd2 100644 --- a/compact.go +++ b/compact.go @@ -608,7 +608,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if _, err := tombstones.WriteTombstoneFile(c.logger, tmp, record.NewMemTombstones()); err != nil { + if _, err := tombstones.WriteTombstoneFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } @@ -768,7 +768,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, // // TODO think how to avoid the typecasting to verify when it is head block. if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime { - dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64}) + dranges = append(dranges, tombstones.Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64}) } else // Sanity check for disk blocks. diff --git a/go.mod b/go.mod index c75e4ed7..ccdd4372 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/oklog/ulid v1.3.1 github.com/pkg/errors v0.8.0 github.com/prometheus/client_golang v1.0.0 - github.com/prometheus/prometheus v2.5.0+incompatible golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/go.sum b/go.sum index ad7f9516..e854d810 100644 --- a/go.sum +++ b/go.sum @@ -59,8 +59,6 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFd github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= -github.com/prometheus/prometheus v2.5.0+incompatible h1:7QPitgO2kOFG8ecuRn9O/4L9+10He72rVRJvMXrE9Hg= -github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= diff --git a/head.go b/head.go index 3c860828..0f429961 100644 --- a/head.go +++ b/head.go @@ -347,20 +347,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { } var ( -<<<<<<< HEAD - dec RecordDecoder - series []RefSeries - samples []RefSample - tstones []Stone - allStones = newMemTombstones() -======= dec record.RecordDecoder series []record.RefSeries samples []record.RefSample tstones []tombstones.Stone allStones = tombstones.NewMemTombstones() - err error ->>>>>>> Move tombstones to it's own package. ) defer func() { if err := allStones.Close(); err != nil { @@ -385,7 +376,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) if !created { - // There's already a different ref for this series. + // There's already a different Ref for this series. multiRefLock.Lock() multiRef[s.Ref] = series.Ref multiRefLock.Unlock() @@ -474,15 +465,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { } wg.Wait() -<<<<<<< HEAD if r.Err() != nil { return errors.Wrap(r.Err(), "read records") } - if err := allStones.Iter(func(ref uint64, dranges Intervals) error { -======= if err := allStones.Iter(func(ref uint64, dranges tombstones.Intervals) error { ->>>>>>> Move tombstones to it's own package. return h.chunkRewrite(ref, dranges) }); err != nil { return errors.Wrap(r.Err(), "deleting samples from tombstones") @@ -1303,21 +1290,15 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks continue } // Set the head chunks as open (being appended to). - maxTime := c.maxTime - if s.headChunk == c { + maxTime := c.MaxTime + if s.HeadChunk == c { maxTime = math.MaxInt64 } *chks = append(*chks, chunks.Meta{ -<<<<<<< HEAD - MinTime: c.minTime, - MaxTime: maxTime, - Ref: packChunkID(s.ref, uint64(s.chunkID(i))), -======= MinTime: c.MinTime, - MaxTime: c.MaxTime, + MaxTime: maxTime, Ref: packChunkID(s.Ref, uint64(s.ChunkID(i))), ->>>>>>> Move WAL Watcher from Prometheus to TSDB WAL package. }) } diff --git a/head_test.go b/head_test.go index cd3648d3..456c0958 100644 --- a/head_test.go +++ b/head_test.go @@ -102,28 +102,28 @@ func TestHead_ReadWAL(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { entries := []interface{}{ - []RefSeries{ + []record.RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, {Ref: 11, Labels: labels.FromStrings("a", "2")}, {Ref: 100, Labels: labels.FromStrings("a", "3")}, }, - []RefSample{ + []record.RefSample{ {Ref: 0, T: 99, V: 1}, {Ref: 10, T: 100, V: 2}, {Ref: 100, T: 100, V: 3}, }, - []RefSeries{ + []record.RefSeries{ {Ref: 50, Labels: labels.FromStrings("a", "4")}, // This series has two refs pointing to it. {Ref: 101, Labels: labels.FromStrings("a", "3")}, }, - []RefSample{ + []record.RefSample{ {Ref: 10, T: 101, V: 5}, {Ref: 50, T: 101, V: 6}, {Ref: 101, T: 101, V: 7}, }, - []Stone{ - {ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}}, + []tombstones.Stone{ + {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, }, } dir, err := ioutil.TempDir("", "test_read_wal") @@ -148,10 +148,10 @@ func TestHead_ReadWAL(t *testing.T) { s50 := head.series.getByID(50) s100 := head.series.getByID(100) - testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset) - testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init(). - testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset) - testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset) + testutil.Equals(t, labels.FromStrings("a", "1"), s10.Lset) + testutil.Equals(t, (*record.MemSeries)(nil), s11) // Series without samples should be garbage colected at head.Init(). + testutil.Equals(t, labels.FromStrings("a", "4"), s50.Lset) + testutil.Equals(t, labels.FromStrings("a", "3"), s100.Lset) expandChunk := func(c chunkenc.Iterator) (x []sample) { for c.Next() { @@ -161,9 +161,9 @@ func TestHead_ReadWAL(t *testing.T) { testutil.Ok(t, c.Err()) return x } - testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) - testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) - testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0))) + testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.Iterator(0))) + testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.Iterator(0))) + testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.Iterator(0))) }) } } @@ -328,14 +328,14 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { entries := []interface{}{ - []RefSeries{ + []record.RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, }, - []RefSample{}, - []RefSeries{ + []record.RefSample{}, + []record.RefSeries{ {Ref: 50, Labels: labels.FromStrings("a", "2")}, }, - []RefSample{ + []record.RefSample{ {Ref: 50, T: 80, V: 1}, {Ref: 50, T: 90, V: 1}, }, @@ -1056,9 +1056,9 @@ func TestHead_LogRollback(t *testing.T) { testutil.Equals(t, 1, len(recs)) - series, ok := recs[0].([]RefSeries) + series, ok := recs[0].([]record.RefSeries) testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) - testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) + testutil.Equals(t, []record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) }) } } @@ -1066,7 +1066,7 @@ func TestHead_LogRollback(t *testing.T) { // TestWalRepair_DecodingError ensures that a repair is run for an error // when decoding a record. func TestWalRepair_DecodingError(t *testing.T) { - var enc RecordEncoder + var enc record.RecordEncoder for name, test := range map[string]struct { corrFunc func(rec []byte) []byte // Func that applies the corruption to a record. rec []byte @@ -1078,7 +1078,7 @@ func TestWalRepair_DecodingError(t *testing.T) { // Do not modify the base record because it is Logged multiple times. res := make([]byte, len(rec)) copy(res, rec) - res[0] = byte(RecordInvalid) + res[0] = byte(record.RecordInvalid) return res }, enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), diff --git a/record/internal.go b/record/internal.go index dbc166db..89a0cc2d 100644 --- a/record/internal.go +++ b/record/internal.go @@ -74,8 +74,8 @@ type MemSeries struct { PendingCommit bool // Whether there are samples waiting to be committed to this series. Chunks []*MemChunk Lset labels.Labels + HeadChunk *MemChunk - headChunk *MemChunk chunkRange int64 firstChunkID int @@ -117,7 +117,7 @@ func (s *MemSeries) cut(mint int64) *MemChunk { MaxTime: math.MinInt64, } s.Chunks = append(s.Chunks, c) - s.headChunk = c + s.HeadChunk = c // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. @@ -143,7 +143,7 @@ func (s *MemSeries) ChunksMetas() []chunks.Meta { // and 'chunkRange', like how it would appear after 'newMemSeries(...)'. func (s *MemSeries) Reset() { s.Chunks = nil - s.headChunk = nil + s.HeadChunk = nil s.firstChunkID = 0 s.nextAt = math.MinInt64 s.sampleBuf = [4]sample{} @@ -197,9 +197,9 @@ func (s *MemSeries) TruncateChunksBefore(mint int64) (removed int) { s.Chunks = append(s.Chunks[:0], s.Chunks[k:]...) s.firstChunkID += k if len(s.Chunks) == 0 { - s.headChunk = nil + s.HeadChunk = nil } else { - s.headChunk = s.Chunks[len(s.Chunks)-1] + s.HeadChunk = s.Chunks[len(s.Chunks)-1] } return k @@ -270,7 +270,7 @@ func (s *MemSeries) Iterator(id int) chunkenc.Iterator { } func (s *MemSeries) head() *MemChunk { - return s.headChunk + return s.HeadChunk } type MemChunk struct { diff --git a/tombstones/tombstones.go b/tombstones/tombstones.go index ed2e3d61..ef2d261f 100644 --- a/tombstones/tombstones.go +++ b/tombstones/tombstones.go @@ -50,9 +50,9 @@ func init() { castagnoliTable = crc32.MakeTable(crc32.Castagnoli) } -// NewCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the +// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the // polynomial may be easily changed in one location at a later time, if necessary. -func NewCRC32() hash.Hash32 { +func newCRC32() hash.Hash32 { return crc32.New(castagnoliTable) } @@ -72,7 +72,7 @@ type TombstoneReader interface { } func WriteTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int64, error) { - path := filepath.Join(dir, tombstoneFilename) + path := filepath.Join(dir, TombstoneFilename) tmp := path + ".tmp" hash := newCRC32() var size int @@ -151,9 +151,9 @@ type Stone struct { } func ReadTombstones(dir string) (TombstoneReader, int64, error) { - b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) + b, err := ioutil.ReadFile(filepath.Join(dir, TombstoneFilename)) if os.IsNotExist(err) { - return newMemTombstones(), 0, nil + return NewMemTombstones(), 0, nil } else if err != nil { return nil, 0, err } @@ -175,7 +175,7 @@ func ReadTombstones(dir string) (TombstoneReader, int64, error) { } // Verify checksum. - hash := NewCRC32() + hash := newCRC32() if _, err := hash.Write(d.Get()); err != nil { return nil, 0, errors.Wrap(err, "write to hash") } diff --git a/wal/wal_watcher.go b/wal/wal_watcher.go index be57975f..bc21994d 100644 --- a/wal/wal_watcher.go +++ b/wal/wal_watcher.go @@ -76,6 +76,7 @@ var ( }, []string{consumer}, ) + lrMetrics = NewLiveReaderMetrics(prometheus.DefaultRegisterer) ) // This function is copied from prometheus/prometheus/pkg/timestamp to avoid adding vendor to TSDB repo. @@ -308,7 +309,7 @@ func (w *WALWatcher) watch(segmentNum int, tail bool) error { } defer segment.Close() - reader := NewLiveReader(w.logger, w.reg, segment) + reader := NewLiveReader(w.logger, lrMetrics, segment) readTicker := time.NewTicker(readPeriod) defer readTicker.Stop() @@ -523,7 +524,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error { } defer sr.Close() - r := NewLiveReader(w.logger, w.reg, sr) + r := NewLiveReader(w.logger, lrMetrics, sr) if err := w.readSegment(r, index, false); err != io.EOF && err != nil { return errors.Wrap(err, "readSegment") }