Skip to content

Commit

Permalink
Merge pull request #130626 from rharding6373/backport24.1.5-rc-130204
Browse files Browse the repository at this point in the history
release-24.1.5-rc: changefeedccl: fix memory leak in cloud storage sink with fast gzip
  • Loading branch information
rharding6373 authored Sep 20, 2024
2 parents a2667b3 + 4526119 commit b40f8a2
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 19 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ go_test(
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/cloudpb",
"//pkg/cloud/impl:cloudimpl",
"//pkg/internal/sqlsmith",
"//pkg/jobs",
Expand Down Expand Up @@ -324,6 +325,7 @@ go_test(
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/intsets",
"//pkg/util/ioctx",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down Expand Up @@ -352,6 +354,7 @@ go_test(
"@com_github_golang_mock//gomock",
"@com_github_ibm_sarama//:sarama",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_klauspost_compress//gzip",
"@com_github_lib_pq//:pq",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
46 changes: 44 additions & 2 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,27 @@ func (s *cloudStorageSink) flushTopicVersions(
}
return err == nil
})
if err != nil {
return err
}

// Allow synchronization with the async flusher to happen.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}

// Wait for the async flush to complete before clearing files.
// Note that if waitAsyncFlush returns an error some successfully
// flushed files may not be removed from s.files. This is ok, since
// the error will trigger the sink to be closed, and we will only use
// s.files to ensure that the codecs are closed before deallocating it.
err = s.waitAsyncFlush(ctx)
if err != nil {
return err
}

// Files need to be cleared after the flush completes, otherwise file
// resources may be leaked.
for _, v := range toRemove {
s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v})
}
Expand All @@ -681,9 +702,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
if err != nil {
return err
}
s.files.Clear(true /* addNodesToFreeList */)
// Allow synchronization with the async flusher to happen.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}
s.setDataFileTimestamp()
return s.waitAsyncFlush(ctx)

// Note that if waitAsyncFlush returns an error some successfully
// flushed files may not be removed from s.files. This is ok, since
// the error will trigger the sink to be closed, and we will only use
// s.files to ensure that the codecs are closed before deallocating it.
err = s.waitAsyncFlush(ctx)
if err != nil {
return err
}
// Files need to be cleared after the flush completes, otherwise file resources
// may not be released properly when closing the sink.
s.files.Clear(true /* addNodesToFreeList */)
return nil
}

func (s *cloudStorageSink) setDataFileTimestamp() {
Expand Down Expand Up @@ -816,6 +852,12 @@ func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error {
continue
}

// Allow synchronization with the flushing routine to happen between getting
// the flush request from the channel and completing the flush.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}

// flush file to storage.
flushDone := s.metrics.recordFlushRequestCallback()
err := req.file.flushToStorage(ctx, s.es, req.dest, s.metrics)
Expand Down
200 changes: 183 additions & 17 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package changefeedccl

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
Expand All @@ -21,6 +20,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand All @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -38,14 +39,20 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/errors"
"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/require"
)

const unlimitedFileSize int64 = math.MaxInt64

func makeTopic(name string) *tableDescriptorTopic {
id, _ := strconv.ParseUint(name, 36, 64)
desc := tabledesc.NewBuilder(&descpb.TableDescriptor{Name: name, ID: descpb.ID(id)}).BuildImmutableTable()
Expand Down Expand Up @@ -89,10 +96,6 @@ func TestCloudStorageSink(t *testing.T) {
return decompressed
}

testDir := func(t *testing.T) string {
return strings.ReplaceAll(t.Name(), "/", ";")
}

listLeafDirectories := func(t *testing.T) []string {
absRoot := filepath.Join(externalIODir, testDir(t))

Expand Down Expand Up @@ -156,7 +159,6 @@ func TestCloudStorageSink(t *testing.T) {
return files
}

const unlimitedFileSize int64 = math.MaxInt64
var noKey []byte
settings := cluster.MakeTestingClusterSettings()
settings.ExternalIODir = externalIODir
Expand Down Expand Up @@ -184,16 +186,6 @@ func TestCloudStorageSink(t *testing.T) {

user := username.RootUserName()

sinkURI := func(t *testing.T, maxFileSize int64) sinkURL {
u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
require.NoError(t, err)
sink := sinkURL{URL: u}
if maxFileSize != unlimitedFileSize {
sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
}
return sink
}

testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) {
t.Helper()
testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) {
Expand Down Expand Up @@ -276,7 +268,7 @@ func TestCloudStorageSink(t *testing.T) {
require.Equal(t, []string(nil), slurpDir(t))

// Emitting rows and flushing should write them out in one file per table. Note
// the ordering among these two files is non deterministic as either of them could
// the ordering among these two files is non-deterministic as either of them could
// be flushed first (and thus be assigned fileID 0).
var pool testAllocPool
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), pool.alloc()))
Expand Down Expand Up @@ -841,3 +833,177 @@ type explicitTimestampOracle hlc.Timestamp
func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp {
return hlc.Timestamp(o)
}

// TestCloudStorageSinkFastGzip is a regression test for #129947.
// The original issue was a memory leak from pgzip, the library used for fast
// gzip compression for cloud storage. The leak was caused by a race condition
// between Flush and the async flusher: if the Flush clears files before the
// async flusher closes the compression codec as part of flushing the files,
// and the flush returns an error, the compression codec will not be closed
// properly. This test uses some test-only synchronization points in the cloud
// storage sink to test for the regression.
func TestCloudStorageSinkFastGzip(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderRace(t, "#130651")

ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()

useFastGzip.Override(context.Background(), &settings.SV, true)
enableAsyncFlush.Override(context.Background(), &settings.SV, true)

opts := changefeedbase.EncodingOptions{
Format: changefeedbase.OptFormatJSON,
Envelope: changefeedbase.OptEnvelopeWrapped,
KeyInValue: true,
Compression: "gzip",
}

testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
sf, err := span.MakeFrontier(testSpan)
require.NoError(t, err)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}

// Force the storage sink to always return an error.
getErrorWriter := func() io.WriteCloser {
return errorWriter{}
}
mockStorageSink := func(_ context.Context, _ string, _ username.SQLUsername, _ ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
return &mockSinkStorage{writer: getErrorWriter}, nil
}

// The cloud storage sink calls the AsyncFlushSync function in two different
// goroutines: once in Flush(), and once in the async flusher. By waiting for
// the two goroutines to both reach those points, we can trigger the original
// issue, which was caused by a race condition between the two goroutines
// leading to leaked compression library resources.
wg := sync.WaitGroup{}
waiter := func() {
wg.Done()
wg.Wait()
}
testingKnobs := &TestingKnobs{AsyncFlushSync: waiter}
const sizeInBytes = 100 * 1024 * 1024 // 100MB

// Test that there's no leak during an async Flush.
t.Run("flush", func(t *testing.T) {
wg.Add(2)
s, err := makeCloudStorageSink(
ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle,
mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

var noKey []byte
for i := 1; i < 10; i++ {
newTopic := makeTopic(fmt.Sprintf(`t%d`, i))
byteSlice := make([]byte, sizeInBytes)
ts := hlc.Timestamp{WallTime: int64(i)}
_ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc)
}

// Flush the files and close the sink. Any leaks should be caught after the
// test by leaktest.
_ = s.Flush(ctx)
_ = s.Close()
})
// Test that there's no leak during an async flushTopicVersions.
t.Run("flushTopicVersions", func(t *testing.T) {
wg.Add(2)
s, err := makeCloudStorageSink(
ctx, sinkURI(t, 2*sizeInBytes), 1, settings, opts, timestampOracle,
mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

// Insert data to the same topic with different versions so that they are
// in different files.
var noKey []byte
newTopic := makeTopic("test")
for i := 1; i < 10; i++ {
byteSlice := make([]byte, sizeInBytes)
ts := hlc.Timestamp{WallTime: int64(i)}
newTopic.Version++
_ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc)
}

// Flush the files and close the sink. Any leaks should be caught after the
// test by leaktest.
_ = s.(*cloudStorageSink).flushTopicVersions(ctx, newTopic.GetTableName(), int64(newTopic.GetVersion()))
_ = s.Close()
})
}

func testDir(t *testing.T) string {
return strings.ReplaceAll(t.Name(), "/", ";")
}

func sinkURI(t *testing.T, maxFileSize int64) sinkURL {
u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
require.NoError(t, err)
sink := sinkURL{URL: u}
if maxFileSize != unlimitedFileSize {
sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
}
return sink
}

// errorWriter always returns an error on writes.
type errorWriter struct{}

func (errorWriter) Write(_ []byte) (int, error) {
return 0, errors.New("write error")
}
func (errorWriter) Close() error { return nil }

// mockSinkStorage can be useful for testing to override the WriteCloser.
type mockSinkStorage struct {
writer func() io.WriteCloser
}

var _ cloud.ExternalStorage = &mockSinkStorage{}

func (n *mockSinkStorage) Close() error {
return nil
}

func (n *mockSinkStorage) Conf() cloudpb.ExternalStorage {
return cloudpb.ExternalStorage{Provider: cloudpb.ExternalStorageProvider_null}
}

func (n *mockSinkStorage) ExternalIOConf() base.ExternalIODirConfig {
return base.ExternalIODirConfig{}
}

func (n *mockSinkStorage) RequiresExternalIOAccounting() bool {
return false
}

func (n *mockSinkStorage) Settings() *cluster.Settings {
return nil
}

func (n *mockSinkStorage) ReadFile(
_ context.Context, _ string, _ cloud.ReadOptions,
) (ioctx.ReadCloserCtx, int64, error) {
return nil, 0, io.EOF
}

func (n *mockSinkStorage) Writer(_ context.Context, _ string) (io.WriteCloser, error) {
return n.writer(), nil
}

func (n *mockSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error {
return nil
}

func (n *mockSinkStorage) Delete(_ context.Context, _ string) error {
return nil
}

func (n *mockSinkStorage) Size(_ context.Context, _ string) (int64, error) {
return 0, nil
}
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ type TestingKnobs struct {

// OverrideExecCfg returns a modified ExecutorConfig to use under tests.
OverrideExecCfg func(actual *sql.ExecutorConfig) *sql.ExecutorConfig

// AsyncFlushSync is called in async flush goroutines as a way to provide synchronization between them.
AsyncFlushSync func()
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit b40f8a2

Please sign in to comment.