Skip to content

Commit

Permalink
plumbing + tsdb uploader + building multiple tsdbs per cycle
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
  • Loading branch information
owen-d committed Nov 6, 2024
1 parent 312e273 commit 96880be
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 47 deletions.
29 changes: 22 additions & 7 deletions pkg/blockbuilder/slimgester.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/flagext"
util_log "github.com/grafana/loki/v3/pkg/util/log"

"github.com/grafana/loki/pkg/push"
)
Expand Down Expand Up @@ -96,7 +98,6 @@ type BlockBuilder struct {

store stores.ChunkWriter

tsdbCreator *TsdbCreator
jobController *PartitionJobController
}

Expand All @@ -106,7 +107,6 @@ func NewBlockBuilder(
store stores.ChunkWriter,
logger log.Logger,
reg prometheus.Registerer,
tsdbCreator *TsdbCreator,
jobController *PartitionJobController,
) (*BlockBuilder,
error) {
Expand All @@ -117,7 +117,6 @@ func NewBlockBuilder(
logger: logger,
instances: make(map[string]*instance),
store: store,
tsdbCreator: tsdbCreator,
jobController: jobController,
}

Expand Down Expand Up @@ -283,15 +282,31 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
return false, err
}

built, err := indexer.Create(ctx)
var (
// TODO(owen-d)
nodeName string // from lifecycler id
tabelRanges []config.TableRange // from periodconfigs
// TODO(owen-d): build uploaders based on table ranges, which can use different
// object clients
c client.ObjectClient
)

built, err := indexer.create(ctx, nodeName, tabelRanges)
if err != nil {
return false, err
}

// ship built
fmt.Println(built)
for _, db := range built {
u := newUploader(c)
if err := u.Put(ctx, db); err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to upload tsdb",
"path", db.id.Path(),
)

// TODO: build in mem tsdb; flush
return false, err
}
}

if err = i.jobController.part.Commit(ctx, lastOffset); err != nil {
return false, err
Expand Down
171 changes: 132 additions & 39 deletions pkg/blockbuilder/tsdb.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package blockbuilder

import (
"bytes"
"context"
"fmt"
"io"
"sort"
"sync"
"time"

"github.com/cespare/xxhash"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"

"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

// TsdbCreator accepts writes and builds TSDBs.
Expand Down Expand Up @@ -48,58 +55,112 @@ func (m *TsdbCreator) Append(userID string, ls labels.Labels, fprint uint64, chk
return nil
}

type chunkInfo struct {
chunkMetas index.ChunkMetas
tsdbFormat int
}

type tsdbWithId struct {

Check warning on line 63 in pkg/blockbuilder/tsdb.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

var-naming: type tsdbWithId should be tsdbWithID (revive)
data []byte
id tsdb.Identifier
}

// Create builds a TSDB from the current state using the provided mkTsdb function
func (m *TsdbCreator) Create(ctx context.Context) ([]byte, error) {
func (m *TsdbCreator) create(ctx context.Context, nodeName string, tableRanges []config.TableRange) ([]tsdbWithId, error) {
m.mtx.Lock()
defer m.mtx.Unlock()

builder, err := index.NewMemWriterWithVersion(ctx, index.FormatV3)
if err != nil {
periods := make(map[string]*tsdb.Builder)

if err := m.heads.forAll(
func(user string, ls labels.Labels, fp uint64, chks index.ChunkMetas) error {
// chunks may overlap index period bounds, in which case they're written to multiple
pds := make(map[string]chunkInfo)
for _, chk := range chks {
idxBuckets := tsdb.IndexBuckets(chk.From(), chk.Through(), tableRanges)

for _, bucket := range idxBuckets {
chkinfo := pds[bucket.Prefix]
chkinfo.chunkMetas = append(chkinfo.chunkMetas, chk)
chkinfo.tsdbFormat = bucket.TsdbFormat
pds[bucket.Prefix] = chkinfo
}
}

// Embed the tenant label into TSDB
lb := labels.NewBuilder(ls)
lb.Set(index.TenantLabel, user)
withTenant := lb.Labels()

// Add the chunks to all relevant builders
for pd, chkinfo := range pds {
matchingChks := chkinfo.chunkMetas
b, ok := periods[pd]
if !ok {
b = tsdb.NewBuilder(chkinfo.tsdbFormat)
periods[pd] = b
}

b.AddSeries(
withTenant,
// use the fingerprint without the added tenant label
// so queries route to the chunks which actually exist.
model.Fingerprint(fp),
matchingChks,
)
}

return nil
},
); err != nil {
level.Error(util_log.Logger).Log("err", err.Error(), "msg", "building TSDB")
return nil, err
}

type seriesWithFP struct {
tenant string
fp uint64
ls labels.Labels
chks index.ChunkMetas
}
var orderdByFprint []seriesWithFP

if err := m.heads.forAll(func(user string, ls labels.Labels, fp uint64, chks index.ChunkMetas) error {
orderdByFprint = append(
orderdByFprint,
seriesWithFP{
tenant: user,
fp: fp,
ls: ls,
chks: chks,
},
now := time.Now()
res := make([]tsdbWithId, 0, len(periods))

for p, b := range periods {

level.Debug(util_log.Logger).Log(
"msg", "building tsdb for period",
"pd", p,
)
return nil
}); err != nil {
return nil, err
}

sort.Slice(orderdByFprint, func(i, j int) bool {
return orderdByFprint[i].fp < orderdByFprint[j].fp
})
// build+move tsdb to multitenant dir
start := time.Now()
dst, data, err := b.BuildInMemory(
ctx,
func(_, _ model.Time, _ uint32) tsdb.Identifier {
return tsdb.NewPrefixedIdentifier(
tsdb.MultitenantTSDBIdentifier{
NodeName: nodeName,
Ts: now,
},
p,
"",
)
},
)

for i, s := range orderdByFprint {
// Must add tenantLabel initially to multitenant tsdbs.
ls := labels.NewBuilder(s.ls).Set(index.TenantLabel, s.tenant).Labels()
if err := builder.AddSeries(storage.SeriesRef(i), ls, model.Fingerprint(s.fp), s.chks...); err != nil {
if err != nil {
return nil, err
}
}

reader, err := builder.Close(true)
if err != nil {
return nil, err
level.Debug(util_log.Logger).Log(
"msg", "finished building tsdb for period",
"pd", p,
"dst", dst.Path(),
"duration", time.Since(start),
)
res = append(res, tsdbWithId{
id: dst,
data: data,
})
}
m.heads = newTenantHeads(m.shards)

return io.ReadAll(reader)
m.heads = newTenantHeads(m.shards)
return res, nil
}

// tenantHeads manages per-tenant series
Expand Down Expand Up @@ -213,3 +274,35 @@ func (h *Head) forAll(fn func(ls labels.Labels, fp uint64, chks index.ChunkMetas
}
return nil
}

type uploader struct {
c client.ObjectClient
}

func newUploader(client client.ObjectClient) *uploader {
return &uploader{c: client}
}

func (u *uploader) Put(ctx context.Context, db tsdbWithId) error {
reader := bytes.NewReader(db.data)
gzipPool := compression.GetWriterPool(compression.GZIP)
buf := bytes.NewBuffer(make([]byte, 0, 1<<20))
compressedWriter := gzipPool.GetWriter(buf)
defer gzipPool.PutWriter(compressedWriter)

_, err := io.Copy(compressedWriter, reader)
if err != nil {
return err
}

err = compressedWriter.Close()
if err != nil {
return err
}

return u.c.PutObject(ctx, db.id.Path(), buf)
}

func buildFileName(indexName string) string {
return fmt.Sprintf("%s.gz", indexName)
}
1 change: 0 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1813,7 +1813,6 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
t.Store,
logger,
prometheus.DefaultRegisterer,
tsdbCreator,
blockbuilder.NewPartitionJobController(
reader,
),
Expand Down

0 comments on commit 96880be

Please sign in to comment.