From 599eed7c523da4fab07e7842181b01e32174a6a4 Mon Sep 17 00:00:00 2001 From: Poyzan <31743851+poyzannur@users.noreply.github.com> Date: Thu, 4 Jan 2024 18:40:35 +0000 Subject: [PATCH] [bloom-compactor] remove BloomCompactorMinTableAge check (#11546) **What this PR does / why we need it**: `BloomCompactorMinTableAge` was added with the idea that we will skip most recent index files from ingesters that are not compacted into TSDB indexes yet. The default value is an hour. This blocks bloom-compacter processing TSBD of the day, because end timestamp is always in the last 15 mins. We can either reduce it something lower than the frequency of TSDB indexes being built (< 15mins). Here I choose to remove it altogether, assuming uncompacted indexes from ingesters will not be processed as a table as there won't be a schema for that table with the [check here](https://github.com/grafana/loki/blob/main/pkg/bloomcompactor/bloomcompactor.go#L268-L272). --- docs/sources/configure/_index.md | 7 ------- pkg/bloomcompactor/bloomcompactor.go | 9 ++------- pkg/bloomcompactor/config.go | 1 - pkg/validation/limits.go | 6 ------ 4 files changed, 2 insertions(+), 21 deletions(-) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 6998635c90ce..e2185c19474f 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -3084,13 +3084,6 @@ shard_streams: # CLI flag: -bloom-compactor.max-table-age [bloom_compactor_max_table_age: | default = 168h] -# The minimum age of a table before it is compacted. Do not compact tables newer -# than the the configured time. Default to 1 hour. 0s means no limit. This is -# useful to avoid compacting tables that will be updated with out-of-order -# writes. -# CLI flag: -bloom-compactor.min-table-age -[bloom_compactor_min_table_age: | default = 1h] - # Whether to compact chunks into bloom filters. # CLI flag: -bloom-compactor.enable-compaction [bloom_compactor_enable_compaction: | default = false] diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index d9a83ec5b710..06cdf5c6f93a 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -246,13 +246,11 @@ func (c *Compactor) runCompaction(ctx context.Context) error { _ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error { tableName := tables[i] logger := log.With(c.logger, "table", tableName) - level.Info(logger).Log("msg", "compacting table") err := c.compactTable(ctx, logger, tableName, tablesIntervals[tableName]) if err != nil { errs.Add(err) return nil } - level.Info(logger).Log("msg", "finished compacting table") return nil }) @@ -307,12 +305,7 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor // Skip this table if it is too new/old for the tenant limits. now := model.Now() - tableMinAge := c.limits.BloomCompactorMinTableAge(tenant) tableMaxAge := c.limits.BloomCompactorMaxTableAge(tenant) - if tableMinAge > 0 && tableInterval.End.After(now.Add(-tableMinAge)) { - level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too new ", "table-min-age", tableMinAge, "table-end", tableInterval.End, "now", now) - continue - } if tableMaxAge > 0 && tableInterval.Start.Before(now.Add(-tableMaxAge)) { level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too old", "table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now) continue @@ -507,6 +500,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, } }() + level.Info(logger).Log("msg", "started compacting table", "table", job.tableName, "tenant", job.tenantID) if len(blocksMatchingJob) == 0 && len(metasMatchingJob) > 0 { // There is no change to any blocks, no compaction needed level.Info(logger).Log("msg", "No changes to tsdb, no compaction needed") @@ -597,5 +591,6 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, level.Error(logger).Log("msg", "failed uploading meta.json to storage", "err", err) return err } + level.Info(logger).Log("msg", "finished compacting table", "table", job.tableName, "tenant", job.tenantID) return nil } diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index 3bdf65d3e68a..c3969ac6af38 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -42,7 +42,6 @@ type Limits interface { downloads.Limits BloomCompactorShardSize(tenantID string) int BloomCompactorMaxTableAge(tenantID string) time.Duration - BloomCompactorMinTableAge(tenantID string) time.Duration BloomCompactorEnabled(tenantID string) bool BloomNGramLength(tenantID string) int BloomNGramSkip(tenantID string) int diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 7a1cab3e1e0e..7f1f6ea0d734 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -186,7 +186,6 @@ type Limits struct { BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` - BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"` BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"` BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"` BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"` @@ -312,7 +311,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.") f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.") - f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.") f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.") f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.") f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.") @@ -838,10 +836,6 @@ func (o *Overrides) BloomCompactorMaxTableAge(userID string) time.Duration { return o.getOverridesForUser(userID).BloomCompactorMaxTableAge } -func (o *Overrides) BloomCompactorMinTableAge(userID string) time.Duration { - return o.getOverridesForUser(userID).BloomCompactorMinTableAge -} - func (o *Overrides) BloomCompactorEnabled(userID string) bool { return o.getOverridesForUser(userID).BloomCompactorEnabled }