diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/builder.go b/pkg/storage/stores/shipper/indexshipper/tsdb/builder.go index 0b07416a814f..a86ac392d259 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/builder.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/builder.go @@ -3,6 +3,7 @@ package tsdb import ( "context" "fmt" + "io" "math/rand" "os" "path/filepath" @@ -111,12 +112,47 @@ func (b *Builder) Build( name := fmt.Sprintf("%s-%x.staging", index.IndexFilename, rng) tmpPath := filepath.Join(scratchDir, name) - var writer *index.Creator + writer, err := index.NewFileWriterWithVersion(ctx, b.version, tmpPath) + if err != nil { + return id, err + } - writer, err = index.NewWriterWithVersion(ctx, b.version, tmpPath) + if _, err := b.build(writer, false); err != nil { + return id, err + } + + reader, err := index.NewFileReader(tmpPath) if err != nil { return id, err } + + from, through := reader.Bounds() + + // load the newly compacted index to grab checksum, promptly close + dst := createFn(model.Time(from), model.Time(through), reader.Checksum()) + + reader.Close() + defer func() { + if err != nil { + os.RemoveAll(tmpPath) + } + }() + + if err := chunk_util.EnsureDirectory(filepath.Dir(dst.Path())); err != nil { + return id, err + } + dstPath := dst.Path() + if err := os.Rename(tmpPath, dstPath); err != nil { + return id, err + } + + return dst, nil +} + +func (b *Builder) build( + writer *index.Creator, + reader bool, // whether to return the ReadCloser of the underlying DB +) (io.ReadCloser, error) { // TODO(owen-d): multithread // Sort series @@ -155,7 +191,7 @@ func (b *Builder) Build( // Add symbols for _, symbol := range symbols { if err := writer.AddSymbol(symbol); err != nil { - return id, err + return nil, err } } @@ -165,38 +201,45 @@ func (b *Builder) Build( s.chunks = s.chunks.Finalize() } if err := writer.AddSeries(storage.SeriesRef(i), s.labels, s.fp, s.chunks...); err != nil { - return id, err + return nil, err } } - if _, err := 
writer.Close(false); err != nil { - return id, err - } + return writer.Close(reader) +} - reader, err := index.NewFileReader(tmpPath) +func (b *Builder) BuildInMemory( + ctx context.Context, + // Determines how to create the resulting Identifier and file name. + // This is variable as we use Builder for multiple reasons, + // such as building multi-tenant tsdbs on the ingester + // and per tenant ones during compaction + createFn func(from, through model.Time, checksum uint32) Identifier, +) (id Identifier, data []byte, err error) { + writer, err := index.NewMemWriterWithVersion(ctx, b.version) if err != nil { - return id, err + return id, nil, err } - from, through := reader.Bounds() - - // load the newly compacted index to grab checksum, promptly close - dst := createFn(model.Time(from), model.Time(through), reader.Checksum()) - - reader.Close() - defer func() { - if err != nil { - os.RemoveAll(tmpPath) - } - }() + readCloser, err := b.build(writer, true) + if err != nil { + return id, nil, err + } + defer readCloser.Close() - if err := chunk_util.EnsureDirectory(filepath.Dir(dst.Path())); err != nil { - return id, err + data, err = io.ReadAll(readCloser) + if err != nil { + return nil, nil, err } - dstPath := dst.Path() - if err := os.Rename(tmpPath, dstPath); err != nil { - return id, err + + reader, err := index.NewReader(index.RealByteSlice(data)) + if err != nil { + return id, nil, err } + defer reader.Close() - return dst, nil + from, through := reader.Bounds() + id = createFn(model.Time(from), model.Time(through), reader.Checksum()) + + return id, data, nil } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go index 64a9b69f341e..756d354d1ed6 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go @@ -211,7 +211,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { }, nil } -func 
NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator, error) { +// NewFileWriterWithVersion creates a Creator for writing TSDBs using temporary files. +func NewFileWriterWithVersion(ctx context.Context, version int, fn string) (*Creator, error) { dir := filepath.Dir(fn) df, err := fileutil.OpenDir(dir) @@ -243,12 +244,39 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator return nil, errors.Wrap(err, "sync dir") } + return newWriter( + ctx, + version, + f, + fP, + fPO, + ) +} + +// NewMemWriterWithVersion creates a Creator for writing TSDBs in memory. +func NewMemWriterWithVersion(ctx context.Context, version int) (*Creator, error) { + return newWriter( + ctx, + version, + NewBufferWriter(), + NewBufferWriter(), + NewBufferWriter(), + ) +} + +func newWriter( + ctx context.Context, + version int, + fWriter writer, + postingsWriter writer, + postingOffsetsWriter writer, +) (*Creator, error) { iw := &Creator{ Version: version, ctx: ctx, - f: f, - fP: fP, - fPO: fPO, + f: fWriter, + fP: postingsWriter, + fPO: postingOffsetsWriter, stage: idxStageNone, // Reusable memory. @@ -267,7 +295,7 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator // NewWriter returns a new Writer to the given filename. 
func NewWriter(ctx context.Context, indexFormat int, fn string) (*Creator, error) { - return NewWriterWithVersion(ctx, indexFormat, fn) + return NewFileWriterWithVersion(ctx, indexFormat, fn) } func (w *Creator) write(bufs ...[]byte) error { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index_test.go index 7491282dae84..8b2ac832a579 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index_test.go @@ -727,7 +727,7 @@ func TestDecoder_ChunkSamples(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - iw, err := NewWriterWithVersion(context.Background(), FormatV2, filepath.Join(dir, name)) + iw, err := NewFileWriterWithVersion(context.Background(), FormatV2, filepath.Join(dir, name)) require.NoError(t, err) syms := []string{}