Skip to content

Commit

Permalink
Do not store chunkrefs in queue task. Defer populating chunks to just…
Browse files Browse the repository at this point in the history
… before sending to builder
  • Loading branch information
salvacorts committed Nov 7, 2024
1 parent 867ce3d commit 2ac8f3f
Show file tree
Hide file tree
Showing 13 changed files with 295 additions and 226 deletions.
20 changes: 4 additions & 16 deletions pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ func (b *BloomTSDBStore) LoadTSDB(
return idx, nil
}

func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[model.Fingerprint], error) {
// TODO(salvacorts): Create a pool
series := make([]*v1.Series, 0, 100)
series := make([]model.Fingerprint, 0, 100)

if err := f.ForSeries(
ctx,
Expand All @@ -138,19 +138,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
case <-ctx.Done():
return true
default:
res := &v1.Series{
Fingerprint: fp,
Chunks: make(v1.ChunkRefs, 0, len(chks)),
}
for _, chk := range chks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, res)
series = append(series, fp)
return false
}
},
Expand All @@ -161,7 +149,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b

select {
case <-ctx.Done():
return iter.NewEmptyIter[*v1.Series](), ctx.Err()
return iter.NewEmptyIter[model.Fingerprint](), ctx.Err()
default:
return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloombuild/common/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func TestTSDBSeriesIter(t *testing.T) {
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

v1.EqualIterators(
v1.CompareIterators(
t,
func(a, b *v1.Series) {
require.Equal(t, a, b)
func(t *testing.T, a model.Fingerprint, b *v1.Series) {
require.Equal(t, a, b.Fingerprint)
},
itr,
srcItr,
Expand Down
9 changes: 7 additions & 2 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (p *Planner) computeTasks(
ctx context.Context,
table config.DayTable,
tenant string,
) ([]*protos.Task, []bloomshipper.Meta, error) {
) ([]*strategies.Task, []bloomshipper.Meta, error) {
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
Expand Down Expand Up @@ -847,8 +847,13 @@ func (p *Planner) forwardTaskToBuilder(
builderID string,
task *QueueTask,
) (*protos.TaskResult, error) {
protoTask, err := task.ToProtoTask(builder.Context())
if err != nil {
return nil, fmt.Errorf("error converting task to proto task: %w", err)
}

msg := &protos.PlannerToBuilder{
Task: task.ToProtoTask(),
Task: protoTask,
}

if err := builder.Send(msg); err != nil {
Expand Down
11 changes: 10 additions & 1 deletion pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,12 +713,21 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}

func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
forSeries := plannertest.NewFakeForSeries(plannertest.GenV1Series(v1.NewBounds(0, 100)))

tasks := make([]*QueueTask, 0, n)
// Enqueue tasks
for i := 0; i < n; i++ {
task := NewQueueTask(
context.Background(), time.Now(),
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil),
strategies.NewTask(
config.NewDayTable(plannertest.TestDay, "fake"),
"fakeTenant",
v1.NewBounds(0, 10),
plannertest.TsdbID(1),
forSeries,
nil,
),
resultsCh,
)
tasks = append(tasks, task)
Expand Down
58 changes: 56 additions & 2 deletions pkg/bloombuild/planner/plannertest/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compression"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

var TestDay = ParseDayTime("2023-09-01")
Expand Down Expand Up @@ -87,11 +89,23 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
}, nil
}

func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
// GenSeries returns the fingerprints generated by GenSeriesWithStep for the
// given bounds, using a step of 1.
func GenSeries(bounds v1.FingerprintBounds) []model.Fingerprint {
	const step = 1
	return GenSeriesWithStep(bounds, step)
}

func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
// GenSeriesWithStep returns the fingerprints bounds.Min, bounds.Min+step,
// bounds.Min+2*step, ... up to and including the last value that does not
// exceed bounds.Max.
//
// step must be positive; a non-positive step panics with an explicit message
// instead of failing on a division by zero or an invalid slice capacity.
// An inverted range (bounds.Min > bounds.Max) yields an empty slice.
func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []model.Fingerprint {
	if step <= 0 {
		panic("plannertest: GenSeriesWithStep requires step > 0")
	}
	if bounds.Min > bounds.Max {
		return []model.Fingerprint{}
	}

	stride := model.Fingerprint(step)

	// The range is inclusive on both ends, so the element count is
	// floor(span/step)+1 rather than floor((span+1)/step).
	capacity := uint64(bounds.Max-bounds.Min)/uint64(step) + 1
	series := make([]model.Fingerprint, 0, capacity)

	for fp := bounds.Min; ; fp += stride {
		series = append(series, fp)
		// Stop before the increment would pass bounds.Max. Comparing the
		// remaining distance (instead of fp+stride <= Max) keeps the loop
		// from wrapping around when bounds.Max is close to the maximum
		// fingerprint value.
		if bounds.Max-fp < stride {
			break
		}
	}
	return series
}

// GenV1Series returns the series generated by GenV1SeriesWithStep for the
// given bounds, using a step of 1.
func GenV1Series(bounds v1.FingerprintBounds) []*v1.Series {
	const step = 1
	return GenV1SeriesWithStep(bounds, step)
}

func GenV1SeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
series = append(series, &v1.Series{
Expand Down Expand Up @@ -139,3 +153,43 @@ func ParseDayTime(s string) config.DayTime {
Time: model.TimeFromUnix(t.Unix()),
}
}

// FakeForSeries is an in-memory test double that replays a fixed list of
// series through its ForSeries method.
// NOTE(review): presumably meant to satisfy sharding.ForSeries — confirm
// against the interface definition.
type FakeForSeries struct {
// series is the fixed set of series handed to ForSeries callbacks.
series []*v1.Series
}

// NewFakeForSeries builds a FakeForSeries backed by the given series slice.
// The slice is stored as-is, not copied.
func NewFakeForSeries(series []*v1.Series) *FakeForSeries {
	fake := new(FakeForSeries)
	fake.series = series
	return fake
}

// ForSeries invokes fn once for every stored series whose fingerprint is
// accepted by ff, in the order the series were stored. Each call receives
// empty labels, the series fingerprint, and one index.ChunkMeta per chunk
// reference (with a fixed KB of 100). Iteration ends early when fn returns
// true. The context, tenant, time range and matchers are ignored, and the
// returned error is always nil.
func (f FakeForSeries) ForSeries(
	_ context.Context,
	_ string,
	ff index.FingerprintFilter,
	_ model.Time,
	_ model.Time,
	fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool),
	_ ...*labels.Matcher,
) error {
	// First pass: keep only the series the fingerprint filter accepts.
	matched := make([]*v1.Series, 0, len(f.series))
	for i := range f.series {
		candidate := f.series[i]
		if !ff.Match(candidate.Fingerprint) {
			continue
		}
		matched = append(matched, candidate)
	}

	// Second pass: translate chunk refs to chunk metas and hand them to fn.
	for _, current := range matched {
		metas := make([]index.ChunkMeta, len(current.Chunks))
		for j, ref := range current.Chunks {
			metas[j] = index.ChunkMeta{
				MinTime:  int64(ref.From),
				MaxTime:  int64(ref.Through),
				Checksum: ref.Checksum,
				KB:       100,
			}
		}

		if stop := fn(labels.EmptyLabels(), current.Fingerprint, metas); stop {
			return nil
		}
	}
	return nil
}

// Close is a no-op: the fake holds no resources, so it always returns nil.
func (f FakeForSeries) Close() error {
return nil
}
82 changes: 26 additions & 56 deletions pkg/bloombuild/planner/strategies/chunksize.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
Expand Down Expand Up @@ -50,7 +49,7 @@ func (s *ChunkSizeStrategy) Plan(
tenant string,
tsdbs TSDBSet,
metas []bloomshipper.Meta,
) ([]*protos.Task, error) {
) ([]*Task, error) {
targetTaskSize := s.limits.BloomTaskTargetSeriesChunksSizeBytes(tenant)

logger := log.With(s.logger, "table", table.Addr(), "tenant", tenant)
Expand All @@ -73,29 +72,29 @@ func (s *ChunkSizeStrategy) Plan(
return nil, fmt.Errorf("failed to get sized series iter: %w", err)
}

tasks := make([]*protos.Task, 0, iterSize)
tasks := make([]*Task, 0, iterSize)
for sizedIter.Next() {
series := sizedIter.At()
if series.Len() == 0 {
batch := sizedIter.At()
if batch.Len() == 0 {
// This should never happen, but just in case.
level.Warn(logger).Log("msg", "got empty series batch", "tsdb", series.TSDB().Name())
level.Warn(logger).Log("msg", "got empty series batch", "tsdb", batch.TSDB().Name())
continue
}

bounds := series.Bounds()
bounds := batch.Bounds()

blocks, err := getBlocksMatchingBounds(metas, bounds)
if err != nil {
return nil, fmt.Errorf("failed to get blocks matching bounds: %w", err)
}

planGap := protos.Gap{
planGap := Gap{
Bounds: bounds,
Series: series.V1Series(),
Series: batch.series,
Blocks: blocks,
}

tasks = append(tasks, protos.NewTask(table, tenant, bounds, series.TSDB(), []protos.Gap{planGap}))
tasks = append(tasks, NewTask(table, tenant, bounds, batch.TSDB(), tsdbs[batch.TSDB()], []Gap{planGap}))
}
if err := sizedIter.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate over sized series: %w", err)
Expand Down Expand Up @@ -155,20 +154,16 @@ func getBlocksMatchingBounds(metas []bloomshipper.Meta, bounds v1.FingerprintBou
return deduped, nil
}

type seriesWithChunks struct {
tsdb tsdb.SingleTenantTSDBIdentifier
fp model.Fingerprint
chunks []index.ChunkMeta
}

type seriesBatch struct {
series []seriesWithChunks
tsdb tsdb.SingleTenantTSDBIdentifier
series []model.Fingerprint
size uint64
}

func newSeriesBatch() seriesBatch {
func newSeriesBatch(tsdb tsdb.SingleTenantTSDBIdentifier) seriesBatch {
return seriesBatch{
series: make([]seriesWithChunks, 0, 100),
tsdb: tsdb,
series: make([]model.Fingerprint, 0, 100),
}
}

Expand All @@ -179,32 +174,11 @@ func (b *seriesBatch) Bounds() v1.FingerprintBounds {

// We assume that the series are sorted by fingerprint.
// This is guaranteed since series are iterated in order by the TSDB.
return v1.NewBounds(b.series[0].fp, b.series[len(b.series)-1].fp)
}

func (b *seriesBatch) V1Series() []*v1.Series {
series := make([]*v1.Series, 0, len(b.series))
for _, s := range b.series {
res := &v1.Series{
Fingerprint: s.fp,
Chunks: make(v1.ChunkRefs, 0, len(s.chunks)),
}
for _, chk := range s.chunks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, res)
}

return series
return v1.NewBounds(b.series[0], b.series[len(b.series)-1])
}

func (b *seriesBatch) Append(s seriesWithChunks, size uint64) {
b.series = append(b.series, s)
func (b *seriesBatch) Append(series model.Fingerprint, size uint64) {
b.series = append(b.series, series)
b.size += size
}

Expand All @@ -217,10 +191,7 @@ func (b *seriesBatch) Size() uint64 {
}

func (b *seriesBatch) TSDB() tsdb.SingleTenantTSDBIdentifier {
if len(b.series) == 0 {
return tsdb.SingleTenantTSDBIdentifier{}
}
return b.series[0].tsdb
return b.tsdb
}

func (s *ChunkSizeStrategy) sizedSeriesIter(
Expand All @@ -230,9 +201,12 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
targetTaskSizeBytes uint64,
) (iter.Iterator[seriesBatch], int, error) {
batches := make([]seriesBatch, 0, 100)
currentBatch := newSeriesBatch()
var currentBatch seriesBatch

for _, idx := range tsdbsWithGaps {
// We cut a new batch for each TSDB.
currentBatch = newSeriesBatch(idx.tsdbIdentifier)

for _, gap := range idx.gaps {
if err := idx.tsdb.ForSeries(
ctx,
Expand All @@ -253,14 +227,10 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
// AND Adding this series to the batch would exceed the target task size.
if currentBatch.Len() > 0 && currentBatch.Size()+seriesSize > targetTaskSizeBytes {
batches = append(batches, currentBatch)
currentBatch = newSeriesBatch()
currentBatch = newSeriesBatch(idx.tsdbIdentifier)
}

currentBatch.Append(seriesWithChunks{
tsdb: idx.tsdbIdentifier,
fp: fp,
chunks: chks,
}, seriesSize)
currentBatch.Append(fp, seriesSize)
return false
}
},
Expand All @@ -269,10 +239,10 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
return nil, 0, err
}

// Add the last batch for this TSDB if it's not empty.
// Add the last batch for this gap if it's not empty.
if currentBatch.Len() > 0 {
batches = append(batches, currentBatch)
currentBatch = newSeriesBatch()
currentBatch = newSeriesBatch(idx.tsdbIdentifier)
}
}
}
Expand Down
Loading

0 comments on commit 2ac8f3f

Please sign in to comment.