
Commit f9fc11c: Split compactor cleaner metrics (#6827)
Authored by Daniel Deluiggi
1 parent: e551a2e

Commit message:

* Split compactor cleaner metrics
* CHANGELOG
* fix lint
* update doc
* Address comments
* update doc

Signed-off-by: Daniel Deluiggi <[email protected]>

File tree: 7 files changed, +270 -60 lines

CHANGELOG.md (1 addition, 0 deletions)

@@ -48,6 +48,7 @@
 * [ENHANCEMENT] Query Frontend: Enhance the performance of the JSON codec. #6816
 * [ENHANCEMENT] Metadata Cache: Support inmemory and multi level cache backend. #6829
 * [ENHANCEMENT] Store Gateway: Allow to ignore syncing blocks older than certain time using `ignore_blocks_before`. #6830
+* [ENHANCEMENT] Compactor: Emit partition metrics separate from cleaner job. #6827
 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
 * [BUGFIX] Ingester: Fix labelset data race condition. #6573
 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
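
The changelog entry above covers the behavioural change in this commit: previously the compactor's partition metrics (remaining planned compactions, in-progress compactions, oldest partition group offset) were updated as a side effect of `cleanPartitionedGroupInfo` during the cleaner run. The diffs below move that bookkeeping into a dedicated `runEmitPartitionMetricsWorker` goroutine that receives the active-user snapshot from the cleaner loop over a channel, so metrics emission runs as its own job. The worker is only started when shuffle sharding and the partitioning compaction strategy are both enabled.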

docs/blocks-storage/querier.md (5 additions, 0 deletions)

@@ -1378,6 +1378,11 @@ blocks_storage:
 # CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
 [bucket_index_max_size_bytes: <int> | default = 1048576]
 
+# How long to cache list of partitioned groups for an user. 0 disables
+# caching
+# CLI flag: -blocks-storage.bucket-store.metadata-cache.partitioned-groups-list-ttl
+[partitioned_groups_list_ttl: <duration> | default = 0s]
+
 # Maximum number of entries in the regex matchers cache. 0 to disable.
 # CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items
 [matchers_cache_max_items: <int> | default = 0]

docs/blocks-storage/store-gateway.md (5 additions, 0 deletions)

@@ -1499,6 +1499,11 @@ blocks_storage:
 # CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
 [bucket_index_max_size_bytes: <int> | default = 1048576]
 
+# How long to cache list of partitioned groups for an user. 0 disables
+# caching
+# CLI flag: -blocks-storage.bucket-store.metadata-cache.partitioned-groups-list-ttl
+[partitioned_groups_list_ttl: <duration> | default = 0s]
+
 # Maximum number of entries in the regex matchers cache. 0 to disable.
 # CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items
 [matchers_cache_max_items: <int> | default = 0]

docs/configuration/config-file-reference.md (5 additions, 0 deletions)

@@ -1969,6 +1969,11 @@ bucket_store:
 # CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
 [bucket_index_max_size_bytes: <int> | default = 1048576]
 
+# How long to cache list of partitioned groups for an user. 0 disables
+# caching
+# CLI flag: -blocks-storage.bucket-store.metadata-cache.partitioned-groups-list-ttl
+[partitioned_groups_list_ttl: <duration> | default = 0s]
+
 # Maximum number of entries in the regex matchers cache. 0 to disable.
 # CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items
 [matchers_cache_max_items: <int> | default = 0]
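
The same option is documented in all three files above. For example, setting `-blocks-storage.bucket-store.metadata-cache.partitioned-groups-list-ttl=15m` (or `partitioned_groups_list_ttl: 15m` in YAML) would cache each tenant's partitioned-groups listing for fifteen minutes; the default of `0s` keeps the cache disabled. The `15m` value is only an illustration, not a recommendation made by this commit.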

pkg/compactor/blocks_cleaner.go (99 additions, 48 deletions)
@@ -243,6 +243,15 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
 	go func() {
 		c.runDeleteUserCleanup(ctx, deleteChan)
 	}()
+	var metricsChan chan *cleanerJob
+	if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle &&
+		c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+		metricsChan = make(chan *cleanerJob)
+		defer close(metricsChan)
+		go func() {
+			c.runEmitPartitionMetricsWorker(ctx, metricsChan)
+		}()
+	}
 
 	for {
 		select {
@@ -276,6 +285,17 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
 				c.enqueueJobFailed.WithLabelValues(deletedStatus).Inc()
 			}
 
+			if metricsChan != nil {
+				select {
+				case metricsChan <- &cleanerJob{
+					users:     activeUsers,
+					timestamp: cleanJobTimestamp,
+				}:
+				default:
+					level.Warn(c.logger).Log("msg", "unable to push metrics job to metricsChan")
+				}
+			}
+
 		case <-ctx.Done():
 			return nil
 		}
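
The hunk above wires the cleaner loop to the new metrics worker: after each cleanup cycle it offers the active-user snapshot on `metricsChan`, and the `default` branch drops the job with a warning instead of stalling the loop when the worker is still busy with a previous snapshot (the channel is unbuffered). Below is a minimal, self-contained sketch of that non-blocking handoff pattern; the `job` type, timings, and log output are hypothetical stand-ins, not the Cortex types.

```go
package main

import (
	"fmt"
	"time"
)

// job mirrors the role of cleanerJob: a snapshot of users plus the time it was taken.
type job struct {
	users     []string
	timestamp int64
}

func main() {
	jobs := make(chan *job) // unbuffered, like metricsChan in the diff above
	done := make(chan struct{})

	// Worker: handles one job at a time; while busy it is not receiving.
	go func() {
		defer close(done)
		for j := range jobs {
			fmt.Println("emitting metrics for", len(j.users), "users at", j.timestamp)
			time.Sleep(50 * time.Millisecond) // simulate a slow metrics pass
		}
	}()

	for i := 0; i < 5; i++ {
		j := &job{users: []string{"user-1"}, timestamp: time.Now().Unix()}
		select {
		case jobs <- j:
			// worker was idle; it picked up this cycle's snapshot
		default:
			// worker still busy with an older snapshot; drop this one instead of blocking
			fmt.Println("worker busy, skipping metrics for this cycle")
		}
		time.Sleep(20 * time.Millisecond)
	}
	close(jobs)
	<-done
}
```
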
@@ -295,10 +315,25 @@ func (c *BlocksCleaner) checkRunError(runType string, err error) {
 	}
 }
 
-func (c *BlocksCleaner) runActiveUserCleanup(ctx context.Context, jobChan chan *cleanerJob) {
+func (c *BlocksCleaner) runEmitPartitionMetricsWorker(ctx context.Context, jobChan <-chan *cleanerJob) {
+	for job := range jobChan {
+		err := concurrency.ForEachUser(ctx, job.users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
+			userLogger := util_log.WithUserID(userID, c.logger)
+			userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
+			c.emitUserParititionMetrics(ctx, userLogger, userBucket, userID)
+			return nil
+		})
+
+		if err != nil {
+			level.Error(c.logger).Log("msg", "emit metrics failed", "err", err.Error())
+		}
+	}
+}
+
+func (c *BlocksCleaner) runActiveUserCleanup(ctx context.Context, jobChan <-chan *cleanerJob) {
 	for job := range jobChan {
 		if job.timestamp < time.Now().Add(-c.cfg.CleanupInterval).Unix() {
-			level.Warn(c.logger).Log("Active user cleaner job too old. Ignoring to get recent data")
+			level.Warn(c.logger).Log("msg", "Active user cleaner job too old. Ignoring to get recent data")
 			continue
 		}
 		err := c.cleanUpActiveUsers(ctx, job.users, false)
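
`runEmitPartitionMetricsWorker` fans the per-user work out through `concurrency.ForEachUser`, the Cortex helper visible in the hunk above, which bounds how many tenants are processed at once (here by `CleanupConcurrency`). The sketch below illustrates the same bounded-concurrency idea using `golang.org/x/sync/errgroup`; it is a rough, assumed stand-in for illustration only, not the helper Cortex actually uses.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// emitForUsers runs fn for each user with at most `limit` goroutines in flight,
// roughly the shape of work that concurrency.ForEachUser performs in the diff above.
func emitForUsers(ctx context.Context, users []string, limit int, fn func(ctx context.Context, userID string) error) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(limit)
	for _, u := range users {
		u := u
		g.Go(func() error { return fn(ctx, u) })
	}
	return g.Wait()
}

func main() {
	users := []string{"user-1", "user-2", "user-3"}
	err := emitForUsers(context.Background(), users, 2, func(ctx context.Context, userID string) error {
		fmt.Println("emitting partition metrics for", userID)
		return nil
	})
	if err != nil {
		fmt.Println("emit metrics failed:", err)
	}
}
```
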
@@ -746,59 +781,14 @@ func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool,
 }
 
 func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) {
-	existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
-		path   string
-		status PartitionedGroupStatus
-	})
-	err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error {
-		if strings.Contains(file, PartitionVisitMarkerDirectory) {
-			return nil
-		}
-		partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file)
-		if err != nil {
-			level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file)
-			return nil
-		}
-
-		status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
-		level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
-		existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
-			path   string
-			status PartitionedGroupStatus
-		}{
-			path:   file,
-			status: status,
-		}
-		return nil
-	})
-
+	existentPartitionedGroupInfo, err := c.iterPartitionGroups(ctx, userBucket, userLogger)
 	if err != nil {
 		level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err)
 	}
 
-	remainingCompactions := 0
-	inProgressCompactions := 0
-	var oldestPartitionGroup *PartitionedGroupInfo
-	defer func() {
-		c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
-		c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
-		if c.oldestPartitionGroupOffset != nil {
-			if oldestPartitionGroup != nil {
-				c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
-				level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
-			} else {
-				c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
-			}
-		}
-	}()
 	for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
 		partitionedGroupInfoFile := extraInfo.path
 
-		remainingCompactions += extraInfo.status.PendingPartitions
-		inProgressCompactions += extraInfo.status.InProgressPartitions
-		if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime {
-			oldestPartitionGroup = partitionedGroupInfo
-		}
 		if extraInfo.status.CanDelete {
 			if extraInfo.status.IsCompleted {
 				// Try to remove all blocks included in partitioned group info
@@ -829,6 +819,67 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
 	}
 }
 
+func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
+	existentPartitionedGroupInfo, err := c.iterPartitionGroups(ctx, userBucket, userLogger)
+	if err != nil {
+		level.Warn(userLogger).Log("msg", "error listing partitioned group directory to emit metrics", "err", err)
+		return
+	}
+
+	remainingCompactions := 0
+	inProgressCompactions := 0
+	var oldestPartitionGroup *PartitionedGroupInfo
+	defer func() {
+		c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
+		c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
+		if oldestPartitionGroup != nil {
+			c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
+			level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
+		} else {
+			c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
+		}
+	}()
+	for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
+		remainingCompactions += extraInfo.status.PendingPartitions
+		inProgressCompactions += extraInfo.status.InProgressPartitions
+		if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime {
+			oldestPartitionGroup = partitionedGroupInfo
+		}
+	}
+}
+
+func (c *BlocksCleaner) iterPartitionGroups(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger) (map[*PartitionedGroupInfo]struct {
+	path   string
+	status PartitionedGroupStatus
+}, error) {
+	existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
+		path   string
+		status PartitionedGroupStatus
+	})
+	err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error {
+		if strings.Contains(file, PartitionVisitMarkerDirectory) {
+			return nil
+		}
+		partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file)
+		if err != nil {
+			level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file)
+			return nil
+		}
+
+		status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
+		level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
+		existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
+			path   string
+			status PartitionedGroupStatus
+		}{
+			path:   file,
+			status: status,
+		}
+		return nil
+	})
+	return existentPartitionedGroupInfo, err
+}
+
 // cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
 // and index are updated accordingly.
 func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, userID string, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
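
The last two hunks are largely a refactor rather than new behaviour: the bucket-listing loop that `cleanPartitionedGroupInfo` previously inlined is extracted into `iterPartitionGroups`, and the gauge bookkeeping (remaining, in-progress, oldest-offset) moves wholesale into the new `emitUserParititionMetrics`; both functions now share the same iterator. As a result, the cleanup path itself no longer touches the compaction-progress gauges.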

pkg/compactor/blocks_cleaner_test.go (124 additions, 1 deletion)
@@ -969,7 +969,6 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
 	block2DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block2.String(), metadata.DeletionMarkFilename))
 	require.NoError(t, err)
 	require.False(t, block2DeletionMarkerExists)
-
 }
 
 func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
@@ -1127,6 +1126,130 @@ func TestBlocksCleaner_ParquetMetrics(t *testing.T) {
 	`)))
 }
 
+func TestBlocksCleaner_EmitUserMetrics(t *testing.T) {
+	bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
+	bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
+
+	cfg := BlocksCleanerConfig{
+		DeletionDelay:      time.Hour,
+		CleanupInterval:    time.Minute,
+		CleanupConcurrency: 1,
+		ShardingStrategy:   util.ShardingStrategyShuffle,
+		CompactionStrategy: util.CompactionStrategyPartitioning,
+	}
+
+	ctx := context.Background()
+	logger := log.NewNopLogger()
+	registry := prometheus.NewPedanticRegistry()
+	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, bucketClient, logger, registry)
+	require.NoError(t, err)
+	cfgProvider := newMockConfigProvider()
+	dummyCounterVec := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"test"})
+	remainingPlannedCompactions := promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
+		Name: "cortex_compactor_remaining_planned_compactions",
+		Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy",
+	}, commonLabels)
+
+	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 15*time.Minute, cfgProvider, logger, "test-cleaner", registry, time.Minute, 30*time.Second, dummyCounterVec, remainingPlannedCompactions)
+
+	ts := func(hours int) int64 {
+		return time.Now().Add(time.Duration(hours)*time.Hour).Unix() * 1000
+	}
+
+	userID := "user-1"
+	partitionedGroupID := uint32(123)
+	partitionCount := 5
+	startTime := ts(-10)
+	endTime := ts(-8)
+	userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)
+	partitionedGroupInfo := PartitionedGroupInfo{
+		PartitionedGroupID: partitionedGroupID,
+		PartitionCount:     partitionCount,
+		Partitions: []Partition{
+			{
+				PartitionID: 0,
+			},
+			{
+				PartitionID: 1,
+			},
+			{
+				PartitionID: 2,
+			},
+			{
+				PartitionID: 3,
+			},
+			{
+				PartitionID: 4,
+			},
+		},
+		RangeStart:   startTime,
+		RangeEnd:     endTime,
+		CreationTime: time.Now().Add(-1 * time.Hour).Unix(),
+		Version:      PartitionedGroupInfoVersion1,
+	}
+	_, err = UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
+	require.NoError(t, err)
+
+	//InProgress with valid VisitTime
+	v0 := &partitionVisitMarker{
+		PartitionedGroupID: partitionedGroupID,
+		PartitionID:        0,
+		Status:             InProgress,
+		VisitTime:          time.Now().Add(-2 * time.Minute).Unix(),
+	}
+	v0Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v0)
+	err = v0Manager.updateVisitMarker(ctx)
+	require.NoError(t, err)
+
+	//InProgress with expired VisitTime
+	v1 := &partitionVisitMarker{
+		PartitionedGroupID: partitionedGroupID,
+		PartitionID:        1,
+		Status:             InProgress,
+		VisitTime:          time.Now().Add(-30 * time.Minute).Unix(),
+	}
+	v1Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v1)
+	err = v1Manager.updateVisitMarker(ctx)
+	require.NoError(t, err)
+
+	//V2 and V3 are pending
+	//V4 is completed
+	v4 := &partitionVisitMarker{
+		PartitionedGroupID: partitionedGroupID,
+		PartitionID:        4,
+		Status:             Completed,
+		VisitTime:          time.Now().Add(-20 * time.Minute).Unix(),
+	}
+	v4Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v4)
+	err = v4Manager.updateVisitMarker(ctx)
+	require.NoError(t, err)
+
+	cleaner.emitUserParititionMetrics(ctx, logger, userBucket, userID)
+
+	metricNames := []string{
+		"cortex_compactor_remaining_planned_compactions",
+		"cortex_compactor_in_progress_compactions",
+		"cortex_compactor_oldest_partition_offset",
+	}
+
+	// Check tracked Prometheus metrics
+	expectedMetrics := `
+# HELP cortex_compactor_in_progress_compactions Total number of in progress compactions. Only available with shuffle-sharding strategy and partitioning compaction strategy
+# TYPE cortex_compactor_in_progress_compactions gauge
+cortex_compactor_in_progress_compactions{user="user-1"} 1
+# HELP cortex_compactor_oldest_partition_offset Time in seconds between now and the oldest created partition group not completed. Only available with shuffle-sharding strategy and partitioning compaction strategy
+# TYPE cortex_compactor_oldest_partition_offset gauge
+cortex_compactor_oldest_partition_offset{user="user-1"} 3600
+# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy
+# TYPE cortex_compactor_remaining_planned_compactions gauge
+cortex_compactor_remaining_planned_compactions{user="user-1"} 3
+`
+
+	assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
+}
+
 type mockConfigProvider struct {
 	userRetentionPeriods    map[string]time.Duration
 	parquetConverterEnabled map[string]bool
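
The new test drives `emitUserParititionMetrics` directly and then asserts on the three gauges with `prom_testutil.GatherAndCompare`, which parses an expected exposition-format string and compares only the named metric families. Below is a stripped-down, standalone illustration of that assertion helper (`testutil.GatherAndCompare` from `prometheus/client_golang`); the registry setup and metric value are invented for the example.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()
	inProgress := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_compactor_in_progress_compactions",
		Help: "Total number of in progress compactions.",
	}, []string{"user"})
	reg.MustRegister(inProgress)
	inProgress.WithLabelValues("user-1").Set(1)

	expected := `
# HELP cortex_compactor_in_progress_compactions Total number of in progress compactions.
# TYPE cortex_compactor_in_progress_compactions gauge
cortex_compactor_in_progress_compactions{user="user-1"} 1
`
	// Compares only the named metric family against the expected exposition text.
	err := testutil.GatherAndCompare(reg, strings.NewReader(expected), "cortex_compactor_in_progress_compactions")
	fmt.Println("metrics match:", err == nil)
}
```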
