[bloom-compactor] Clean up work to create blooms #11591

Merged · 22 commits · Jan 5, 2024
Changes from 21 commits
58 changes: 42 additions & 16 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -28,6 +28,7 @@ import (
"context"
"fmt"
"math"
"math/rand"
"os"
"time"

@@ -303,7 +304,7 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor
continue
}

-// Skip this table if it is too new/old for the tenant limits.
+// Skip this table if it is too old for the tenant limits.
now := model.Now()
tableMaxAge := c.limits.BloomCompactorMaxTableAge(tenant)
if tableMaxAge > 0 && tableInterval.Start.Before(now.Add(-tableMaxAge)) {
@@ -352,20 +353,24 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
}

// Tokenizer is not thread-safe so we need one per goroutine.
-NGramLength := c.limits.BloomNGramLength(tenant)
-NGramSkip := c.limits.BloomNGramSkip(tenant)
-bt := v1.NewBloomTokenizer(NGramLength, NGramSkip, c.btMetrics)
+nGramLen := c.limits.BloomNGramLength(tenant)
+nGramSkip := c.limits.BloomNGramSkip(tenant)
+bt := v1.NewBloomTokenizer(nGramLen, nGramSkip, c.btMetrics)

errs := multierror.New()
rs, err := c.sharding.GetTenantSubRing(tenant).GetAllHealthy(RingOp)
if err != nil {
return err
}
tokenRanges := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances)
+for _, tr := range tokenRanges {
+	level.Debug(logger).Log("msg", "got token range for instance", "id", tr.Instance.Id, "min", tr.MinToken, "max", tr.MaxToken)
+}

_ = sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex {
// Skip multi-tenant indexes
+level.Debug(logger).Log("msg", "skipping multi-tenant index", "table", tableName, "index", idx.Name())
return nil
}

@@ -396,13 +401,19 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
// All seriesMetas for a given table within the fp range of this compactor shard
seriesMetas = append(seriesMetas, seriesMeta{seriesFP: fingerprint, seriesLbs: labels, chunkRefs: temp})
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
)

if err != nil {
errs.Add(err)
return nil
}

+if len(seriesMetas) == 0 {
+	level.Debug(logger).Log("msg", "skipping index because it does not have any matching series", "table", tableName, "index", idx.Name())
+	return nil
+}

job := NewJob(tenant, tableName, idx.Path(), seriesMetas)
jobLogger := log.With(logger, "job", job.String())
c.metrics.compactionRunJobStarted.Inc()
@@ -486,12 +497,12 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
localDst := createLocalDirName(c.cfg.WorkingDirectory, job)
blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip())

-defer func() {
-	//clean up the bloom directory
-	if err := os.RemoveAll(localDst); err != nil {
-		level.Error(logger).Log("msg", "failed to remove block directory", "dir", localDst, "err", err)
-	}
-}()
+//defer func() {
+//	//clean up the bloom directory
+//	if err := os.RemoveAll(localDst); err != nil {
+//		level.Error(logger).Log("msg", "failed to remove block directory", "dir", localDst, "err", err)
+//	}
+//}()

var resultingBlock bloomshipper.Block
defer func() {
@@ -507,6 +518,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
return nil
} else if len(metasMatchingJob) == 0 {
// No matching existing blocks for this job, compact all series from scratch
+level.Info(logger).Log("msg", "No matching existing blocks for this job, compact all series from scratch")

builder, err := NewPersistentBlockBuilder(localDst, blockOptions)
if err != nil {
@@ -522,6 +534,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,

} else if len(blocksMatchingJob) > 0 {
// When compacted metas already exist, we need to merge all blocks, amending existing blooms with the new series
+level.Info(logger).Log("msg", "already compacted metas exists, use mergeBlockBuilder")

var populate = createPopulateFunc(ctx, logger, job, storeClient, bt)

@@ -560,12 +573,13 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
return err
}
-defer func() {
-	err = os.Remove(archivePath)
-	if err != nil {
-		level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
-	}
-}()

+//defer func() {
+//	err = os.Remove(archivePath)
+//	if err != nil {
+//		level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
+//	}
+//}()

// Do not change the signature of PutBlocks yet.
// Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate.
@@ -583,9 +597,21 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
// TODO delete old metas in later compactions
// After all is done, create one meta file and upload to storage
meta := bloomshipper.Meta{
+MetaRef: bloomshipper.MetaRef{
+	Ref: bloomshipper.Ref{
+		TenantID:       job.tenantID,
+		TableName:      job.tableName,
+		MinFingerprint: uint64(job.minFp),
+		MaxFingerprint: uint64(job.maxFp),
+		StartTimestamp: job.from,
+		EndTimestamp:   job.through,
+		Checksum:       rand.Uint32(), // Discuss if checksum is needed for Metas, why should we read all data again.
+	},
+},
Tombstones: blocksMatchingJob,
Blocks: activeBloomBlocksRefs,
}

err = c.bloomShipperClient.PutMeta(ctx, meta)
if err != nil {
level.Error(logger).Log("msg", "failed uploading meta.json to storage", "err", err)
14 changes: 10 additions & 4 deletions pkg/bloomcompactor/chunkcompactor.go
@@ -7,6 +7,8 @@ import (
"os"
"path/filepath"

"github.com/google/uuid"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
@@ -148,7 +150,7 @@ func buildBlockFromBlooms(
}

func createLocalDirName(workingDir string, job Job) string {
-dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%d-%d", job.tableName, job.tenantID, job.minFp, job.maxFp, job.from, job.through)
+dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%d-%d-%s", job.tableName, job.tenantID, job.minFp, job.maxFp, job.from, job.through, uuid.New().String())
return filepath.Join(workingDir, dir)
}

@@ -167,7 +169,7 @@ func compactNewChunks(
return bloomshipper.Block{}, err
}

-bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, fpRate)
+bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, fpRate, logger)

// Build and upload bloomBlock to storage
block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job)
@@ -186,6 +188,7 @@ type lazyBloomBuilder struct {
client chunkClient
bt compactorTokenizer
fpRate float64
+logger log.Logger

cur v1.SeriesWithBloom // returned by At()
err error // returned by Err()
@@ -195,21 +198,22 @@ type lazyBloomBuilder struct {
// which are used by the blockBuilder to write a bloom block.
// We use an iterator to avoid loading all blooms into memory first, before
// building the block.
-func newLazyBloomBuilder(ctx context.Context, job Job, client chunkClient, bt compactorTokenizer, fpRate float64) *lazyBloomBuilder {
+func newLazyBloomBuilder(ctx context.Context, job Job, client chunkClient, bt compactorTokenizer, fpRate float64, logger log.Logger) *lazyBloomBuilder {
return &lazyBloomBuilder{
ctx: ctx,
metas: v1.NewSliceIter(job.seriesMetas),
client: client,
tenant: job.tenantID,
bt: bt,
fpRate: fpRate,
+logger: logger,
}
}

func (it *lazyBloomBuilder) Next() bool {
if !it.metas.Next() {
it.err = io.EOF
it.cur = v1.SeriesWithBloom{}
+level.Debug(it.logger).Log("msg", "No seriesMeta")
return false
}
meta := it.metas.At()
@@ -219,13 +223,15 @@ func (it *lazyBloomBuilder) Next() bool {
if err != nil {
it.err = err
it.cur = v1.SeriesWithBloom{}
+level.Debug(it.logger).Log("err in getChunks", err)
return false
}

it.cur, err = buildBloomFromSeries(meta, it.fpRate, it.bt, chks)
if err != nil {
it.err = err
it.cur = v1.SeriesWithBloom{}
+level.Debug(it.logger).Log("err in buildBloomFromSeries", err)
return false
}
return true
4 changes: 3 additions & 1 deletion pkg/bloomcompactor/chunkcompactor_test.go
@@ -127,6 +127,8 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) {
}

func TestLazyBloomBuilder(t *testing.T) {
+logger := log.NewNopLogger()

label := labels.FromStrings("foo", "bar")
fp1 := model.Fingerprint(100)
fp2 := model.Fingerprint(999)
@@ -167,7 +169,7 @@ func TestLazyBloomBuilder(t *testing.T) {
mbt := &mockBloomTokenizer{}
mcc := &mockChunkClient{}

-it := newLazyBloomBuilder(context.Background(), job, mcc, mbt, fpRate)
+it := newLazyBloomBuilder(context.Background(), job, mcc, mbt, fpRate, logger)

// first seriesMeta has 1 chunks
require.True(t, it.Next())
17 changes: 8 additions & 9 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -38,16 +38,14 @@ const eightBits = 8
// 1) The token slices generated must not be mutated externally
// 2) The token slice must not be used after the next call to `Tokens()` as it will repopulate the slice.
// 3) This is not thread-safe.
-func NewBloomTokenizer(NGramLength, NGramSkip int, metrics *Metrics) *BloomTokenizer {
-	t := &BloomTokenizer{
-		metrics: metrics,
+func NewBloomTokenizer(nGramLen, nGramSkip int, metrics *Metrics) *BloomTokenizer {
+	// TODO(chaudum): Replace logger
+	level.Info(util_log.Logger).Log("msg", "create new bloom tokenizer", "ngram length", nGramLen, "ngram skip", nGramSkip)
+	return &BloomTokenizer{
+		metrics:       metrics,
+		cache:         make(map[string]interface{}, cacheSize),
+		lineTokenizer: NewNGramTokenizer(nGramLen, nGramSkip),
	}
-	t.cache = make(map[string]interface{}, cacheSize)
-	t.lineTokenizer = NewNGramTokenizer(NGramLength, NGramSkip)
-
-	level.Info(util_log.Logger).Log("bloom tokenizer created")
-
-	return t
}

func (bt *BloomTokenizer) SetLineTokenizer(t *NGramTokenizer) {
@@ -86,6 +84,7 @@ func prefixedToken(ngram int, chk logproto.ChunkRef) ([]byte, int) {
// PopulateSeriesWithBloom is intended to be called on the write path, and is used to populate the bloom filter for a given series.
func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) error {
startTime := time.Now().UnixMilli()
+level.Debug(util_log.Logger).Log("msg", "PopulateSeriesWithBloom")

clearCache(bt.cache)
chunkTotalUncompressedSize := 0
6 changes: 6 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/client.go
@@ -232,6 +232,12 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e
}
key := createBlockObjectKey(block.Ref)
objectClient := b.periodicObjectClients[period]

+_, err = block.Data.Seek(0, 0)
+if err != nil {
+	return fmt.Errorf("error uploading block file: %w", err)
+}

err = objectClient.PutObject(ctx, key, block.Data)
if err != nil {
return fmt.Errorf("error uploading block file: %w", err)
@@ -28,6 +28,7 @@ func CompressBloomBlock(ref BlockRef, archivePath, localDst string, logger log.L

blockToUpload.BlockRef = ref
blockToUpload.Data = archiveFile

return blockToUpload, nil
}
