From c3c1e5913d72b9b59d4c5b5dca2abfe8ad834f71 Mon Sep 17 00:00:00 2001
From: Yi Jin <yi.jin@databricks.com>
Date: Tue, 17 Dec 2024 09:52:21 -0800
Subject: [PATCH] per tenant retention

Signed-off-by: Yi Jin <yi.jin@databricks.com>
---
 cmd/thanos/compact.go         | 12 ++++++
 pkg/block/metadata/markers.go |  2 +
 pkg/compact/retention.go      | 78 +++++++++++++++++++++++++++++++++++
 pkg/compact/retention_test.go | 63 +++++++++++++++++++++++++++-
 4 files changed, 153 insertions(+), 2 deletions(-)

diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 98f9699a43..f2a8290afe 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -436,6 +436,12 @@ func runCompact(
 		level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
 	}
 
+	retentionByTenant, err := compact.ParesRetentionPolicyByTenant(logger, *conf.retentionTenants)
+	if err != nil {
+		level.Error(logger).Log("msg", "failed to parse retention policy by tenant", "err", err)
+		return err
+	}
+
 	var cleanMtx sync.Mutex
 	// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
 	cleanPartialMarked := func() error {
@@ -534,6 +540,10 @@ func runCompact(
 			return errors.Wrap(err, "sync before retention")
 		}
 
+		if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, metadata.TenantRetentionExpired)); err != nil {
+			return errors.Wrap(err, "retention by tenant failed")
+		}
+
 		if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, insBkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
 			return errors.Wrap(err, "retention failed")
 		}
@@ -726,6 +736,7 @@ type compactConfig struct {
 	objStore                                       extflag.PathOrContent
 	consistencyDelay                               time.Duration
 	retentionRaw, retentionFiveMin, retentionOneHr model.Duration
+	retentionTenants                               *[]string
 	wait                                           bool
 	waitInterval                                   time.Duration
 	disableDownsampling                            bool
@@ -781,6 +792,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("0d").SetValue(&cc.retentionFiveMin)
 	cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever").
 		Default("0d").SetValue(&cc.retentionOneHr)
+	cc.retentionTenants = cmd.Flag("retention.tenant", "How long to retain samples in bucket per tenant. Setting this to 0d will retain samples of this resolution forever").Strings()
 
 	// TODO(kakkoyun, pgough): https://github.com/thanos-io/thanos/issues/2266.
 	cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
diff --git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go
index 0a351a5fab..416178eb74 100644
--- a/pkg/block/metadata/markers.go
+++ b/pkg/block/metadata/markers.go
@@ -81,6 +81,8 @@ const (
 	OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
 	// DownsampleVerticalCompactionNoCompactReason is a reason to not compact overlapping downsampled blocks as it does not make sense e.g. how to vertically compact the average.
 	DownsampleVerticalCompactionNoCompactReason = "downsample-vertical-compaction"
+	// TenantRetentionExpired is a reason to delete block as it's per tenant retention is expired.
+	TenantRetentionExpired = "tenant-retention-expired"
 )
 
 // NoCompactMark marker stores reason of block being excluded from compaction if needed.
diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go
index 40ea0a1a72..e5b1392e58 100644
--- a/pkg/compact/retention.go
+++ b/pkg/compact/retention.go
@@ -6,6 +6,7 @@ package compact
 import (
 	"context"
 	"fmt"
+	"regexp"
 	"time"
 
 	"github.com/go-kit/log"
@@ -13,12 +14,17 @@ import (
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
 	"github.com/thanos-io/objstore"
 
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 )
 
+const (
+	tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))$`
+)
+
 // ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime.
 // A value of 0 disables the retention for its resolution.
 func ApplyRetentionPolicyByResolution(
@@ -47,3 +53,75 @@ func ApplyRetentionPolicyByResolution(
 	level.Info(logger).Log("msg", "optional retention apply done")
 	return nil
 }
+
+type RetentionPolicy struct {
+	CutoffDate        time.Time
+	RetentionDuration time.Duration
+}
+
+func (r RetentionPolicy) isExpired(blockMaxTime time.Time) bool {
+	if r.CutoffDate.IsZero() {
+		return time.Now().After(blockMaxTime.Add(r.RetentionDuration))
+	}
+	return r.CutoffDate.After(blockMaxTime)
+}
+
+func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) (map[string]RetentionPolicy, error) {
+	pattern := regexp.MustCompile(tenantRetentionRegex)
+	retentionByTenant := make(map[string]RetentionPolicy, len(retentionTenants))
+	for _, tenantRetention := range retentionTenants {
+		matches := pattern.FindStringSubmatch(tenantRetention)
+		invalidFormat := errors.Errorf("invalid retention format for tenant: %s, must be `<tenant>:(<yyyy-mm-dd>|<duration>d)`", tenantRetention)
+		if len(matches) != 5 {
+			return nil, errors.Wrapf(invalidFormat, "matched size %d", len(matches))
+		}
+		tenant := matches[1]
+		var policy RetentionPolicy
+		if _, ok := retentionByTenant[tenant]; ok {
+			return nil, errors.Errorf("duplicate retention policy for tenant: %s", tenant)
+		}
+		if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); err != nil && matches[3] != "" {
+			return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err)
+		} else if matches[3] != "" {
+			policy.CutoffDate = cutoffDate
+		}
+		if duration, err := model.ParseDuration(matches[4]); err != nil && matches[4] != "" {
+			return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err)
+		} else if matches[4] != "" {
+			policy.RetentionDuration = time.Duration(duration)
+		}
+		level.Info(logger).Log("msg", "retention policy for tenant is enabled", "tenant", tenant, "retention policy", fmt.Sprintf("%v", policy))
+		retentionByTenant[tenant] = policy
+	}
+	return retentionByTenant, nil
+}
+
+// ApplyRetentionPolicyByTenant removes blocks depending on the specified retentionByTenant based on blocks MaxTime.
+func ApplyRetentionPolicyByTenant(
+	ctx context.Context,
+	logger log.Logger,
+	bkt objstore.Bucket,
+	metas map[ulid.ULID]*metadata.Meta,
+	retentionByTenant map[string]RetentionPolicy,
+	blocksMarkedForDeletion prometheus.Counter) error {
+	if len(retentionByTenant) == 0 {
+		level.Info(logger).Log("msg", "tenant retention is disabled due to no policy")
+		return nil
+	}
+	level.Info(logger).Log("msg", "start tenant retention")
+	for id, m := range metas {
+		policy, ok := retentionByTenant[m.Thanos.GetTenant()]
+		if !ok {
+			continue
+		}
+		maxTime := time.Unix(m.MaxTime/1000, 0)
+		if policy.isExpired(maxTime) {
+			level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String())
+			if err := block.MarkForDeletion(ctx, logger, bkt, id, fmt.Sprintf("block exceeding retention of %v", policy), blocksMarkedForDeletion); err != nil {
+				return errors.Wrap(err, "delete block")
+			}
+		}
+	}
+	level.Info(logger).Log("msg", "tenant retention apply done")
+	return nil
+}
diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go
index d883f23fea..37e39b86f1 100644
--- a/pkg/compact/retention_test.go
+++ b/pkg/compact/retention_test.go
@@ -12,6 +12,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/efficientgo/core/testutil"
 	"github.com/go-kit/log"
 	"github.com/oklog/ulid"
 	"github.com/prometheus/client_golang/prometheus"
@@ -20,8 +21,6 @@ import (
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/thanos-io/objstore"
 
-	"github.com/efficientgo/core/testutil"
-
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact"
@@ -282,6 +281,66 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
 	}
 }
 
+func TestParseRetentionPolicyByTenant(t *testing.T) {
+	t.Parallel()
+
+	for _, tt := range []struct {
+		name             string
+		retentionTenants []string
+		expected         map[string]compact.RetentionPolicy
+		expectedErr      bool
+	}{
+		{
+			"empty",
+			[]string{},
+			map[string]compact.RetentionPolicy{},
+			false,
+		},
+		{
+			"valid",
+			[]string{"tenant-1:2021-01-01", "tenant-2:3d"},
+			map[string]compact.RetentionPolicy{
+				"tenant-1": {
+					CutoffDate:        time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
+					RetentionDuration: time.Duration(0),
+				},
+				"tenant-2": {
+					CutoffDate:        time.Time{},
+					RetentionDuration: 3 * 24 * time.Hour,
+				},
+			},
+			false,
+		},
+		{
+			"invalid tenant",
+			[]string{"tenant1:2021-01-01", "tenant#2:1d"},
+			nil,
+			true,
+		},
+		{
+			"invalid date",
+			[]string{"tenant1:2021-010-01", "tenant2:1d"},
+			nil,
+			true,
+		},
+		{
+			"invalid duration",
+			[]string{"tenant1:2021-01-01", "tenant2:1w"},
+			nil,
+			true,
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := compact.ParesRetentionPolicyByTenant(log.NewNopLogger(), tt.retentionTenants)
+			if (err != nil) != tt.expectedErr {
+				t.Errorf("ParseRetentionPolicyByTenant() error = %v, wantErr %v", err, tt.expectedErr)
+				return
+			}
+			testutil.Equals(t, got, tt.expected)
+		})
+	}
+}
+
 func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) {
 	t.Helper()
 	meta1 := metadata.Meta{