diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 98f9699a43..7359b18c89 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -436,6 +436,12 @@ func runCompact( level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h]) } + retentionByTenant, err := compact.ParesRetentionPolicyByTenant(logger, *conf.retentionTenants) + if err != nil && len(retentionByTenant) != 0 { + level.Error(logger).Log("msg", "failed to parse retention policy by tenant", "err", err) + return err + } + var cleanMtx sync.Mutex // TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex. cleanPartialMarked := func() error { @@ -534,6 +540,10 @@ func runCompact( return errors.Wrap(err, "sync before retention") } + if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil { + return errors.Wrap(err, "retention by tenant failed") + } + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, insBkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil { return errors.Wrap(err, "retention failed") } @@ -726,6 +736,7 @@ type compactConfig struct { objStore extflag.PathOrContent consistencyDelay time.Duration retentionRaw, retentionFiveMin, retentionOneHr model.Duration + retentionTenants *[]string wait bool waitInterval time.Duration disableDownsampling bool @@ -781,6 +792,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { Default("0d").SetValue(&cc.retentionFiveMin) cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever"). Default("0d").SetValue(&cc.retentionOneHr) + cc.retentionTenants = cmd.Flag("retention.tenant", "How long to retain samples in bucket per tenant. Setting this to 0d will retain samples of this resolution forever").Strings() // TODO(kakkoyun, pgough): https://github.com/thanos-io/thanos/issues/2266. cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work."). diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index 40ea0a1a72..a1d9c3b466 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -6,6 +6,7 @@ package compact import ( "context" "fmt" + "regexp" "time" "github.com/go-kit/log" @@ -13,12 +14,17 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" ) +const ( + tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))$` +) + // ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime. // A value of 0 disables the retention for its resolution. func ApplyRetentionPolicyByResolution( @@ -47,3 +53,75 @@ func ApplyRetentionPolicyByResolution( level.Info(logger).Log("msg", "optional retention apply done") return nil } + +type RetentionPolicy struct { + CutoffDate time.Time + RetentionDuration time.Duration +} + +func (r RetentionPolicy) isExpired(blockMaxTime time.Time) bool { + if r.CutoffDate.IsZero() { + return time.Now().After(blockMaxTime.Add(r.RetentionDuration)) + } + return r.CutoffDate.After(blockMaxTime) +} + +func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) (map[string]RetentionPolicy, error) { + pattern := regexp.MustCompile(tenantRetentionRegex) + retentionByTenant := make(map[string]RetentionPolicy, len(retentionTenants)) + for _, tenantRetention := range retentionTenants { + matches := pattern.FindStringSubmatch(tenantRetention) + invalidFormat := errors.Errorf("invalid retention format for tenant: %s, must be `:(|d)`", tenantRetention) + if len(matches) != 5 { + return nil, errors.Wrapf(invalidFormat, "matched size %d", len(matches)) + } + tenant := matches[1] + var policy RetentionPolicy + if _, ok := retentionByTenant[tenant]; ok { + return nil, errors.Errorf("duplicate retention policy for tenant: %s", tenant) + } + if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); err != nil && matches[3] != "" { + return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err) + } else if matches[3] != "" { + policy.CutoffDate = cutoffDate + } + if duration, err := model.ParseDuration(matches[4]); err != nil && matches[4] != "" { + return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err) + } else if matches[4] != "" { + policy.RetentionDuration = time.Duration(duration) + } + level.Info(logger).Log("msg", "retention policy for tenant is enabled", "tenant", tenant, "retention policy", policy) + retentionByTenant[tenant] = policy + } + return retentionByTenant, nil +} + +// ApplyRetentionPolicyByTenant removes blocks depending on the specified retentionByTenant based on blocks MaxTime. +func ApplyRetentionPolicyByTenant( + ctx context.Context, + logger log.Logger, + bkt objstore.Bucket, + metas map[ulid.ULID]*metadata.Meta, + retentionByTenant map[string]RetentionPolicy, + blocksMarkedForDeletion prometheus.Counter) error { + if len(retentionByTenant) == 0 { + level.Info(logger).Log("msg", "tenant retention is disabled due to no policy") + return nil + } + level.Info(logger).Log("msg", "start tenant retention") + for id, m := range metas { + policy, ok := retentionByTenant[m.Thanos.GetTenant()] + if !ok { + continue + } + maxTime := time.Unix(m.MaxTime/1000, 0) + if policy.isExpired(maxTime) { + level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String()) + if err := block.MarkForDeletion(ctx, logger, bkt, id, fmt.Sprintf("block exceeding retention of %v", policy), blocksMarkedForDeletion); err != nil { + return errors.Wrap(err, "delete block") + } + } + } + level.Info(logger).Log("msg", "tenant retention apply done") + return nil +} diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index d883f23fea..37e39b86f1 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" @@ -20,8 +21,6 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" - "github.com/efficientgo/core/testutil" - "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" @@ -282,6 +281,66 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { } } +func TestParseRetentionPolicyByTenant(t *testing.T) { + t.Parallel() + + for _, tt := range []struct { + name string + retentionTenants []string + expected map[string]compact.RetentionPolicy + expectedErr bool + }{ + { + "empty", + []string{}, + map[string]compact.RetentionPolicy{}, + false, + }, + { + "valid", + []string{"tenant-1:2021-01-01", "tenant-2:3d"}, + map[string]compact.RetentionPolicy{ + "tenant-1": { + CutoffDate: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + RetentionDuration: time.Duration(0), + }, + "tenant-2": { + CutoffDate: time.Time{}, + RetentionDuration: 3 * 24 * time.Hour, + }, + }, + false, + }, + { + "invalid tenant", + []string{"tenant1:2021-01-01", "tenant#2:1d"}, + nil, + true, + }, + { + "invalid date", + []string{"tenant1:2021-010-01", "tenant2:1d"}, + nil, + true, + }, + { + "invalid duration", + []string{"tenant1:2021-01-01", "tenant2:1w"}, + nil, + true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + got, err := compact.ParesRetentionPolicyByTenant(log.NewNopLogger(), tt.retentionTenants) + if (err != nil) != tt.expectedErr { + t.Errorf("ParseRetentionPolicyByTenant() error = %v, wantErr %v", err, tt.expectedErr) + return + } + testutil.Equals(t, got, tt.expected) + }) + } +} + func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) { t.Helper() meta1 := metadata.Meta{