Skip to content

Commit

Permalink
per tenant retention
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Dec 17, 2024
1 parent a26ab5f commit c106c13
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 2 deletions.
12 changes: 12 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ func runCompact(
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

retentionByTenant, err := compact.ParesRetentionPolicyByTenant(logger, *conf.retentionTenants)
if err != nil && len(retentionByTenant) != 0 {
level.Error(logger).Log("msg", "failed to parse retention policy by tenant", "err", err)
return err
}

var cleanMtx sync.Mutex
// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
cleanPartialMarked := func() error {
Expand Down Expand Up @@ -534,6 +540,10 @@ func runCompact(
return errors.Wrap(err, "sync before retention")
}

if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
return errors.Wrap(err, "retention by tenant failed")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, insBkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
return errors.Wrap(err, "retention failed")
}
Expand Down Expand Up @@ -726,6 +736,7 @@ type compactConfig struct {
objStore extflag.PathOrContent
consistencyDelay time.Duration
retentionRaw, retentionFiveMin, retentionOneHr model.Duration
retentionTenants *[]string
wait bool
waitInterval time.Duration
disableDownsampling bool
Expand Down Expand Up @@ -781,6 +792,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("0d").SetValue(&cc.retentionFiveMin)
cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever").
Default("0d").SetValue(&cc.retentionOneHr)
cc.retentionTenants = cmd.Flag("retention.tenant", "How long to retain samples in bucket per tenant. Setting this to 0d will retain samples of this resolution forever").Strings()

// TODO(kakkoyun, pgough): https://github.com/thanos-io/thanos/issues/2266.
cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Expand Down
78 changes: 78 additions & 0 deletions pkg/compact/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,25 @@ package compact
import (
"context"
"fmt"
"regexp"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
)

const (
tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))$`
)

// ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime.
// A value of 0 disables the retention for its resolution.
func ApplyRetentionPolicyByResolution(
Expand Down Expand Up @@ -47,3 +53,75 @@ func ApplyRetentionPolicyByResolution(
level.Info(logger).Log("msg", "optional retention apply done")
return nil
}

type RetentionPolicy struct {
CutoffDate time.Time
RetentionDuration time.Duration
}

func (r RetentionPolicy) isExpired(blockMaxTime time.Time) bool {
if r.CutoffDate.IsZero() {
return time.Now().After(blockMaxTime.Add(r.RetentionDuration))
}
return r.CutoffDate.After(blockMaxTime)
}

func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) (map[string]RetentionPolicy, error) {
pattern := regexp.MustCompile(tenantRetentionRegex)
retentionByTenant := make(map[string]RetentionPolicy, len(retentionTenants))
for _, tenantRetention := range retentionTenants {
matches := pattern.FindStringSubmatch(tenantRetention)
invalidFormat := errors.Errorf("invalid retention format for tenant: %s, must be `<tenant>:(<yyyy-mm-dd>|<duration>d)`", tenantRetention)
if len(matches) != 5 {
return nil, errors.Wrapf(invalidFormat, "matched size %d", len(matches))
}
tenant := matches[1]
var policy RetentionPolicy
if _, ok := retentionByTenant[tenant]; ok {
return nil, errors.Errorf("duplicate retention policy for tenant: %s", tenant)
}
if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); err != nil && matches[3] != "" {
return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err)
} else if matches[3] != "" {
policy.CutoffDate = cutoffDate
}
if duration, err := model.ParseDuration(matches[4]); err != nil && matches[4] != "" {
return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err)
} else if matches[4] != "" {
policy.RetentionDuration = time.Duration(duration)
}
level.Info(logger).Log("msg", "retention policy for tenant is enabled", "tenant", tenant, "retention policy", policy)
retentionByTenant[tenant] = policy
}
return retentionByTenant, nil
}

// ApplyRetentionPolicyByTenant removes blocks depending on the specified retentionByTenant based on blocks MaxTime.
func ApplyRetentionPolicyByTenant(
ctx context.Context,
logger log.Logger,
bkt objstore.Bucket,
metas map[ulid.ULID]*metadata.Meta,
retentionByTenant map[string]RetentionPolicy,
blocksMarkedForDeletion prometheus.Counter) error {
if len(retentionByTenant) == 0 {
level.Info(logger).Log("msg", "tenant retention is disabled due to no policy")
return nil
}
level.Info(logger).Log("msg", "start tenant retention")
for id, m := range metas {
policy, ok := retentionByTenant[m.Thanos.GetTenant()]
if !ok {
continue
}
maxTime := time.Unix(m.MaxTime/1000, 0)
if policy.isExpired(maxTime) {
level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String())
if err := block.MarkForDeletion(ctx, logger, bkt, id, fmt.Sprintf("block exceeding retention of %v", policy), blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, "delete block")
}
}
}
level.Info(logger).Log("msg", "tenant retention apply done")
return nil
}
63 changes: 61 additions & 2 deletions pkg/compact/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -20,8 +21,6 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
Expand Down Expand Up @@ -282,6 +281,66 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
}
}

func TestParseRetentionPolicyByTenant(t *testing.T) {
t.Parallel()

for _, tt := range []struct {
name string
retentionTenants []string
expected map[string]compact.RetentionPolicy
expectedErr bool
}{
{
"empty",
[]string{},
map[string]compact.RetentionPolicy{},
false,
},
{
"valid",
[]string{"tenant-1:2021-01-01", "tenant-2:3d"},
map[string]compact.RetentionPolicy{
"tenant-1": {
CutoffDate: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
RetentionDuration: time.Duration(0),
},
"tenant-2": {
CutoffDate: time.Time{},
RetentionDuration: 3 * 24 * time.Hour,
},
},
false,
},
{
"invalid tenant",
[]string{"tenant1:2021-01-01", "tenant#2:1d"},
nil,
true,
},
{
"invalid date",
[]string{"tenant1:2021-010-01", "tenant2:1d"},
nil,
true,
},
{
"invalid duration",
[]string{"tenant1:2021-01-01", "tenant2:1w"},
nil,
true,
},
} {
t.Run(tt.name, func(t *testing.T) {
got, err := compact.ParesRetentionPolicyByTenant(log.NewNopLogger(), tt.retentionTenants)
if (err != nil) != tt.expectedErr {
t.Errorf("ParseRetentionPolicyByTenant() error = %v, wantErr %v", err, tt.expectedErr)
return
}
testutil.Equals(t, got, tt.expected)
})
}
}

func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) {
t.Helper()
meta1 := metadata.Meta{
Expand Down

0 comments on commit c106c13

Please sign in to comment.