Skip to content

Commit

Permalink
ToProto implemented in planner.Task
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Nov 7, 2024
1 parent 2ac8f3f commit 0d1e610
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 147 deletions.
73 changes: 44 additions & 29 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,10 @@ func (p *Planner) runOne(ctx context.Context) error {
}

var (
wg sync.WaitGroup
start = time.Now()
status = statusFailure
wg sync.WaitGroup
start = time.Now()
status = statusFailure
openTSDBs strategies.TSDBSet
)
defer func() {
p.metrics.buildCompleted.WithLabelValues(status).Inc()
Expand All @@ -238,6 +239,15 @@ func (p *Planner) runOne(ctx context.Context) error {
if status == statusSuccess {
p.metrics.buildLastSuccess.SetToCurrentTime()
}

// Close all open TSDBs
// These are used to get the chunkrefs for the series in the gaps.
// We populate the chunkrefs when we send the task to the builder.
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(p.logger).Log("msg", "failed to close tsdb", "tsdb", idx.Name(), "err", err)
}
}
}()

p.metrics.buildStarted.Inc()
Expand Down Expand Up @@ -275,7 +285,21 @@ func (p *Planner) runOne(ctx context.Context) error {
table: table,
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant)
// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
continue
}

// Open new TSDBs
openTSDBs, err = openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs, openTSDBs)
if err != nil {
level.Error(logger).Log("msg", "failed to open all tsdbs", "err", err)
continue
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, openTSDBs)
if err != nil {
level.Error(logger).Log("msg", "failed to compute tasks", "err", err)
continue
Expand All @@ -286,7 +310,7 @@ func (p *Planner) runOne(ctx context.Context) error {

now := time.Now()
for _, task := range tasks {
queueTask := NewQueueTask(ctx, now, task, resultsCh)
queueTask := NewQueueTask(ctx, now, task, openTSDBs[task.TSDB], resultsCh)
if err := p.enqueueTask(queueTask); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
continue
Expand Down Expand Up @@ -374,6 +398,7 @@ func (p *Planner) computeTasks(
ctx context.Context,
table config.DayTable,
tenant string,
tsdbs strategies.TSDBSet,
) ([]*strategies.Task, []bloomshipper.Meta, error) {
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
Expand Down Expand Up @@ -402,29 +427,11 @@ func (p *Planner) computeTasks(
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}

// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}

if len(tsdbs) == 0 {
return nil, metas, nil
}

openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
if err != nil {
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
}
defer func() {
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
}
}
}()

tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas)
tasks, err := strategy.Plan(ctx, table, tenant, tsdbs, metas)
if err != nil {
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
}
Expand Down Expand Up @@ -506,18 +513,26 @@ func openAllTSDBs(
tenant string,
store common.TSDBStore,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
alreadyOpen strategies.TSDBSet,
) (strategies.TSDBSet, error) {
if len(alreadyOpen) == 0 {
alreadyOpen = make(strategies.TSDBSet, len(tsdbs))
}

for _, idx := range tsdbs {
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
if _, ok := alreadyOpen[idx]; ok {
continue
}

reader, err := store.LoadTSDB(ctx, table, tenant, idx)
if err != nil {
return nil, fmt.Errorf("failed to load tsdb: %w", err)
}

openTSDBs[idx] = tsdb
alreadyOpen[idx] = reader
}

return openTSDBs, nil
return alreadyOpen, nil
}

// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,9 @@ func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
"fakeTenant",
v1.NewBounds(0, 10),
plannertest.TsdbID(1),
forSeries,
nil,
),
forSeries,
resultsCh,
)
tasks = append(tasks, task)
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/strategies/chunksize.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *ChunkSizeStrategy) Plan(
Blocks: blocks,
}

tasks = append(tasks, NewTask(table, tenant, bounds, batch.TSDB(), tsdbs[batch.TSDB()], []Gap{planGap}))
tasks = append(tasks, NewTask(table, tenant, bounds, batch.TSDB(), []Gap{planGap}))
}
if err := sizedIter.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate over sized series: %w", err)
Expand Down
37 changes: 18 additions & 19 deletions pkg/bloombuild/planner/strategies/chunksize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

func taskForGap(tsdb tsdb.SingleTenantTSDBIdentifier, forSeries common.ClosableForSeries, bounds v1.FingerprintBounds, blocks []bloomshipper.BlockRef) *Task {
return NewTask(plannertest.TestTable, "fake", bounds, tsdb, forSeries, []Gap{
func taskForGap(tsdb tsdb.SingleTenantTSDBIdentifier, bounds v1.FingerprintBounds, blocks []bloomshipper.BlockRef) *Task {
return NewTask(plannertest.TestTable, "fake", bounds, tsdb, []Gap{
{
Bounds: bounds,
Series: plannertest.GenSeriesWithStep(bounds, 10),
Expand Down Expand Up @@ -45,12 +44,12 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {

// We expect 6 tasks, each with up to 2 series
expectedTasks: []*Task{
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(0, 10), nil),
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(20, 30), nil),
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(40, 50), nil),
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(60, 70), nil),
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(80, 90), nil),
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(100, 100), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(0, 10), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(20, 30), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(40, 50), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(60, 70), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(80, 90), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(100, 100), nil),
},
},
{
Expand Down Expand Up @@ -128,7 +127,7 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {

// We expect 1 task for the missing series
expectedTasks: []*Task{
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(20, 30), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(20, 30), nil),
},
},
{
Expand Down Expand Up @@ -157,27 +156,27 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {

// We expect 6 tasks, each with up to 2 series
expectedTasks: []*Task{
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(0, 10), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(0, 10), []bloomshipper.BlockRef{
plannertest.GenBlockRef(0, 0),
plannertest.GenBlockRef(10, 10),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(20, 30), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(20, 30), []bloomshipper.BlockRef{
plannertest.GenBlockRef(20, 20),
plannertest.GenBlockRef(30, 30),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(40, 50), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(40, 50), []bloomshipper.BlockRef{
plannertest.GenBlockRef(40, 40),
plannertest.GenBlockRef(50, 50),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(60, 70), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(60, 70), []bloomshipper.BlockRef{
plannertest.GenBlockRef(60, 60),
plannertest.GenBlockRef(70, 70),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(80, 90), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(80, 90), []bloomshipper.BlockRef{
plannertest.GenBlockRef(80, 80),
plannertest.GenBlockRef(90, 90),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(100, 100), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(100, 100), []bloomshipper.BlockRef{
plannertest.GenBlockRef(100, 100),
}),
},
Expand Down Expand Up @@ -212,15 +211,15 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {

// We expect 3 tasks, each with up to 2 series
expectedTasks: []*Task{
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(0, 10), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(0, 10), []bloomshipper.BlockRef{
plannertest.GenBlockRef(0, 0),
plannertest.GenBlockRef(10, 10),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(20, 30), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(20, 30), []bloomshipper.BlockRef{
plannertest.GenBlockRef(20, 20),
plannertest.GenBlockRef(30, 30),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(40, 40), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(40, 40), []bloomshipper.BlockRef{
plannertest.GenBlockRef(40, 40),
}),
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/strategies/splitkeyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *SplitKeyspaceStrategy) Plan(
}

for _, gap := range gaps {
tasks = append(tasks, NewTask(table, tenant, ownershipRange, gap.tsdb, tsdbs[gap.tsdb], gap.gaps))
tasks = append(tasks, NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps))
}
}

Expand Down
98 changes: 2 additions & 96 deletions pkg/bloombuild/planner/strategies/task.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
package strategies

import (
"context"
"fmt"
"math"
"slices"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/logproto"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

type Gap struct {
Expand All @@ -31,8 +23,7 @@ type Task struct {
Table config.DayTable
Tenant string
OwnershipBounds v1.FingerprintBounds
tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
forSeries common.ClosableForSeries
TSDB tsdb.SingleTenantTSDBIdentifier
Gaps []Gap
}

Expand All @@ -41,99 +32,14 @@ func NewTask(
tenant string,
bounds v1.FingerprintBounds,
tsdb tsdb.SingleTenantTSDBIdentifier,
forSeries common.ClosableForSeries,
gaps []Gap,
) *Task {
return &Task{
ID: fmt.Sprintf("%s-%s-%s-%d", table.Addr(), tenant, bounds.String(), len(gaps)),
Table: table,
Tenant: tenant,
OwnershipBounds: bounds,
tsdbIdentifier: tsdb,
forSeries: forSeries,
TSDB: tsdb,
Gaps: gaps,
}
}

// ToProtoTask converts a Task to a ProtoTask.
//
// For every gap in the task it iterates the series of the opened TSDB
// (t.forSeries) within the gap bounds and collects the chunk references of the
// series that are listed in the gap.
//
// A nil receiver yields (nil, nil). An error is returned when the task has
// gaps but no opened TSDB, when iterating the TSDB fails, or when ctx is done
// before all the gaps have been resolved.
//
// TODO: move to planner.Task and pass forSeries coming from the planner.
func (t *Task) ToProtoTask(ctx context.Context) (*protos.ProtoTask, error) {
	if t == nil {
		return nil, nil
	}

	// Without an opened TSDB we cannot resolve the chunks of any gap.
	// Fail with an error instead of panicking on the nil interface below.
	if len(t.Gaps) > 0 && t.forSeries == nil {
		return nil, fmt.Errorf("no opened TSDB to load series from for task %s", t.ID)
	}

	protoGaps := make([]*protos.ProtoGapWithBlocks, 0, len(t.Gaps))
	for _, gap := range t.Gaps {
		blockRefs := make([]string, 0, len(gap.Blocks))
		for _, block := range gap.Blocks {
			blockRefs = append(blockRefs, block.String())
		}

		// The membership test in the callback below is a binary search,
		// which requires the gap series to be sorted.
		if !slices.IsSorted(gap.Series) {
			slices.Sort(gap.Series)
		}

		series := make([]*protos.ProtoSeries, 0, len(gap.Series))
		if err := t.forSeries.ForSeries(
			ctx,
			t.Tenant,
			gap.Bounds,
			0, math.MaxInt64,
			func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
				// Stop iterating as soon as the context is done.
				if ctx.Err() != nil {
					return true
				}

				// Skip this series if it's not in the gap.
				if _, found := slices.BinarySearch(gap.Series, fp); !found {
					return false
				}

				chunks := make([]*logproto.ShortRef, 0, len(chks))
				for _, chk := range chks {
					chunks = append(chunks, &logproto.ShortRef{
						From:     model.Time(chk.MinTime),
						Through:  model.Time(chk.MaxTime),
						Checksum: chk.Checksum,
					})
				}

				series = append(series, &protos.ProtoSeries{
					Fingerprint: uint64(fp),
					Chunks:      chunks,
				})
				return false
			},
			labels.MustNewMatcher(labels.MatchEqual, "", ""),
		); err != nil {
			return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.Bounds.String(), err)
		}

		// The callback stops the iteration when the context is done. Do not
		// report a possibly truncated list of series as a success.
		if err := ctx.Err(); err != nil {
			return nil, fmt.Errorf("context done while loading series from TSDB for gap (%s): %w", gap.Bounds.String(), err)
		}

		protoGaps = append(protoGaps, &protos.ProtoGapWithBlocks{
			Bounds: protos.ProtoFingerprintBounds{
				Min: gap.Bounds.Min,
				Max: gap.Bounds.Max,
			},
			Series:   series,
			BlockRef: blockRefs,
		})
	}

	return &protos.ProtoTask{
		Id: t.ID,
		Table: protos.DayTable{
			DayTimestampMS: int64(t.Table.Time),
			Prefix:         t.Table.Prefix,
		},
		Tenant: t.Tenant,
		Bounds: protos.ProtoFingerprintBounds{
			Min: t.OwnershipBounds.Min,
			Max: t.OwnershipBounds.Max,
		},
		Tsdb: t.tsdbIdentifier.Path(),
		Gaps: protoGaps,
	}, nil
}
Loading

0 comments on commit 0d1e610

Please sign in to comment.