Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Fix some stuff I broke during rebase.
Browse files Browse the repository at this point in the history
Signed-off-by: Callum Styan <[email protected]>
  • Loading branch information
cstyan committed Jul 3, 2019
1 parent 26482ca commit d6ba13b
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 63 deletions.
4 changes: 2 additions & 2 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}

// Create an empty tombstones file.
if _, err := tombstones.WriteTombstoneFile(c.logger, tmp, record.NewMemTombstones()); err != nil {
if _, err := tombstones.WriteTombstoneFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}

Expand Down Expand Up @@ -768,7 +768,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
//
// TODO think how to avoid the typecasting to verify when it is head block.
if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime {
dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})
dranges = append(dranges, tombstones.Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})

} else
// Sanity check for disk blocks.
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/oklog/ulid v1.3.1
github.com/pkg/errors v0.8.0
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/prometheus v2.5.0+incompatible
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5
gopkg.in/alecthomas/kingpin.v2 v2.2.6
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFd
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/prometheus v2.5.0+incompatible h1:7QPitgO2kOFG8ecuRn9O/4L9+10He72rVRJvMXrE9Hg=
github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
Expand Down
27 changes: 4 additions & 23 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,20 +347,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
}

var (
<<<<<<< HEAD
dec RecordDecoder
series []RefSeries
samples []RefSample
tstones []Stone
allStones = newMemTombstones()
=======
dec record.RecordDecoder
series []record.RefSeries
samples []record.RefSample
tstones []tombstones.Stone
allStones = tombstones.NewMemTombstones()
err error
>>>>>>> Move tombstones to its own package.
)
defer func() {
if err := allStones.Close(); err != nil {
Expand All @@ -385,7 +376,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)

if !created {
// There's already a different ref for this series.
// There's already a different Ref for this series.
multiRefLock.Lock()
multiRef[s.Ref] = series.Ref
multiRefLock.Unlock()
Expand Down Expand Up @@ -474,15 +465,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
}
wg.Wait()

<<<<<<< HEAD
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}

if err := allStones.Iter(func(ref uint64, dranges Intervals) error {
=======
if err := allStones.Iter(func(ref uint64, dranges tombstones.Intervals) error {
>>>>>>> Move tombstones to its own package.
return h.chunkRewrite(ref, dranges)
}); err != nil {
return errors.Wrap(r.Err(), "deleting samples from tombstones")
Expand Down Expand Up @@ -1303,21 +1290,15 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
continue
}
// Set the head chunks as open (being appended to).
maxTime := c.maxTime
if s.headChunk == c {
maxTime := c.MaxTime
if s.HeadChunk == c {
maxTime = math.MaxInt64
}

*chks = append(*chks, chunks.Meta{
<<<<<<< HEAD
MinTime: c.minTime,
MaxTime: maxTime,
Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
=======
MinTime: c.MinTime,
MaxTime: c.MaxTime,
MaxTime: maxTime,
Ref: packChunkID(s.Ref, uint64(s.ChunkID(i))),
>>>>>>> Move WAL Watcher from Prometheus to TSDB WAL package.
})
}

Expand Down
42 changes: 21 additions & 21 deletions head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,28 +102,28 @@ func TestHead_ReadWAL(t *testing.T) {
for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
entries := []interface{}{
[]RefSeries{
[]record.RefSeries{
{Ref: 10, Labels: labels.FromStrings("a", "1")},
{Ref: 11, Labels: labels.FromStrings("a", "2")},
{Ref: 100, Labels: labels.FromStrings("a", "3")},
},
[]RefSample{
[]record.RefSample{
{Ref: 0, T: 99, V: 1},
{Ref: 10, T: 100, V: 2},
{Ref: 100, T: 100, V: 3},
},
[]RefSeries{
[]record.RefSeries{
{Ref: 50, Labels: labels.FromStrings("a", "4")},
// This series has two refs pointing to it.
{Ref: 101, Labels: labels.FromStrings("a", "3")},
},
[]RefSample{
[]record.RefSample{
{Ref: 10, T: 101, V: 5},
{Ref: 50, T: 101, V: 6},
{Ref: 101, T: 101, V: 7},
},
[]Stone{
{ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}},
[]tombstones.Stone{
{Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}},
},
}
dir, err := ioutil.TempDir("", "test_read_wal")
Expand All @@ -148,10 +148,10 @@ func TestHead_ReadWAL(t *testing.T) {
s50 := head.series.getByID(50)
s100 := head.series.getByID(100)

testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset)
testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage collected at head.Init().
testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset)
testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset)
testutil.Equals(t, labels.FromStrings("a", "1"), s10.Lset)
testutil.Equals(t, (*record.MemSeries)(nil), s11) // Series without samples should be garbage collected at head.Init().
testutil.Equals(t, labels.FromStrings("a", "4"), s50.Lset)
testutil.Equals(t, labels.FromStrings("a", "3"), s100.Lset)

expandChunk := func(c chunkenc.Iterator) (x []sample) {
for c.Next() {
Expand All @@ -161,9 +161,9 @@ func TestHead_ReadWAL(t *testing.T) {
testutil.Ok(t, c.Err())
return x
}
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0)))
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.Iterator(0)))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.Iterator(0)))
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.Iterator(0)))
})
}
}
Expand Down Expand Up @@ -328,14 +328,14 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
entries := []interface{}{
[]RefSeries{
[]record.RefSeries{
{Ref: 10, Labels: labels.FromStrings("a", "1")},
},
[]RefSample{},
[]RefSeries{
[]record.RefSample{},
[]record.RefSeries{
{Ref: 50, Labels: labels.FromStrings("a", "2")},
},
[]RefSample{
[]record.RefSample{
{Ref: 50, T: 80, V: 1},
{Ref: 50, T: 90, V: 1},
},
Expand Down Expand Up @@ -1056,17 +1056,17 @@ func TestHead_LogRollback(t *testing.T) {

testutil.Equals(t, 1, len(recs))

series, ok := recs[0].([]RefSeries)
series, ok := recs[0].([]record.RefSeries)
testutil.Assert(t, ok, "expected series record but got %+v", recs[0])
testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series)
testutil.Equals(t, []record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series)
})
}
}

// TestWalRepair_DecodingError ensures that a repair is run for an error
// when decoding a record.
func TestWalRepair_DecodingError(t *testing.T) {
var enc RecordEncoder
var enc record.RecordEncoder
for name, test := range map[string]struct {
corrFunc func(rec []byte) []byte // Func that applies the corruption to a record.
rec []byte
Expand All @@ -1078,7 +1078,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
// Do not modify the base record because it is Logged multiple times.
res := make([]byte, len(rec))
copy(res, rec)
res[0] = byte(RecordInvalid)
res[0] = byte(record.RecordInvalid)
return res
},
enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}),
Expand Down
12 changes: 6 additions & 6 deletions record/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ type MemSeries struct {
PendingCommit bool // Whether there are samples waiting to be committed to this series.
Chunks []*MemChunk
Lset labels.Labels
HeadChunk *MemChunk

headChunk *MemChunk
chunkRange int64
firstChunkID int

Expand Down Expand Up @@ -117,7 +117,7 @@ func (s *MemSeries) cut(mint int64) *MemChunk {
MaxTime: math.MinInt64,
}
s.Chunks = append(s.Chunks, c)
s.headChunk = c
s.HeadChunk = c

// Set upper bound on when the next chunk must be started. An earlier timestamp
// may be chosen dynamically at a later point.
Expand All @@ -143,7 +143,7 @@ func (s *MemSeries) ChunksMetas() []chunks.Meta {
// and 'chunkRange', like how it would appear after 'newMemSeries(...)'.
func (s *MemSeries) Reset() {
s.Chunks = nil
s.headChunk = nil
s.HeadChunk = nil
s.firstChunkID = 0
s.nextAt = math.MinInt64
s.sampleBuf = [4]sample{}
Expand Down Expand Up @@ -197,9 +197,9 @@ func (s *MemSeries) TruncateChunksBefore(mint int64) (removed int) {
s.Chunks = append(s.Chunks[:0], s.Chunks[k:]...)
s.firstChunkID += k
if len(s.Chunks) == 0 {
s.headChunk = nil
s.HeadChunk = nil
} else {
s.headChunk = s.Chunks[len(s.Chunks)-1]
s.HeadChunk = s.Chunks[len(s.Chunks)-1]
}

return k
Expand Down Expand Up @@ -270,7 +270,7 @@ func (s *MemSeries) Iterator(id int) chunkenc.Iterator {
}

func (s *MemSeries) head() *MemChunk {
return s.headChunk
return s.HeadChunk
}

type MemChunk struct {
Expand Down
12 changes: 6 additions & 6 deletions tombstones/tombstones.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func init() {
castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
}

// NewCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
// polynomial may be easily changed in one location at a later time, if necessary.
func NewCRC32() hash.Hash32 {
func newCRC32() hash.Hash32 {
return crc32.New(castagnoliTable)
}

Expand All @@ -72,7 +72,7 @@ type TombstoneReader interface {
}

func WriteTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int64, error) {
path := filepath.Join(dir, tombstoneFilename)
path := filepath.Join(dir, TombstoneFilename)
tmp := path + ".tmp"
hash := newCRC32()
var size int
Expand Down Expand Up @@ -151,9 +151,9 @@ type Stone struct {
}

func ReadTombstones(dir string) (TombstoneReader, int64, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
b, err := ioutil.ReadFile(filepath.Join(dir, TombstoneFilename))
if os.IsNotExist(err) {
return newMemTombstones(), 0, nil
return NewMemTombstones(), 0, nil
} else if err != nil {
return nil, 0, err
}
Expand All @@ -175,7 +175,7 @@ func ReadTombstones(dir string) (TombstoneReader, int64, error) {
}

// Verify checksum.
hash := NewCRC32()
hash := newCRC32()
if _, err := hash.Write(d.Get()); err != nil {
return nil, 0, errors.Wrap(err, "write to hash")
}
Expand Down
5 changes: 3 additions & 2 deletions wal/wal_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ var (
},
[]string{consumer},
)
lrMetrics = NewLiveReaderMetrics(prometheus.DefaultRegisterer)
)

// This function is copied from prometheus/prometheus/pkg/timestamp to avoid adding vendor to TSDB repo.
Expand Down Expand Up @@ -308,7 +309,7 @@ func (w *WALWatcher) watch(segmentNum int, tail bool) error {
}
defer segment.Close()

reader := NewLiveReader(w.logger, w.reg, segment)
reader := NewLiveReader(w.logger, lrMetrics, segment)

readTicker := time.NewTicker(readPeriod)
defer readTicker.Stop()
Expand Down Expand Up @@ -523,7 +524,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
}
defer sr.Close()

r := NewLiveReader(w.logger, w.reg, sr)
r := NewLiveReader(w.logger, lrMetrics, sr)
if err := w.readSegment(r, index, false); err != io.EOF && err != nil {
return errors.Wrap(err, "readSegment")
}
Expand Down

0 comments on commit d6ba13b

Please sign in to comment.