From 312e2734f84a5fb03a89a21536b50104fe48ad69 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 6 Nov 2024 14:49:03 -0800 Subject: [PATCH] exposing some tsdb shipper fns+types Signed-off-by: Owen Diehl --- .../indexshipper/tsdb/compactor_test.go | 24 +++++++++---------- .../shipper/indexshipper/tsdb/identifier.go | 10 ++++---- .../tsdb/index_shipper_querier.go | 4 ++-- .../shipper/indexshipper/tsdb/manager.go | 22 ++++++++--------- 4 files changed, 30 insertions(+), 30 deletions(-) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go index 23a951deacbd..be0a343309c5 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go @@ -134,8 +134,8 @@ func setupMultiTenantIndex(t *testing.T, indexFormat int, userStreams map[string dst := NewPrefixedIdentifier( MultitenantTSDBIdentifier{ - nodeName: "test", - ts: ts, + NodeName: "test", + Ts: ts, }, destDir, "", @@ -239,7 +239,7 @@ func TestCompactor_Compact(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}}, Schema: "v12", } - indexBkts := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) + indexBkts := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) tableName := indexBkts[0] lbls1 := mustParseLabels(`{foo="bar", a="b"}`) @@ -497,8 +497,8 @@ func TestCompactor_Compact(t *testing.T) { t.Run(name, func(t *testing.T) { tempDir := t.TempDir() objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) - tablePathInStorage := filepath.Join(objectStoragePath, tableName.prefix) - tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName.prefix) + tablePathInStorage := filepath.Join(objectStoragePath, tableName.Prefix) + tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName.Prefix) require.NoError(t, util.EnsureDirectory(objectStoragePath)) require.NoError(t, util.EnsureDirectory(tablePathInStorage)) @@ -551,7 +551,7 @@ func TestCompactor_Compact(t *testing.T) { objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) require.NoError(t, err) - _, commonPrefixes, err := objectClient.List(context.Background(), tableName.prefix, "/") + _, commonPrefixes, err := objectClient.List(context.Background(), tableName.Prefix, "/") require.NoError(t, err) initializedIndexSets := map[string]compactor.IndexSet{} @@ -559,19 +559,19 @@ func TestCompactor_Compact(t *testing.T) { existingUserIndexSets := make(map[string]compactor.IndexSet, len(commonPrefixes)) for _, commonPrefix := range commonPrefixes { userID := path.Base(string(commonPrefix)) - idxSet, err := newMockIndexSet(userID, tableName.prefix, filepath.Join(tableWorkingDirectory, userID), objectClient) + idxSet, err := newMockIndexSet(userID, tableName.Prefix, filepath.Join(tableWorkingDirectory, userID), objectClient) require.NoError(t, err) existingUserIndexSets[userID] = idxSet initializedIndexSets[userID] = idxSet } - commonIndexSet, err := newMockIndexSet("", tableName.prefix, tableWorkingDirectory, objectClient) + commonIndexSet, err := newMockIndexSet("", tableName.Prefix, tableWorkingDirectory, objectClient) require.NoError(t, err) // build TableCompactor and compact the index tCompactor := newTableCompactor(context.Background(), commonIndexSet, existingUserIndexSets, func(userID string) (compactor.IndexSet, error) { - idxSet, err := newMockIndexSet(userID, tableName.prefix, filepath.Join(tableWorkingDirectory, userID), objectClient) + idxSet, err := newMockIndexSet(userID, tableName.Prefix, filepath.Join(tableWorkingDirectory, userID), objectClient) require.NoError(t, err) initializedIndexSetsMtx.Lock() @@ -875,9 +875,9 @@ func setupCompactedIndex(t *testing.T) *testContext { schemaCfg := config.SchemaConfig{ Configs: []config.PeriodConfig{periodConfig}, } - indexBuckets := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) + indexBuckets := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) tableName := indexBuckets[0] - tableInterval := retention.ExtractIntervalFromTableName(tableName.prefix) + tableInterval := retention.ExtractIntervalFromTableName(tableName.Prefix) // shiftTableStart shift tableInterval.Start by the given amount of milliseconds. // It is used for building chunkmetas relative to start time of the table. shiftTableStart := func(ms int64) int64 { @@ -900,7 +900,7 @@ func setupCompactedIndex(t *testing.T) *testContext { builder.FinalizeChunks() - return newCompactedIndex(context.Background(), tableName.prefix, buildUserID(0), t.TempDir(), periodConfig, builder) + return newCompactedIndex(context.Background(), tableName.Prefix, buildUserID(0), t.TempDir(), periodConfig, builder) } expectedChunkEntries := map[string][]retention.ChunkEntry{ diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/identifier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/identifier.go index 149d41bfa944..eab26fe643d5 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/identifier.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/identifier.go @@ -161,13 +161,13 @@ func ParseSingleTenantTSDBPath(p string) (id SingleTenantTSDBIdentifier, ok bool } type MultitenantTSDBIdentifier struct { - nodeName string - ts time.Time + NodeName string + Ts time.Time } // Name builds filename with format + `-` + ` func (id MultitenantTSDBIdentifier) Name() string { - return fmt.Sprintf("%d-%s.tsdb", id.ts.Unix(), id.nodeName) + return fmt.Sprintf("%d-%s.tsdb", id.Ts.Unix(), id.NodeName) } func (id MultitenantTSDBIdentifier) Path() string { @@ -200,7 +200,7 @@ func parseMultitenantTSDBNameFromBase(name string) (res MultitenantTSDBIdentifie } return MultitenantTSDBIdentifier{ - ts: time.Unix(int64(ts), 0), - nodeName: strings.Join(xs[1:], "-"), + Ts: time.Unix(int64(ts), 0), + NodeName: strings.Join(xs[1:], "-"), }, true } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go index b0d1824936d5..6ca252770169 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go @@ -40,9 +40,9 @@ func (i indexIterFunc) For(_ context.Context, _ int, f func(context.Context, Ind func (i *indexShipperQuerier) indices(ctx context.Context, from, through model.Time, user string) (Index, error) { itr := indexIterFunc(func(f func(context.Context, Index) error) error { // Ensure we query both per tenant and multitenant TSDBs - idxBuckets := indexBuckets(from, through, []config.TableRange{i.tableRange}) + idxBuckets := IndexBuckets(from, through, []config.TableRange{i.tableRange}) for _, bkt := range idxBuckets { - if err := i.shipper.ForEachConcurrent(ctx, bkt.prefix, user, func(multitenant bool, idx shipperindex.Index) error { + if err := i.shipper.ForEachConcurrent(ctx, bkt.Prefix, user, func(multitenant bool, idx shipperindex.Index) error { impl, ok := idx.(Index) if !ok { return fmt.Errorf("unexpected shipper index type: %T", idx) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go b/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go index 96f56d7021f4..dd89911841d3 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go @@ -165,13 +165,13 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, indexShipper indexshippe // chunks may overlap index period bounds, in which case they're written to multiple pds := make(map[string]chunkInfo) for _, chk := range chks { - idxBuckets := indexBuckets(chk.From(), chk.Through(), tableRanges) + idxBuckets := IndexBuckets(chk.From(), chk.Through(), tableRanges) for _, bucket := range idxBuckets { - chkinfo := pds[bucket.prefix] + chkinfo := pds[bucket.Prefix] chkinfo.chunkMetas = append(chkinfo.chunkMetas, chk) - chkinfo.tsdbFormat = bucket.tsdbFormat - pds[bucket.prefix] = chkinfo + chkinfo.tsdbFormat = bucket.TsdbFormat + pds[bucket.Prefix] = chkinfo } } @@ -208,8 +208,8 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, indexShipper indexshippe dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p)) dst := NewPrefixedIdentifier( MultitenantTSDBIdentifier{ - nodeName: m.nodeName, - ts: heads.start, + NodeName: m.nodeName, + Ts: heads.start, }, dstDir, "", @@ -300,19 +300,19 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier, legacy boo return nil } -type indexInfo struct { - prefix string - tsdbFormat int +type IndexInfo struct { + Prefix string + TsdbFormat int } -func indexBuckets(from, through model.Time, tableRanges config.TableRanges) (res []indexInfo) { +func IndexBuckets(from, through model.Time, tableRanges config.TableRanges) (res []IndexInfo) { start := from.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) end := through.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) for cur := start; cur <= end; cur++ { cfg := tableRanges.ConfigForTableNumber(cur) if cfg != nil { tsdbFormat, _ := cfg.TSDBFormat() // Ignoring error, as any valid period config should return valid format. - res = append(res, indexInfo{prefix: cfg.IndexTables.Prefix + strconv.Itoa(int(cur)), tsdbFormat: tsdbFormat}) + res = append(res, IndexInfo{Prefix: cfg.IndexTables.Prefix + strconv.Itoa(int(cur)), TsdbFormat: tsdbFormat}) } } if len(res) == 0 {