From 53b6fd7348d02675abdd57914f8cf9b036147284 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Thu, 5 Sep 2024 21:23:26 -0700 Subject: [PATCH] changefeedccl: add test to repro pgzip memory leak Adds a test that reproduces a memory leak from pgzip, the library used for fast gzip compression for changefeeds using cloud storage sinks. The leak was caused by a race condition between Flush/flushTopicVersions and the async flusher: if the Flush clears files before the async flusher closes the compression codec as part of flushing the files, and the flush returns an error, the compression codec will not be closed properly. This test uses the AsyncFlushSync testing knob to introduce synchronization points between these two goroutines to trigger the regression. Co-authored-by: wenyihu6 Epic: none Release note: none --- pkg/ccl/changefeedccl/BUILD.bazel | 3 + .../changefeedccl/sink_cloudstorage_test.go | 198 ++++++++++++++++-- 2 files changed, 184 insertions(+), 17 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index b5171b6c0cef..3e4d17e131bd 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -256,6 +256,7 @@ go_test( "//pkg/ccl/storageccl", "//pkg/ccl/utilccl", "//pkg/cloud", + "//pkg/cloud/cloudpb", "//pkg/cloud/impl:cloudimpl", "//pkg/internal/sqlsmith", "//pkg/jobs", @@ -324,6 +325,7 @@ go_test( "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/intsets", + "//pkg/util/ioctx", "//pkg/util/json", "//pkg/util/leaktest", "//pkg/util/log", @@ -352,6 +354,7 @@ go_test( "@com_github_golang_mock//gomock", "@com_github_ibm_sarama//:sarama", "@com_github_jackc_pgx_v4//:pgx", + "@com_github_klauspost_compress//gzip", "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 0c0b8115418e..1cf52f3c106d 100644 --- 
a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -10,7 +10,6 @@ package changefeedccl import ( "bytes" - "compress/gzip" "context" "fmt" "io" @@ -21,6 +20,7 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "time" @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -39,13 +40,18 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/errors" + "github.com/klauspost/compress/gzip" "github.com/stretchr/testify/require" ) +const unlimitedFileSize int64 = math.MaxInt64 + func makeTopic(name string) *tableDescriptorTopic { id, _ := strconv.ParseUint(name, 36, 64) desc := tabledesc.NewBuilder(&descpb.TableDescriptor{Name: name, ID: descpb.ID(id)}).BuildImmutableTable() @@ -89,10 +95,6 @@ func TestCloudStorageSink(t *testing.T) { return decompressed } - testDir := func(t *testing.T) string { - return strings.ReplaceAll(t.Name(), "/", ";") - } - listLeafDirectories := func(t *testing.T) []string { absRoot := filepath.Join(externalIODir, testDir(t)) @@ -156,7 +158,6 @@ func TestCloudStorageSink(t *testing.T) { return files } - const unlimitedFileSize int64 = math.MaxInt64 var noKey []byte settings := 
cluster.MakeTestingClusterSettings() settings.ExternalIODir = externalIODir @@ -184,16 +185,6 @@ func TestCloudStorageSink(t *testing.T) { user := username.RootUserName() - sinkURI := func(t *testing.T, maxFileSize int64) sinkURL { - u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t))) - require.NoError(t, err) - sink := sinkURL{URL: u} - if maxFileSize != unlimitedFileSize { - sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10)) - } - return sink - } - testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) { t.Helper() testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) { @@ -276,7 +267,7 @@ func TestCloudStorageSink(t *testing.T) { require.Equal(t, []string(nil), slurpDir(t)) // Emitting rows and flushing should write them out in one file per table. Note - // the ordering among these two files is non deterministic as either of them could + // the ordering among these two files is non-deterministic as either of them could // be flushed first (and thus be assigned fileID 0). var pool testAllocPool require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), pool.alloc())) @@ -841,3 +832,176 @@ type explicitTimestampOracle hlc.Timestamp func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp { return hlc.Timestamp(o) } + +// TestCloudStorageSinkFastGzip is a regression test for #129947. +// The original issue was a memory leak from pgzip, the library used for fast +// gzip compression for cloud storage. The leak was caused by a race condition +// between Flush and the async flusher: if the Flush clears files before the +// async flusher closes the compression codec as part of flushing the files, +// and the flush returns an error, the compression codec will not be closed +// properly. This test uses some test-only synchronization points in the cloud +// storage sink to test for the regression. 
+func TestCloudStorageSinkFastGzip(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + + useFastGzip.Override(context.Background(), &settings.SV, true) + enableAsyncFlush.Override(context.Background(), &settings.SV, true) + + opts := changefeedbase.EncodingOptions{ + Format: changefeedbase.OptFormatJSON, + Envelope: changefeedbase.OptEnvelopeWrapped, + KeyInValue: true, + Compression: "gzip", + } + + testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} + sf, err := span.MakeFrontier(testSpan) + require.NoError(t, err) + timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} + + // Force the storage sink to always return an error. + getErrorWriter := func() io.WriteCloser { + return errorWriter{} + } + mockStorageSink := func(_ context.Context, _ string, _ username.SQLUsername, _ ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) { + return &mockSinkStorage{writer: getErrorWriter}, nil + } + + // The cloud storage sink calls the AsyncFlushSync function in two different + // goroutines: once in Flush(), and once in the async flusher. By waiting for + // the two goroutines to both reach those points, we can trigger the original + // issue, which was caused by a race condition between the two goroutines + // leading to leaked compression library resources. + wg := sync.WaitGroup{} + waiter := func() { + wg.Done() + wg.Wait() + } + testingKnobs := &TestingKnobs{AsyncFlushSync: waiter} + const sizeInBytes = 100 * 1024 * 1024 // 100MB + + // Test that there's no leak during an async Flush. + t.Run("flush", func(t *testing.T) { + wg.Add(2) + s, err := makeCloudStorageSink( + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, + mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs, + ) + require.NoError(t, err) + s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. 
+ + var noKey []byte + for i := 1; i < 10; i++ { + newTopic := makeTopic(fmt.Sprintf(`t%d`, i)) + byteSlice := make([]byte, sizeInBytes) + ts := hlc.Timestamp{WallTime: int64(i)} + _ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc) + } + + // Flush the files and close the sink. Any leaks should be caught after the + // test by leaktest. + _ = s.Flush(ctx) + _ = s.Close() + }) + // Test that there's no leak during an async flushTopicVersions. + t.Run("flushTopicVersions", func(t *testing.T) { + wg.Add(2) + s, err := makeCloudStorageSink( + ctx, sinkURI(t, 2*sizeInBytes), 1, settings, opts, timestampOracle, + mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs, + ) + require.NoError(t, err) + s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. + + // Insert data to the same topic with different versions so that they are + // in different files. + var noKey []byte + newTopic := makeTopic("test") + for i := 1; i < 10; i++ { + byteSlice := make([]byte, sizeInBytes) + ts := hlc.Timestamp{WallTime: int64(i)} + newTopic.Version++ + _ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc) + } + + // Flush the files and close the sink. Any leaks should be caught after the + // test by leaktest. + _ = s.(*cloudStorageSink).flushTopicVersions(ctx, newTopic.GetTableName(), int64(newTopic.GetVersion())) + _ = s.Close() + }) +} + +func testDir(t *testing.T) string { + return strings.ReplaceAll(t.Name(), "/", ";") +} + +func sinkURI(t *testing.T, maxFileSize int64) sinkURL { + u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t))) + require.NoError(t, err) + sink := sinkURL{URL: u} + if maxFileSize != unlimitedFileSize { + sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10)) + } + return sink +} + +// errorWriter always returns an error on writes. 
+type errorWriter struct{} + +func (errorWriter) Write(_ []byte) (int, error) { + return 0, errors.New("write error") +} +func (errorWriter) Close() error { return nil } + +// mockSinkStorage can be useful for testing to override the WriteCloser. +type mockSinkStorage struct { + writer func() io.WriteCloser +} + +var _ cloud.ExternalStorage = &mockSinkStorage{} + +func (n *mockSinkStorage) Close() error { + return nil +} + +func (n *mockSinkStorage) Conf() cloudpb.ExternalStorage { + return cloudpb.ExternalStorage{Provider: cloudpb.ExternalStorageProvider_null} +} + +func (n *mockSinkStorage) ExternalIOConf() base.ExternalIODirConfig { + return base.ExternalIODirConfig{} +} + +func (n *mockSinkStorage) RequiresExternalIOAccounting() bool { + return false +} + +func (n *mockSinkStorage) Settings() *cluster.Settings { + return nil +} + +func (n *mockSinkStorage) ReadFile( + _ context.Context, _ string, _ cloud.ReadOptions, +) (ioctx.ReadCloserCtx, int64, error) { + return nil, 0, io.EOF +} + +func (n *mockSinkStorage) Writer(_ context.Context, _ string) (io.WriteCloser, error) { + return n.writer(), nil +} + +func (n *mockSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error { + return nil +} + +func (n *mockSinkStorage) Delete(_ context.Context, _ string) error { + return nil +} + +func (n *mockSinkStorage) Size(_ context.Context, _ string) (int64, error) { + return 0, nil +}