Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Reused same interface in chunkenc.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Aug 5, 2019
1 parent 6f6dd39 commit 78e9e36
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 55 deletions.
2 changes: 1 addition & 1 deletion chunkenc/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Appender interface {
Append(int64, float64)
}

// Iterator is a simple iterator that can only get the next value.
// Iterator iterates over the data of a time series.
type Iterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at t, it advances to the first value
Expand Down
2 changes: 1 addition & 1 deletion head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ func TestDelete_e2e(t *testing.T) {
smpls = deletedSamples(smpls, del.drange)
// Only append those series for which samples exist as mockSeriesSet
// doesn't skip series with no samples.
// TODO: But sometimes SeriesSet returns an empty SeriesIterator
// TODO: But sometimes SeriesSet returns an empty chunkenc.Iterator
if len(smpls) > 0 {
matchedSeries = append(matchedSeries, newSeries(
m.Map(),
Expand Down
37 changes: 11 additions & 26 deletions querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ type Series interface {
Labels() labels.Labels

// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator
Iterator() chunkenc.Iterator

// ChunkIterator returns a new iterator for the non-overlapping chunks of the series.
// ChunkIterator returns a new iterator that iterates over non-overlapping chunks of the series.
ChunkIterator() ChunkIterator
}

Expand Down Expand Up @@ -876,28 +876,14 @@ func (s *chunkSeries) Labels() labels.Labels {
return s.labels
}

func (s *chunkSeries) Iterator() SeriesIterator {
func (s *chunkSeries) Iterator() chunkenc.Iterator {
return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt)
}

func (s *chunkSeries) ChunkIterator() ChunkIterator {
return &chunkIterator{chunks: s.chunks}
}

// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at t, it advances to the first value
// after t.
Seek(t int64) bool
// At returns the current timestamp/value pair.
At() (t int64, v float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}

// chainedSeries implements a series for a list of time-sorted series.
// They all must have the same labels.
type chainedSeries struct {
Expand All @@ -908,7 +894,7 @@ func (s *chainedSeries) Labels() labels.Labels {
return s.series[0].Labels()
}

func (s *chainedSeries) Iterator() SeriesIterator {
func (s *chainedSeries) Iterator() chunkenc.Iterator {
return newChainedSeriesIterator(s.series...)
}

Expand All @@ -926,7 +912,7 @@ type chainedSeriesIterator struct {
series []Series // series in time order

i int
cur SeriesIterator
cur chunkenc.Iterator
}

func newChainedSeriesIterator(s ...Series) *chainedSeriesIterator {
Expand Down Expand Up @@ -987,7 +973,7 @@ func (s *verticalChainedSeries) Labels() labels.Labels {
return s.series[0].Labels()
}

func (s *verticalChainedSeries) Iterator() SeriesIterator {
func (s *verticalChainedSeries) Iterator() chunkenc.Iterator {
return newVerticalMergeSeriesIterator(s.series...)
}

Expand All @@ -998,14 +984,14 @@ func (s *verticalChainedSeries) ChunkIterator() ChunkIterator {
// verticalMergeSeriesIterator implements a series iterator over a list
// of time-sorted, time-overlapping iterators.
type verticalMergeSeriesIterator struct {
a, b SeriesIterator
a, b chunkenc.Iterator
aok, bok, initialized bool

curT int64
curV float64
}

func newVerticalMergeSeriesIterator(s ...Series) SeriesIterator {
func newVerticalMergeSeriesIterator(s ...Series) chunkenc.Iterator {
if len(s) == 1 {
return s[0].Iterator()
} else if len(s) == 2 {
Expand Down Expand Up @@ -1148,7 +1134,6 @@ func (it *verticalMergeChunkIterator) Next() bool {
return true
}


it.curMeta, it.err = mergeOverlappingChunks(aCurMeta, bCurMeta, it.aReuseIter, it.bReuseIter)
if it.err != nil {
return false
Expand Down Expand Up @@ -1189,9 +1174,9 @@ func mergeOverlappingChunks(a, b chunks.Meta, aReuseIter, bReuseIter chunkenc.It
}

return chunks.Meta{
MinTime: mint,
MaxTime: maxt,
Chunk: chk,
MinTime: mint,
MaxTime: maxt,
Chunk: chk,
}, nil
}

Expand Down
18 changes: 9 additions & 9 deletions querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ Outer:
}
}

func expandSeriesIterator(it SeriesIterator) (r []tsdbutil.Sample, err error) {
func expandSeriesIterator(it chunkenc.Iterator) (r []tsdbutil.Sample, err error) {
for it.Next() {
t, v := it.At()
r = append(r, sample{t: t, v: v})
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestBlockQuerier(t *testing.T) {
newSeries := func(l map[string]string, s []tsdbutil.Sample) Series {
return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
iterator: func() chunkenc.Iterator { return newListSeriesIterator(s) },
}
}

Expand Down Expand Up @@ -409,7 +409,7 @@ func TestBlockQuerierDelete(t *testing.T) {
newSeries := func(l map[string]string, s []tsdbutil.Sample) Series {
return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
iterator: func() chunkenc.Iterator { return newListSeriesIterator(s) },
}
}

Expand Down Expand Up @@ -665,10 +665,10 @@ func TestBaseChunkSeries(t *testing.T) {

// TODO: Remove after simpleSeries is merged
type itSeries struct {
si SeriesIterator
si chunkenc.Iterator
}

func (s itSeries) Iterator() SeriesIterator { return s.si }
func (s itSeries) Iterator() chunkenc.Iterator { return s.si }
func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
func (s itSeries) ChunkIterator() ChunkIterator { return nil }

Expand Down Expand Up @@ -1013,7 +1013,7 @@ func TestSeriesIterator(t *testing.T) {

t.Run("Seek", func(t *testing.T) {
for _, tc := range seekcases {
ress := []SeriesIterator{
ress := []chunkenc.Iterator{
newChainedSeriesIterator(
itSeries{newListSeriesIterator(tc.a)},
itSeries{newListSeriesIterator(tc.b)},
Expand Down Expand Up @@ -1478,17 +1478,17 @@ func (m mockIndex) LabelNames() ([]string, error) {

type mockSeries struct {
labels func() labels.Labels
iterator func() SeriesIterator
iterator func() chunkenc.Iterator
}

func newSeries(l map[string]string, s []tsdbutil.Sample) Series {
return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
iterator: func() chunkenc.Iterator { return newListSeriesIterator(s) },
}
}
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
func (m *mockSeries) Iterator() chunkenc.Iterator { return m.iterator() }
func (m *mockSeries) ChunkIterator() ChunkIterator { return nil }

type listSeriesIterator struct {
Expand Down
24 changes: 6 additions & 18 deletions tsdbutil/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,21 @@ package tsdbutil

import (
"math"
)

// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at t, it advances to the first value
// after t.
Seek(t int64) bool
// At returns the current timestamp/value pair.
At() (t int64, v float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}
"github.com/prometheus/tsdb/chunkenc"
)

// BufferedSeriesIterator wraps an iterator with a look-back buffer.
type BufferedSeriesIterator struct {
it SeriesIterator
it chunkenc.Iterator
buf *sampleRing

lastTime int64
}

// NewBuffer returns a new iterator that buffers the values within the time range
// of the current element and the duration of delta before.
func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator {
func NewBuffer(it chunkenc.Iterator, delta int64) *BufferedSeriesIterator {
return &BufferedSeriesIterator{
it: it,
buf: newSampleRing(delta, 16),
Expand All @@ -56,7 +44,7 @@ func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) {
}

// Buffer returns an iterator over the buffered data.
func (b *BufferedSeriesIterator) Buffer() SeriesIterator {
func (b *BufferedSeriesIterator) Buffer() chunkenc.Iterator {
return b.buf.iterator()
}

Expand Down Expand Up @@ -145,7 +133,7 @@ func (r *sampleRing) reset() {
r.f = 0
}

func (r *sampleRing) iterator() SeriesIterator {
func (r *sampleRing) iterator() chunkenc.Iterator {
return &sampleRingIterator{r: r, i: -1}
}

Expand Down

0 comments on commit 78e9e36

Please sign in to comment.