Introduce cleaner visit marker (cortexproject#6113)
alexqyle authored Jul 31, 2024
1 parent e5f47e1 commit 4539330
Showing 10 changed files with 623 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@
* [ENHANCEMENT] Compactor: Differentiate retry and halt error and retry failed compaction only on retriable error. #6111
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040
* [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112
* [ENHANCEMENT] Compactor: Introduce cleaner visit marker. #6113
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018
11 changes: 11 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -295,6 +295,17 @@ compactor:
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]

# How long cleaner visit marker file should be considered as expired and able
# to be picked up by cleaner again. The value should be smaller than
# -compactor.cleanup-interval
# CLI flag: -compactor.cleaner-visit-marker-timeout
[cleaner_visit_marker_timeout: <duration> | default = 10m]

# How frequently cleaner visit marker file should be updated when cleaning
# user.
# CLI flag: -compactor.cleaner-visit-marker-file-update-interval
[cleaner_visit_marker_file_update_interval: <duration> | default = 5m]

# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]
10 changes: 10 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2232,6 +2232,16 @@ sharding_ring:
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]
# How long cleaner visit marker file should be considered as expired and able to
# be picked up by cleaner again. The value should be smaller than
# -compactor.cleanup-interval
# CLI flag: -compactor.cleaner-visit-marker-timeout
[cleaner_visit_marker_timeout: <duration> | default = 10m]
# How frequently cleaner visit marker file should be updated when cleaning user.
# CLI flag: -compactor.cleaner-visit-marker-file-update-interval
[cleaner_visit_marker_file_update_interval: <duration> | default = 5m]
# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]
50 changes: 45 additions & 5 deletions pkg/compactor/blocks_cleaner.go
@@ -50,9 +50,14 @@ type BlocksCleaner struct
bucketClient objstore.InstrumentedBucket
usersScanner *cortex_tsdb.UsersScanner

ringLifecyclerID string

// Keep track of the last owned users.
lastOwnedUsers []string

cleanerVisitMarkerTimeout time.Duration
cleanerVisitMarkerFileUpdateInterval time.Duration

// Metrics.
runsStarted *prometheus.CounterVec
runsCompleted *prometheus.CounterVec
@@ -76,15 +81,21 @@ func NewBlocksCleaner(
usersScanner *cortex_tsdb.UsersScanner,
cfgProvider ConfigProvider,
logger log.Logger,
ringLifecyclerID string,
reg prometheus.Registerer,
cleanerVisitMarkerTimeout time.Duration,
cleanerVisitMarkerFileUpdateInterval time.Duration,
blocksMarkedForDeletion *prometheus.CounterVec,
) *BlocksCleaner {
c := &BlocksCleaner{
cfg: cfg,
bucketClient: bucketClient,
usersScanner: usersScanner,
cfgProvider: cfgProvider,
logger: log.With(logger, "component", "cleaner"),
cfg: cfg,
bucketClient: bucketClient,
usersScanner: usersScanner,
cfgProvider: cfgProvider,
logger: log.With(logger, "component", "cleaner"),
ringLifecyclerID: ringLifecyclerID,
cleanerVisitMarkerTimeout: cleanerVisitMarkerTimeout,
cleanerVisitMarkerFileUpdateInterval: cleanerVisitMarkerFileUpdateInterval,
runsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_compactor_block_cleanup_started_total",
Help: "Total number of blocks cleanup runs started.",
@@ -246,7 +257,15 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string,
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
if err != nil {
return err
}
if isVisited {
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
defer func() {
errChan <- nil
}()
@@ -273,7 +292,15 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
if err != nil {
return err
}
if isVisited {
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
defer func() {
errChan <- nil
}()
@@ -307,6 +334,19 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
return users, deleted, nil
}

func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) {
cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID)
visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker)

existingCleanerVisitMarker := &CleanerVisitMarker{}
err = visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker)
if err != nil && !errors.Is(err, errorVisitMarkerNotFound) {
return nil, false, errors.Wrapf(err, "failed to read cleaner visit marker")
}
isVisited = !errors.Is(err, errorVisitMarkerNotFound) && existingCleanerVisitMarker.IsVisited(c.cleanerVisitMarkerTimeout)
return visitMarkerManager, isVisited, nil
}

// Remove blocks and remaining data for tenant marked for deletion.
func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) error {

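The gate added to both cleanUpActiveUsers and cleanDeletedUsers above follows the same per-tenant pattern. A condensed sketch of that pattern, using the names and signatures taken verbatim from this diff (the wrapper cleanOneUser is hypothetical and error handling is trimmed):

```go
// Condensed sketch of the per-tenant gate introduced above. cleanOneUser is a
// hypothetical wrapper; the methods it calls are the ones added in this diff.
func (c *BlocksCleaner) cleanOneUser(ctx context.Context, userID string) error {
	userLogger := util_log.WithUserID(userID, c.logger)
	userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)

	// Claim the tenant: if another cleaner holds a non-expired, non-terminal
	// visit marker, isVisited is true and the tenant is skipped this cycle.
	visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
	if err != nil {
		return err
	}
	if isVisited {
		return nil
	}

	// The heartbeat refreshes the marker every cleanerVisitMarkerFileUpdateInterval
	// while cleanup runs; sending on errChan stops it once the work is done.
	errChan := make(chan error, 1)
	go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
	defer func() {
		errChan <- nil
	}()

	// ... per-tenant cleanup work happens here, as in the original functions ...
	return nil
}
```

Because the marker expires after -compactor.cleaner-visit-marker-timeout, a cleaner that dies mid-cycle does not block the tenant indefinitely; another cleaner can pick it up once the marker is considered expired.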
36 changes: 30 additions & 6 deletions pkg/compactor/blocks_cleaner_test.go
@@ -87,7 +87,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)

// Clean User with no error
cleaner.bucketClient = bkt
@@ -194,7 +194,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

@@ -355,7 +355,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

@@ -419,7 +419,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

@@ -477,7 +477,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true))
@@ -618,7 +618,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)

assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
@@ -628,6 +628,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Existing behaviour - retention period disabled.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-1"] = 0
cfgProvider.userRetentionPeriods["user-2"] = 0

@@ -662,6 +666,10 @@

// Retention enabled only for a single user, but does nothing.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-1"] = 9 * time.Hour

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
@@ -677,6 +685,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
// Retention enabled only for a single user, marking a single block.
// Note the block won't be deleted yet due to deletion delay.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-1"] = 7 * time.Hour

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
@@ -710,6 +722,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Marking the block again, before the deletion occurs, should not cause an error.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false))
@@ -722,6 +738,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Reduce the deletion delay. Now the block will be deleted.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cleaner.cfg.DeletionDelay = 0

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
@@ -755,6 +775,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {

// Retention enabled for other user; test deleting multiple blocks.
{
// clean up cleaner visit marker before running test
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck

cfgProvider.userRetentionPeriods["user-2"] = 5 * time.Hour

activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
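The repeated Delete calls above exist because successive sub-tests reuse the same tenants within the one-minute visit-marker timeout passed to NewBlocksCleaner; without removing the marker, a later run would see the tenant as already visited and skip it. A small helper capturing that cleanup might look like the following sketch (hypothetical, not part of this commit):

```go
// Hypothetical test helper (not in this commit) that deletes cleaner visit
// markers so a tenant is not skipped as already visited by the next sub-test.
func resetCleanerVisitMarkers(ctx context.Context, bkt objstore.Bucket, users ...string) {
	for _, user := range users {
		_ = bkt.Delete(ctx, path.Join(user, GetCleanerVisitMarkerFilePath()))
	}
}
```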
66 changes: 66 additions & 0 deletions pkg/compactor/cleaner_visit_marker.go
@@ -0,0 +1,66 @@
package compactor

import (
"fmt"
"path"
"time"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

const (
// CleanerVisitMarkerName is the name of cleaner visit marker file.
CleanerVisitMarkerName = "cleaner-visit-marker.json"
// CleanerVisitMarkerVersion1 is the current supported version of cleaner visit mark file.
CleanerVisitMarkerVersion1 = 1
)

type CleanerVisitMarker struct {
CompactorID string `json:"compactorID"`
Status VisitStatus `json:"status"`
// VisitTime is a unix timestamp of when the partition was visited (mark updated).
VisitTime int64 `json:"visitTime"`
// Version of the file.
Version int `json:"version"`
}

func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker {
return &CleanerVisitMarker{
CompactorID: compactorID,
Version: CleanerVisitMarkerVersion1,
}
}

func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool {
return !time.Now().Before(time.Unix(b.VisitTime, 0).Add(cleanerVisitMarkerTimeout))
}

func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool {
return !(b.GetStatus() == Completed) && !(b.GetStatus() == Failed) && !b.IsExpired(cleanerVisitMarkerTimeout)
}

func (b *CleanerVisitMarker) GetStatus() VisitStatus {
return b.Status
}

func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string {
return GetCleanerVisitMarkerFilePath()
}

func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus) {
b.CompactorID = ownerIdentifier
b.Status = status
b.VisitTime = time.Now().Unix()
}

func (b *CleanerVisitMarker) String() string {
return fmt.Sprintf("compactor_id=%s status=%s visit_time=%s",
b.CompactorID,
b.Status,
time.Unix(b.VisitTime, 0).String(),
)
}

func GetCleanerVisitMarkerFilePath() string {
return path.Join(bucketindex.MarkersPathname, CleanerVisitMarkerName)
}
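For reference, a minimal within-package usage sketch of the marker type defined above (illustrative only; Completed is the existing compactor VisitStatus value that IsVisited checks against):

```go
// Illustrative only: exercises the CleanerVisitMarker API defined above.
func exampleCleanerVisitMarker() {
	marker := NewCleanerVisitMarker("compactor-1")
	marker.UpdateStatus("compactor-1", Completed) // stamps CompactorID, Status and VisitTime

	timeout := 10 * time.Minute
	fmt.Println(marker.GetVisitMarkerFilePath()) // <markers dir>/cleaner-visit-marker.json
	fmt.Println(marker.IsExpired(timeout))       // false right after the update
	fmt.Println(marker.IsVisited(timeout))       // false: Completed markers never count as visited
}
```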
29 changes: 20 additions & 9 deletions pkg/compactor/compactor.go
@@ -216,6 +216,10 @@ type Config struct {
BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"`
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`

// Cleaner visit marker file config
CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"`
CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"`

AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
}
@@ -255,6 +259,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.")
f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.")

f.DurationVar(&cfg.CleanerVisitMarkerTimeout, "compactor.cleaner-visit-marker-timeout", 10*time.Minute, "How long cleaner visit marker file should be considered as expired and able to be picked up by cleaner again. The value should be smaller than -compactor.cleanup-interval")
f.DurationVar(&cfg.CleanerVisitMarkerFileUpdateInterval, "compactor.cleaner-visit-marker-file-update-interval", 5*time.Minute, "How frequently cleaner visit marker file should be updated when cleaning user.")

f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")
}
@@ -522,15 +529,7 @@ func (c *Compactor) starting(ctx context.Context) error {
// Create the users scanner.
c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger)

// Create the blocks cleaner (service).
c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
DeletionDelay: c.compactorCfg.DeletionDelay,
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer, c.compactorMetrics.syncerBlocksMarkedForDeletion)

var cleanerRingLifecyclerID = "default-cleaner"
// Initialize the compactors ring if sharding is enabled.
if c.compactorCfg.ShardingEnabled {
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
Expand All @@ -539,6 +538,8 @@ func (c *Compactor) starting(ctx context.Context) error {
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
}

cleanerRingLifecyclerID = c.ringLifecycler.ID

c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
if err != nil {
return errors.Wrap(err, "unable to initialize compactor ring")
@@ -588,6 +589,16 @@
}
}

// Create the blocks cleaner (service).
c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
DeletionDelay: c.compactorCfg.DeletionDelay,
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
c.compactorMetrics.syncerBlocksMarkedForDeletion)

// Ensure an initial cleanup occurred before starting the compactor.
if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {
c.ringSubservices.StopAsync()
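The net effect of the compactor wiring: the BlocksCleaner is now constructed after the ring lifecycler, so it can stamp visit markers with the compactor's ring ID (falling back to "default-cleaner" when sharding is disabled) and receive the two new durations. A condensed sketch, simplified from the diff above rather than the literal code:

```go
// Condensed sketch of the new cleaner wiring (simplified from the diff above).
cleanerID := "default-cleaner"
if c.compactorCfg.ShardingEnabled {
	cleanerID = c.ringLifecycler.ID // only known once the ring lifecycler exists
}
c.blocksCleaner = NewBlocksCleaner(
	BlocksCleanerConfig{ /* deletion delay, cleanup interval, concurrency, ... */ },
	c.bucketClient, c.usersScanner, c.limits, c.parentLogger,
	cleanerID,    // identity written into cleaner visit markers
	c.registerer,
	c.compactorCfg.CleanerVisitMarkerTimeout,             // how long a marker claims a tenant
	c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,  // heartbeat period while cleaning
	c.compactorMetrics.syncerBlocksMarkedForDeletion,
)
```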
(Diffs for the remaining changed files were not loaded in this view.)