This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Add ChunksIterator method to Series interface. #665

Open · wants to merge 10 commits into master
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
## master / unreleased

- [CHANGE] `chunks.MergeOverlappingChunks` moved to `tsdb.MergeOverlappingChunks`
- [CHANGE] The `Series` interface now allows returning a chunk iterator for iterating over encoded chunks.

## 0.10.0

- [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode.
13 changes: 11 additions & 2 deletions chunkenc/chunk.go
@@ -56,11 +56,19 @@ type Appender interface {
Append(int64, float64)
}

// Iterator is a simple iterator that can only get the next value.
// Iterator iterates over the data of a time series.
type Iterator interface {
// Seek advances the iterator forward to the sample with timestamp t, or to the first sample after t.
// If the iterator already points to a sample with a timestamp at or after t, Seek does not advance it.
// Seek returns false if there is no sample with a timestamp equal to or larger than t.
// The iterator may be exhausted when Seek returns false.
Seek(t int64) bool
// At returns the current timestamp/value pair.
At() (int64, float64)
Err() error
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}

// NewNopIterator returns a new chunk iterator that does not hold any data.
@@ -70,6 +78,7 @@ func NewNopIterator() Iterator {

type nopIterator struct{}

func (nopIterator) Seek(t int64) bool { return false }
func (nopIterator) At() (int64, float64) { return 0, 0 }
func (nopIterator) Next() bool { return false }
func (nopIterator) Err() error { return nil }
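For reference, a minimal sketch (not part of this diff) of how a caller might use the Seek contract documented above. It assumes this repository's chunkenc package with the Seek method added by this change; the chunk contents are made up.

// Sketch: resume reading a chunk from a given timestamp onward.
package main

import (
    "fmt"

    "github.com/prometheus/tsdb/chunkenc"
)

func main() {
    c := chunkenc.NewXORChunk()
    app, err := c.Appender()
    if err != nil {
        panic(err)
    }
    for ts := int64(0); ts < 100; ts += 10 {
        app.Append(ts, float64(ts))
    }

    // Seek positions the iterator at the first sample with timestamp >= 35 (here: 40).
    it := c.Iterator(nil)
    if it.Seek(35) {
        for ok := true; ok; ok = it.Next() {
            ts, v := it.At()
            fmt.Println(ts, v)
        }
    }
    if err := it.Err(); err != nil {
        panic(err)
    }
}
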
68 changes: 43 additions & 25 deletions chunkenc/chunk_test.go
@@ -17,7 +17,6 @@ import (
"fmt"
"io"
"math/rand"
"reflect"
"testing"

"github.com/prometheus/tsdb/testutil"
@@ -35,28 +34,23 @@ func TestChunk(t *testing.T) {
t.Run(fmt.Sprintf("%v", enc), func(t *testing.T) {
for range make([]struct{}, 1) {
c := nc()
if err := testChunk(c); err != nil {
t.Fatal(err)
}
testChunk(t, c)
}
})
}
}

func testChunk(c Chunk) error {
func testChunk(t *testing.T, c Chunk) {
app, err := c.Appender()
if err != nil {
return err
}
testutil.Ok(t, err)

var exp []pair
var all []pair
var (
ts = int64(1234123324)
v = 1243535.123
)
for i := 0; i < 300; i++ {
ts += int64(rand.Intn(10000) + 1)
// v = rand.Float64()
if i%2 == 0 {
v += float64(rand.Intn(1000000))
} else {
@@ -67,29 +61,53 @@ func testChunk(c Chunk) error {
// appending to a partially filled chunk.
if i%10 == 0 {
app, err = c.Appender()
if err != nil {
return err
}
testutil.Ok(t, err)
}

app.Append(ts, v)
exp = append(exp, pair{t: ts, v: v})
// fmt.Println("appended", len(c.Bytes()), c.Bytes())
all = append(all, pair{t: ts, v: v})
}

it := c.Iterator(nil)
var res []pair
for it.Next() {
ts, v := it.At()
res = append(res, pair{t: ts, v: v})
// 1. Expand iterator in simple case.
it1 := c.Iterator(nil)
var res1 []pair
for it1.Next() {
ts, v := it1.At()
res1 = append(res1, pair{t: ts, v: v})
}
if it.Err() != nil {
return it.Err()
testutil.Ok(t, it1.Err())
testutil.Equals(t, all, res1)

// 2. Expand second iterator while reusing first one.
it2 := c.Iterator(it1)
var res2 []pair
for it2.Next() {
ts, v := it2.At()
res2 = append(res2, pair{t: ts, v: v})
}
if !reflect.DeepEqual(exp, res) {
return fmt.Errorf("unexpected result\n\ngot: %v\n\nexp: %v", res, exp)
testutil.Ok(t, it2.Err())
testutil.Equals(t, all, res2)

// 3. Test Iterator Seek.
mid := len(all) / 2

it3 := c.Iterator(nil)
var res3 []pair
testutil.Equals(t, true, it3.Seek(all[mid].t))
// Repeated seeks to the same timestamp should not advance the iterator.
testutil.Equals(t, true, it3.Seek(all[mid].t))
testutil.Equals(t, true, it3.Seek(all[mid].t))
ts, v = it3.At()
res3 = append(res3, pair{t: ts, v: v})

for it3.Next() {
ts, v := it3.At()
res3 = append(res3, pair{t: ts, v: v})
}
return nil
testutil.Ok(t, it3.Err())
testutil.Equals(t, all[mid:], res3)

testutil.Equals(t, false, it3.Seek(all[len(all)-1].t+1))
}

func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
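The second case above exercises iterator reuse via c.Iterator(it1). A minimal sketch (not part of this diff) of that pattern when walking several chunks, reusing one iterator to avoid an allocation per chunk; sumSamples is a hypothetical helper, not a function from this repository.

package main

import (
    "fmt"

    "github.com/prometheus/tsdb/chunkenc"
)

// sumSamples walks every chunk of a series, reusing one iterator throughout.
func sumSamples(chks []chunkenc.Chunk) (float64, error) {
    var (
        it  chunkenc.Iterator
        sum float64
    )
    for _, c := range chks {
        // Passing the previous iterator lets the chunk reuse its memory.
        it = c.Iterator(it)
        for it.Next() {
            _, v := it.At()
            sum += v
        }
        if err := it.Err(); err != nil {
            return 0, err
        }
    }
    return sum, nil
}

func main() {
    c := chunkenc.NewXORChunk()
    app, err := c.Appender()
    if err != nil {
        panic(err)
    }
    app.Append(1, 2.5)
    app.Append(2, 3.5)
    sum, err := sumSamples([]chunkenc.Chunk{c})
    fmt.Println(sum, err)
}
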
13 changes: 13 additions & 0 deletions chunkenc/xor.go
@@ -241,6 +241,19 @@ type xorIterator struct {
err error
}

func (it *xorIterator) Seek(t int64) bool {
if it.err != nil {
return false
}

for t > it.t || it.numRead == 0 {
if !it.Next() {
return false
}
}
return true
}

func (it *xorIterator) At() (int64, float64) {
return it.t, it.val
}
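Seek above is a forward-only linear scan over Next. A minimal sketch (not part of this diff) of the resulting behavior, assuming this repository's chunkenc package with this change applied: seeking backwards leaves the iterator in place, and seeking past the last sample exhausts it.

package main

import (
    "fmt"

    "github.com/prometheus/tsdb/chunkenc"
)

func main() {
    c := chunkenc.NewXORChunk()
    app, err := c.Appender()
    if err != nil {
        panic(err)
    }
    for ts := int64(10); ts <= 50; ts += 10 {
        app.Append(ts, float64(ts))
    }

    it := c.Iterator(nil)
    fmt.Println(it.Seek(40)) // true: advances via Next until the timestamp is >= 40
    ts, _ := it.At()
    fmt.Println(ts) // 40
    fmt.Println(it.Seek(20)) // true: does not rewind, the iterator stays at 40
    ts, _ = it.At()
    fmt.Println(ts) // still 40
    fmt.Println(it.Seek(60)) // false: no sample at or after 60, iterator exhausted
}
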
79 changes: 0 additions & 79 deletions chunks/chunks.go
@@ -205,85 +205,6 @@ func (w *Writer) write(b []byte) error {
return err
}

// MergeOverlappingChunks removes the samples whose timestamp is overlapping.
// The last appearing sample is retained in case there is overlapping.
// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
if len(chks) < 2 {
return chks, nil
}
newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
newChks = append(newChks, chks[0])
last := 0
for _, c := range chks[1:] {
// We need to check only the last chunk in newChks.
// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
// (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
// So never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c)
last++
continue
}
nc := &newChks[last]
if c.MaxTime > nc.MaxTime {
nc.MaxTime = c.MaxTime
}
chk, err := MergeChunks(nc.Chunk, c.Chunk)
if err != nil {
return nil, err
}
nc.Chunk = chk
}

return newChks, nil
}

// MergeChunks vertically merges a and b, i.e., if there is any sample
// with same timestamp in both a and b, the sample in a is discarded.
func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return nil, err
}
ait := a.Iterator(nil)
bit := b.Iterator(nil)
aok, bok := ait.Next(), bit.Next()
for aok && bok {
at, av := ait.At()
bt, bv := bit.At()
if at < bt {
app.Append(at, av)
aok = ait.Next()
} else if bt < at {
app.Append(bt, bv)
bok = bit.Next()
} else {
app.Append(bt, bv)
aok = ait.Next()
bok = bit.Next()
}
}
for aok {
at, av := ait.At()
app.Append(at, av)
aok = ait.Next()
}
for bok {
bt, bv := bit.At()
app.Append(bt, bv)
bok = bit.Next()
}
if ait.Err() != nil {
return nil, ait.Err()
}
if bit.Err() != nil {
return nil, bit.Err()
}
return newChunk, nil
}

func (w *Writer) WriteChunks(chks ...Meta) error {
// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.
33 changes: 32 additions & 1 deletion compact.go
@@ -811,7 +811,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,

mergedChks := chks
if overlapping {
mergedChks, err = chunks.MergeOverlappingChunks(chks)
mergedChks, err = MergeOverlappingChunks(chks)
if err != nil {
return errors.Wrap(err, "merge overlapping chunks")
}
@@ -1032,3 +1032,34 @@ func (c *compactionMerger) Err() error {
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) {
return c.l, c.c, c.intervals
}

// MergeOverlappingChunks removes the samples whose timestamp is overlapping.
// The last appearing sample is retained in case there is overlapping.
// This assumes that `chks []chunks.Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks(chks []chunks.Meta) ([]chunks.Meta, error) {
if len(chks) < 2 {
return chks, nil
}
newChks := make([]chunks.Meta, 0, len(chks)) // Will contain the merged chunks.
newChks = append(newChks, chks[0])
last := 0
var aReuseIter, bReuseIter chunkenc.Iterator
for _, c := range chks[1:] {
// We need to check only the last chunk in newChks.
// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
// (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
// So never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c)
last++
continue
}
chk, err := mergeOverlappingChunks(newChks[last], c, aReuseIter, bReuseIter)
if err != nil {
return nil, err
}
newChks[last] = chk
}

return newChks, nil
}
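The loop above delegates the per-pair merge to a mergeOverlappingChunks helper that is not included in this hunk. The sketch below is hypothetical: the signature and the MaxTime handling are assumptions, while the merge body follows the MergeChunks logic removed from chunks/chunks.go above, where the sample from the later chunk wins on equal timestamps.

package tsdb

import (
    "github.com/prometheus/tsdb/chunkenc"
    "github.com/prometheus/tsdb/chunks"
)

// mergeOverlappingChunks is a hypothetical sketch; the signature and the
// MaxTime update are assumptions, the body mirrors the removed chunks.MergeChunks.
func mergeOverlappingChunks(a, b chunks.Meta, aIter, bIter chunkenc.Iterator) (chunks.Meta, error) {
    newChunk := chunkenc.NewXORChunk()
    app, err := newChunk.Appender()
    if err != nil {
        return a, err
    }
    ait, bit := a.Chunk.Iterator(aIter), b.Chunk.Iterator(bIter)
    aok, bok := ait.Next(), bit.Next()
    for aok && bok {
        at, av := ait.At()
        bt, bv := bit.At()
        switch {
        case at < bt:
            app.Append(at, av)
            aok = ait.Next()
        case bt < at:
            app.Append(bt, bv)
            bok = bit.Next()
        default: // Equal timestamps: the sample from the later chunk is retained.
            app.Append(bt, bv)
            aok, bok = ait.Next(), bit.Next()
        }
    }
    for ; aok; aok = ait.Next() {
        at, av := ait.At()
        app.Append(at, av)
    }
    for ; bok; bok = bit.Next() {
        bt, bv := bit.At()
        app.Append(bt, bv)
    }
    if err := ait.Err(); err != nil {
        return a, err
    }
    if err := bit.Err(); err != nil {
        return a, err
    }
    a.Chunk = newChunk
    if b.MaxTime > a.MaxTime {
        a.MaxTime = b.MaxTime
    }
    return a, nil
}

Passing aIter and bIter through to Chunk.Iterator would allow iterator reuse across calls, matching the aReuseIter and bReuseIter variables declared in the caller above.
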
2 changes: 1 addition & 1 deletion head_test.go
@@ -740,7 +740,7 @@ func TestDelete_e2e(t *testing.T) {
smpls = deletedSamples(smpls, del.drange)
// Only append those series for which samples exist as mockSeriesSet
// doesn't skip series with no samples.
// TODO: But sometimes SeriesSet returns an empty SeriesIterator
// TODO: But sometimes SeriesSet returns an empty chunkenc.Iterator
if len(smpls) > 0 {
matchedSeries = append(matchedSeries, newSeries(
m.Map(),