diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index e5b1392e58..3d95640241 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -22,6 +22,8 @@ import ( ) const ( + // tenantRetentionRegex is the regex pattern for parsing tenant retention. + // valid format is `:(|d)` where > 0. tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))$` ) @@ -80,14 +82,18 @@ func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) if _, ok := retentionByTenant[tenant]; ok { return nil, errors.Errorf("duplicate retention policy for tenant: %s", tenant) } - if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); err != nil && matches[3] != "" { - return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err) - } else if matches[3] != "" { + if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); matches[3] != "" { + if err != nil { + return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err) + } policy.CutoffDate = cutoffDate } - if duration, err := model.ParseDuration(matches[4]); err != nil && matches[4] != "" { - return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err) - } else if matches[4] != "" { + if duration, err := model.ParseDuration(matches[4]); matches[4] != "" { + if err != nil { + return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err) + } else if duration == 0 { + return nil, errors.Wrapf(invalidFormat, "duration must be greater than 0") + } policy.RetentionDuration = time.Duration(duration) } level.Info(logger).Log("msg", "retention policy for tenant is enabled", "tenant", tenant, "retention policy", fmt.Sprintf("%v", policy)) diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 37e39b86f1..760403ceac 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -12,7 +12,6 @@ import ( "testing" "time" - "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" @@ -21,6 +20,8 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" + "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" @@ -280,6 +281,30 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { }) } } +func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) { + t.Helper() + meta1 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustParse(id), + MinTime: minTime.Unix() * 1000, + MaxTime: maxTime.Unix() * 1000, + Version: 1, + }, + Thanos: metadata.Thanos{ + Downsample: metadata.ThanosDownsample{ + Resolution: resolutionLevel, + }, + }, + } + + b, err := json.Marshal(meta1) + testutil.Ok(t, err) + + testutil.Ok(t, bkt.Upload(context.Background(), id+"/meta.json", bytes.NewReader(b))) + testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000001", strings.NewReader("@test-data@"))) + testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000002", strings.NewReader("@test-data@"))) + testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000003", strings.NewReader("@test-data@"))) +} func TestParseRetentionPolicyByTenant(t *testing.T) { t.Parallel() @@ -298,7 +323,7 @@ func TestParseRetentionPolicyByTenant(t *testing.T) { }, { "valid", - []string{"tenant-1:2021-01-01", "tenant-2:3d"}, + []string{"tenant-1:2021-01-01", "tenant-2:11d"}, map[string]compact.RetentionPolicy{ "tenant-1": { CutoffDate: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), @@ -306,7 +331,7 @@ func TestParseRetentionPolicyByTenant(t *testing.T) { }, "tenant-2": { CutoffDate: time.Time{}, - RetentionDuration: 3 * 24 * time.Hour, + RetentionDuration: 11 * 24 * time.Hour, }, }, false, @@ -329,6 +354,12 @@ func TestParseRetentionPolicyByTenant(t *testing.T) { nil, true, }, + { + "invalid duration which is 0", + []string{"tenant1:2021-01-01", "tenant2:0d"}, + nil, + true, + }, } { t.Run(tt.name, func(t *testing.T) { got, err := compact.ParesRetentionPolicyByTenant(log.NewNopLogger(), tt.retentionTenants) @@ -341,7 +372,179 @@ func TestParseRetentionPolicyByTenant(t *testing.T) { } } -func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) { +func TestApplyRetentionPolicyByTenant(t *testing.T) { + t.Parallel() + + type testBlock struct { + id, tenant string + minTime time.Time + maxTime time.Time + } + + logger := log.NewNopLogger() + ctx := context.TODO() + + for _, tt := range []struct { + name string + blocks []testBlock + retentionByTenant map[string]compact.RetentionPolicy + want []string + wantErr bool + }{ + { + "empty bucket", + []testBlock{}, + map[string]compact.RetentionPolicy{}, + []string{}, + false, + }, + { + "tenant retention disabled", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-1", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-2", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + }, + }, + map[string]compact.RetentionPolicy{}, + []string{ + "01CPHBEX20729MJQZXE3W0BW48/", + "01CPHBEX20729MJQZXE3W0BW49/", + }, + false, + }, + { + "tenant retention with duration", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-1", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-1", + time.Now().Add(-2 * 24 * time.Hour), + time.Now().Add(-24 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW50", + "tenant-2", + time.Now().Add(-24 * time.Hour), + time.Now().Add(-23 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW51", + "tenant-2", + time.Now().Add(-23 * time.Hour), + time.Now().Add(-6 * time.Hour), + }, + }, + map[string]compact.RetentionPolicy{ + "tenant-2": { + CutoffDate: time.Time{}, + RetentionDuration: 10 * time.Hour, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW48/", + "01CPHBEX20729MJQZXE3W0BW49/", + "01CPHBEX20729MJQZXE3W0BW51/", + }, + false, + }, + { + "tenant retention with cutoff date", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-1", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-1", + time.Now().Add(-2 * 24 * time.Hour), + time.Now().Add(-24 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW50", + "tenant-2", + time.Date(2024, 11, 1, 0, 0, 0, 0, time.UTC), + time.Date(2024, 11, 1, 0, 0, 0, 0, time.UTC), + }, + { + "01CPHBEX20729MJQZXE3W0BW51", + "tenant-2", + time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + map[string]compact.RetentionPolicy{ + "tenant-2": { + CutoffDate: time.Date(2024, 10, 1, 0, 0, 0, 0, time.UTC), + RetentionDuration: 0, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW48/", + "01CPHBEX20729MJQZXE3W0BW49/", + "01CPHBEX20729MJQZXE3W0BW50/", + }, + false, + }, + } { + t.Run(tt.name, func(t *testing.T) { + bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) + for _, b := range tt.blocks { + uploadTenantBlock(t, bkt, b.id, b.tenant, b.minTime, b.maxTime) + } + + baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, baseBlockIDsFetcher, "", nil, nil) + testutil.Ok(t, err) + + blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + + metas, _, err := metaFetcher.Fetch(ctx) + testutil.Ok(t, err) + + if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, bkt, metas, tt.retentionByTenant, blocksMarkedForDeletion); (err != nil) != tt.wantErr { + t.Errorf("ApplyRetentionPolicyByResolution() error = %v, wantErr %v", err, tt.wantErr) + } + + got := []string{} + gotMarkedBlocksCount := 0.0 + testutil.Ok(t, bkt.Iter(context.TODO(), "", func(name string) error { + exists, err := bkt.Exists(ctx, filepath.Join(name, metadata.DeletionMarkFilename)) + if err != nil { + return err + } + if !exists { + got = append(got, name) + return nil + } + gotMarkedBlocksCount += 1.0 + return nil + })) + + testutil.Equals(t, got, tt.want) + testutil.Equals(t, gotMarkedBlocksCount, promtest.ToFloat64(blocksMarkedForDeletion)) + }) + } +} + +func uploadTenantBlock(t *testing.T, bkt objstore.Bucket, id, tenant string, minTime, maxTime time.Time) { t.Helper() meta1 := metadata.Meta{ BlockMeta: tsdb.BlockMeta{ @@ -351,8 +554,8 @@ func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxT Version: 1, }, Thanos: metadata.Thanos{ - Downsample: metadata.ThanosDownsample{ - Resolution: resolutionLevel, + Labels: map[string]string{ + metadata.TenantLabel: tenant, }, }, }