Skip to content

Commit

Permalink
exposing some tsdb shipper fns+types
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
  • Loading branch information
owen-d committed Nov 6, 2024
1 parent 52725e7 commit 312e273
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 30 deletions.
24 changes: 12 additions & 12 deletions pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func setupMultiTenantIndex(t *testing.T, indexFormat int, userStreams map[string

dst := NewPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: "test",
ts: ts,
NodeName: "test",
Ts: ts,
},
destDir,
"",
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestCompactor_Compact(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}},
Schema: "v12",
}
indexBkts := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})
indexBkts := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})

tableName := indexBkts[0]
lbls1 := mustParseLabels(`{foo="bar", a="b"}`)
Expand Down Expand Up @@ -497,8 +497,8 @@ func TestCompactor_Compact(t *testing.T) {
t.Run(name, func(t *testing.T) {
tempDir := t.TempDir()
objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
tablePathInStorage := filepath.Join(objectStoragePath, tableName.prefix)
tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName.prefix)
tablePathInStorage := filepath.Join(objectStoragePath, tableName.Prefix)
tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName.Prefix)

require.NoError(t, util.EnsureDirectory(objectStoragePath))
require.NoError(t, util.EnsureDirectory(tablePathInStorage))
Expand Down Expand Up @@ -551,27 +551,27 @@ func TestCompactor_Compact(t *testing.T) {
objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
require.NoError(t, err)

_, commonPrefixes, err := objectClient.List(context.Background(), tableName.prefix, "/")
_, commonPrefixes, err := objectClient.List(context.Background(), tableName.Prefix, "/")
require.NoError(t, err)

initializedIndexSets := map[string]compactor.IndexSet{}
initializedIndexSetsMtx := sync.Mutex{}
existingUserIndexSets := make(map[string]compactor.IndexSet, len(commonPrefixes))
for _, commonPrefix := range commonPrefixes {
userID := path.Base(string(commonPrefix))
idxSet, err := newMockIndexSet(userID, tableName.prefix, filepath.Join(tableWorkingDirectory, userID), objectClient)
idxSet, err := newMockIndexSet(userID, tableName.Prefix, filepath.Join(tableWorkingDirectory, userID), objectClient)
require.NoError(t, err)

existingUserIndexSets[userID] = idxSet
initializedIndexSets[userID] = idxSet
}

commonIndexSet, err := newMockIndexSet("", tableName.prefix, tableWorkingDirectory, objectClient)
commonIndexSet, err := newMockIndexSet("", tableName.Prefix, tableWorkingDirectory, objectClient)
require.NoError(t, err)

// build TableCompactor and compact the index
tCompactor := newTableCompactor(context.Background(), commonIndexSet, existingUserIndexSets, func(userID string) (compactor.IndexSet, error) {
idxSet, err := newMockIndexSet(userID, tableName.prefix, filepath.Join(tableWorkingDirectory, userID), objectClient)
idxSet, err := newMockIndexSet(userID, tableName.Prefix, filepath.Join(tableWorkingDirectory, userID), objectClient)
require.NoError(t, err)

initializedIndexSetsMtx.Lock()
Expand Down Expand Up @@ -875,9 +875,9 @@ func setupCompactedIndex(t *testing.T) *testContext {
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{periodConfig},
}
indexBuckets := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})
indexBuckets := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})
tableName := indexBuckets[0]
tableInterval := retention.ExtractIntervalFromTableName(tableName.prefix)
tableInterval := retention.ExtractIntervalFromTableName(tableName.Prefix)
// shiftTableStart shift tableInterval.Start by the given amount of milliseconds.
// It is used for building chunkmetas relative to start time of the table.
shiftTableStart := func(ms int64) int64 {
Expand All @@ -900,7 +900,7 @@ func setupCompactedIndex(t *testing.T) *testContext {

builder.FinalizeChunks()

return newCompactedIndex(context.Background(), tableName.prefix, buildUserID(0), t.TempDir(), periodConfig, builder)
return newCompactedIndex(context.Background(), tableName.Prefix, buildUserID(0), t.TempDir(), periodConfig, builder)
}

expectedChunkEntries := map[string][]retention.ChunkEntry{
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/stores/shipper/indexshipper/tsdb/identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ func ParseSingleTenantTSDBPath(p string) (id SingleTenantTSDBIdentifier, ok bool
}

type MultitenantTSDBIdentifier struct {
nodeName string
ts time.Time
NodeName string
Ts time.Time
}

// Name builds the filename with format `<file-creation-ts>-<nodeName>.tsdb`
func (id MultitenantTSDBIdentifier) Name() string {
return fmt.Sprintf("%d-%s.tsdb", id.ts.Unix(), id.nodeName)
return fmt.Sprintf("%d-%s.tsdb", id.Ts.Unix(), id.NodeName)
}

func (id MultitenantTSDBIdentifier) Path() string {
Expand Down Expand Up @@ -200,7 +200,7 @@ func parseMultitenantTSDBNameFromBase(name string) (res MultitenantTSDBIdentifie
}

return MultitenantTSDBIdentifier{
ts: time.Unix(int64(ts), 0),
nodeName: strings.Join(xs[1:], "-"),
Ts: time.Unix(int64(ts), 0),
NodeName: strings.Join(xs[1:], "-"),
}, true
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func (i indexIterFunc) For(_ context.Context, _ int, f func(context.Context, Ind
func (i *indexShipperQuerier) indices(ctx context.Context, from, through model.Time, user string) (Index, error) {
itr := indexIterFunc(func(f func(context.Context, Index) error) error {
// Ensure we query both per tenant and multitenant TSDBs
idxBuckets := indexBuckets(from, through, []config.TableRange{i.tableRange})
idxBuckets := IndexBuckets(from, through, []config.TableRange{i.tableRange})
for _, bkt := range idxBuckets {
if err := i.shipper.ForEachConcurrent(ctx, bkt.prefix, user, func(multitenant bool, idx shipperindex.Index) error {
if err := i.shipper.ForEachConcurrent(ctx, bkt.Prefix, user, func(multitenant bool, idx shipperindex.Index) error {
impl, ok := idx.(Index)
if !ok {
return fmt.Errorf("unexpected shipper index type: %T", idx)
Expand Down
22 changes: 11 additions & 11 deletions pkg/storage/stores/shipper/indexshipper/tsdb/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,13 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, indexShipper indexshippe
// chunks may overlap index period bounds, in which case they're written to multiple
pds := make(map[string]chunkInfo)
for _, chk := range chks {
idxBuckets := indexBuckets(chk.From(), chk.Through(), tableRanges)
idxBuckets := IndexBuckets(chk.From(), chk.Through(), tableRanges)

for _, bucket := range idxBuckets {
chkinfo := pds[bucket.prefix]
chkinfo := pds[bucket.Prefix]
chkinfo.chunkMetas = append(chkinfo.chunkMetas, chk)
chkinfo.tsdbFormat = bucket.tsdbFormat
pds[bucket.prefix] = chkinfo
chkinfo.tsdbFormat = bucket.TsdbFormat
pds[bucket.Prefix] = chkinfo
}
}

Expand Down Expand Up @@ -208,8 +208,8 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, indexShipper indexshippe
dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p))
dst := NewPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: m.nodeName,
ts: heads.start,
NodeName: m.nodeName,
Ts: heads.start,
},
dstDir,
"",
Expand Down Expand Up @@ -300,19 +300,19 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier, legacy boo
return nil
}

type indexInfo struct {
prefix string
tsdbFormat int
type IndexInfo struct {
Prefix string
TsdbFormat int
}

func indexBuckets(from, through model.Time, tableRanges config.TableRanges) (res []indexInfo) {
func IndexBuckets(from, through model.Time, tableRanges config.TableRanges) (res []IndexInfo) {
start := from.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod)
end := through.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod)
for cur := start; cur <= end; cur++ {
cfg := tableRanges.ConfigForTableNumber(cur)
if cfg != nil {
tsdbFormat, _ := cfg.TSDBFormat() // Ignoring error, as any valid period config should return valid format.
res = append(res, indexInfo{prefix: cfg.IndexTables.Prefix + strconv.Itoa(int(cur)), tsdbFormat: tsdbFormat})
res = append(res, IndexInfo{Prefix: cfg.IndexTables.Prefix + strconv.Itoa(int(cur)), TsdbFormat: tsdbFormat})
}
}
if len(res) == 0 {
Expand Down

0 comments on commit 312e273

Please sign in to comment.