Skip to content

Commit

Permalink
Wrap ToProtoTask
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Nov 7, 2024
1 parent 4ace777 commit 9d6870f
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 87 deletions.
4 changes: 1 addition & 3 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (p *Planner) runOne(ctx context.Context) error {
p.metrics.buildLastSuccess.SetToCurrentTime()
}

// Close all open TSDBs
// Close all open TSDBs.
// These are used to get the chunkrefs for the series in the gaps.
// We populate the chunkrefs when we send the task to the builder.
for idx, reader := range openTSDBs {
Expand Down Expand Up @@ -285,14 +285,12 @@ func (p *Planner) runOne(ctx context.Context) error {
table: table,
}

// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
continue
}

// Open new TSDBs
openTSDBs, err = openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs, openTSDBs)
if err != nil {
level.Error(logger).Log("msg", "failed to open all tsdbs", "err", err)
Expand Down
69 changes: 69 additions & 0 deletions pkg/bloombuild/planner/strategies/task.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package strategies

import (
"context"
"fmt"
"math"
"slices"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
)

type Gap struct {
Expand Down Expand Up @@ -35,3 +43,64 @@ func NewTask(
Gaps: gaps,
}
}

// ToProtoTask converts a Task to a ProtoTask.
// It will use the opened TSDB to get the chunks for the series in the gaps.
func (t *Task) ToProtoTask(ctx context.Context, forSeries sharding.ForSeries) (*protos.ProtoTask, error) {
// Populate the gaps with the series and chunks.
protoGaps := make([]protos.Gap, 0, len(t.Gaps))
for _, gap := range t.Gaps {
if !slices.IsSorted(gap.Series) {
slices.Sort(gap.Series)
}

series := make([]*v1.Series, 0, len(gap.Series))
if err := forSeries.ForSeries(
ctx,
t.Tenant,
gap.Bounds,
0, math.MaxInt64,
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
select {
case <-ctx.Done():
return true
default:
// Skip this series if it's not in the gap.
// Series are sorted, so we can break early.
if _, found := slices.BinarySearch(gap.Series, fp); !found {
return false
}

chunks := make(v1.ChunkRefs, 0, len(chks))
for _, chk := range chks {
chunks = append(chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, &v1.Series{
Fingerprint: fp,
Chunks: chunks,
})
return false
}
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
); err != nil {
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.Bounds.String(), err)
}

protoGaps = append(protoGaps, protos.Gap{
Bounds: gap.Bounds,
Series: series,
Blocks: gap.Blocks,
})
}

// Copy inner task and set gaps
task := *t.Task
task.Gaps = protoGaps
return task.ToProtoTask(), nil
}
85 changes: 1 addition & 84 deletions pkg/bloombuild/planner/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,13 @@ package planner

import (
"context"
"fmt"
"math"
"slices"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

type QueueTask struct {
Expand Down Expand Up @@ -51,81 +44,5 @@ func NewQueueTask(
// ToProtoTask converts a Task to a ProtoTask.
// It will use the opened TSDB to get the chunks for the series in the gaps.
func (t *QueueTask) ToProtoTask(ctx context.Context) (*protos.ProtoTask, error) {
if t == nil {
return nil, nil
}

protoGaps := make([]*protos.ProtoGapWithBlocks, 0, len(t.Gaps))
for _, gap := range t.Gaps {
blockRefs := make([]string, 0, len(gap.Blocks))
for _, block := range gap.Blocks {
blockRefs = append(blockRefs, block.String())
}

if !slices.IsSorted(gap.Series) {
slices.Sort(gap.Series)
}

series := make([]*protos.ProtoSeries, 0, len(gap.Series))
if err := t.forSeries.ForSeries(
ctx,
t.Tenant,
gap.Bounds,
0, math.MaxInt64,
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
select {
case <-ctx.Done():
return true
default:
// Skip this series if it's not in the gap.
// Series are sorted, so we can break early.
if _, found := slices.BinarySearch(gap.Series, fp); !found {
return false
}

chunks := make([]*logproto.ShortRef, 0, len(chks))
for _, chk := range chks {
chunks = append(chunks, &logproto.ShortRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, &protos.ProtoSeries{
Fingerprint: uint64(fp),
Chunks: chunks,
})
return false
}
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
); err != nil {
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.Bounds.String(), err)
}

protoGaps = append(protoGaps, &protos.ProtoGapWithBlocks{
Bounds: protos.ProtoFingerprintBounds{
Min: gap.Bounds.Min,
Max: gap.Bounds.Max,
},
Series: series,
BlockRef: blockRefs,
})
}

return &protos.ProtoTask{
Id: t.ID,
Table: protos.DayTable{
DayTimestampMS: int64(t.Table.Time),
Prefix: t.Table.Prefix,
},
Tenant: t.Tenant,
Bounds: protos.ProtoFingerprintBounds{
Min: t.OwnershipBounds.Min,
Max: t.OwnershipBounds.Max,
},
Tsdb: t.TSDB.Path(),
Gaps: protoGaps,
}, nil
return t.Task.ToProtoTask(ctx, t.forSeries)
}
54 changes: 54 additions & 0 deletions pkg/bloombuild/protos/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/logproto"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
Expand Down Expand Up @@ -106,6 +107,59 @@ func FromProtoTask(task *ProtoTask) (*Task, error) {
}, nil
}

func (t *Task) ToProtoTask() *ProtoTask {
if t == nil {
return nil
}

protoGaps := make([]*ProtoGapWithBlocks, 0, len(t.Gaps))
for _, gap := range t.Gaps {
blockRefs := make([]string, 0, len(gap.Blocks))
for _, block := range gap.Blocks {
blockRefs = append(blockRefs, block.String())
}

// TODO(salvacorts): Cast []*v1.Series to []*ProtoSeries right away
series := make([]*ProtoSeries, 0, len(gap.Series))
for _, s := range gap.Series {
chunks := make([]*logproto.ShortRef, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunk := logproto.ShortRef(c)
chunks = append(chunks, &chunk)
}

series = append(series, &ProtoSeries{
Fingerprint: uint64(s.Fingerprint),
Chunks: chunks,
})
}

protoGaps = append(protoGaps, &ProtoGapWithBlocks{
Bounds: ProtoFingerprintBounds{
Min: gap.Bounds.Min,
Max: gap.Bounds.Max,
},
Series: series,
BlockRef: blockRefs,
})
}

return &ProtoTask{
Id: t.ID,
Table: DayTable{
DayTimestampMS: int64(t.Table.Time),
Prefix: t.Table.Prefix,
},
Tenant: t.Tenant,
Bounds: ProtoFingerprintBounds{
Min: t.OwnershipBounds.Min,
Max: t.OwnershipBounds.Max,
},
Tsdb: t.TSDB.Path(),
Gaps: protoGaps,
}
}

func (t *Task) GetLogger(logger log.Logger) log.Logger {
return log.With(logger,
"task", t.ID,
Expand Down

0 comments on commit 9d6870f

Please sign in to comment.