changefeedccl: add test to repro pgzip memory leak
Adds a test that reproduces a memory leak from pgzip, the library used
for fast gzip compression in changefeeds with cloud storage sinks. The
leak was caused by a race condition between Flush/flushTopicVersions and
the async flusher: if Flush clears the files before the async flusher
closes their compression codecs as part of flushing them, and that
flush returns an error, the codecs are never closed properly. The test
uses the AsyncFlushSync testing knob to introduce synchronization
points between these two goroutines to trigger the regression.

Co-authored-by: wenyihu6

Epic: none

Release note: none
rharding6373 committed Sep 12, 2024
1 parent c4e4907 commit 53b6fd7
Showing 2 changed files with 184 additions and 17 deletions.
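
The race described in the commit message is easiest to see as a two-goroutine
interleaving problem. Below is a minimal, self-contained sketch of the same
synchronization-point technique the test uses: a WaitGroup acting as a barrier
so that both goroutines are pinned at the problematic interleaving. All names
in the sketch are hypothetical stand-ins, not the changefeed code itself:

package main

import (
	"fmt"
	"sync"
)

// barrier returns a func that blocks until n goroutines have called it,
// mirroring the AsyncFlushSync testing knob: each caller signals arrival
// (wg.Done) and then waits for the others (wg.Wait).
func barrier(n int) func() {
	var wg sync.WaitGroup
	wg.Add(n)
	return func() {
		wg.Done()
		wg.Wait()
	}
}

func main() {
	syncPoint := barrier(2)
	done := make(chan struct{})

	// Stand-in for the async flusher, which closes compression codecs
	// while flushing files.
	go func() {
		defer close(done)
		syncPoint() // wait until Flush has also reached its sync point
		fmt.Println("async flusher: flushing files, closing codecs")
	}()

	// Stand-in for Flush, which clears the sink's file map.
	syncPoint() // wait until the async flusher has also arrived
	fmt.Println("Flush: clearing files")
	<-done
}

The barrier guarantees that both goroutines reach their critical sections
before either proceeds, turning a rare interleaving into a deterministic one.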
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -256,6 +256,7 @@ go_test(
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/cloudpb",
"//pkg/cloud/impl:cloudimpl",
"//pkg/internal/sqlsmith",
"//pkg/jobs",
@@ -324,6 +325,7 @@ go_test(
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/intsets",
"//pkg/util/ioctx",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
@@ -352,6 +354,7 @@ go_test(
"@com_github_golang_mock//gomock",
"@com_github_ibm_sarama//:sarama",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_klauspost_compress//gzip",
"@com_github_lib_pq//:pq",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
198 changes: 181 additions & 17 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -10,7 +10,6 @@ package changefeedccl

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
@@ -21,6 +20,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

@@ -29,6 +29,7 @@
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -39,13 +40,18 @@
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/errors"
"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/require"
)

const unlimitedFileSize int64 = math.MaxInt64

func makeTopic(name string) *tableDescriptorTopic {
id, _ := strconv.ParseUint(name, 36, 64)
desc := tabledesc.NewBuilder(&descpb.TableDescriptor{Name: name, ID: descpb.ID(id)}).BuildImmutableTable()
@@ -89,10 +95,6 @@ func TestCloudStorageSink(t *testing.T) {
return decompressed
}

testDir := func(t *testing.T) string {
return strings.ReplaceAll(t.Name(), "/", ";")
}

listLeafDirectories := func(t *testing.T) []string {
absRoot := filepath.Join(externalIODir, testDir(t))

@@ -156,7 +158,6 @@
return files
}

const unlimitedFileSize int64 = math.MaxInt64
var noKey []byte
settings := cluster.MakeTestingClusterSettings()
settings.ExternalIODir = externalIODir
@@ -184,16 +185,6 @@

user := username.RootUserName()

sinkURI := func(t *testing.T, maxFileSize int64) sinkURL {
u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
require.NoError(t, err)
sink := sinkURL{URL: u}
if maxFileSize != unlimitedFileSize {
sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
}
return sink
}

testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) {
t.Helper()
testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) {
@@ -276,7 +267,7 @@
require.Equal(t, []string(nil), slurpDir(t))

// Emitting rows and flushing should write them out in one file per table. Note
// the ordering among these two files is non-deterministic as either of them could
// the ordering among these two files is non-deterministic as either of them could
// be flushed first (and thus be assigned fileID 0).
var pool testAllocPool
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), pool.alloc()))
Expand Down Expand Up @@ -841,3 +832,176 @@ type explicitTimestampOracle hlc.Timestamp
func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp {
return hlc.Timestamp(o)
}

// TestCloudStorageSinkFastGzip is a regression test for #129947.
// The original issue was a memory leak from pgzip, the library used for fast
// gzip compression for cloud storage. The leak was caused by a race condition
// between Flush and the async flusher: if Flush cleared the files before the
// async flusher closed their compression codecs as part of flushing them, and
// that flush returned an error, the codecs were never closed properly. This
// test uses test-only synchronization points in the cloud storage sink to
// exercise the regression.
func TestCloudStorageSinkFastGzip(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()

useFastGzip.Override(context.Background(), &settings.SV, true)
enableAsyncFlush.Override(context.Background(), &settings.SV, true)

opts := changefeedbase.EncodingOptions{
Format: changefeedbase.OptFormatJSON,
Envelope: changefeedbase.OptEnvelopeWrapped,
KeyInValue: true,
Compression: "gzip",
}

testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
sf, err := span.MakeFrontier(testSpan)
require.NoError(t, err)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}

// Force the storage sink to always return an error.
getErrorWriter := func() io.WriteCloser {
return errorWriter{}
}
mockStorageSink := func(_ context.Context, _ string, _ username.SQLUsername, _ ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
return &mockSinkStorage{writer: getErrorWriter}, nil
}

// The cloud storage sink calls the AsyncFlushSync function in two different
// goroutines: once in Flush(), and once in the async flusher. By waiting for
// the two goroutines to both reach those points, we can trigger the original
// issue, which was caused by a race condition between the two goroutines
// leading to leaked compression library resources.
wg := sync.WaitGroup{}
waiter := func() {
wg.Done()
wg.Wait()
}
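// Both Flush (or flushTopicVersions) and the async flusher invoke this
// waiter through the AsyncFlushSync knob; each blocks in wg.Wait until the
// other has called wg.Done, pinning the two goroutines at the interleaving
// that originally leaked the codec.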
testingKnobs := &TestingKnobs{AsyncFlushSync: waiter}
const sizeInBytes = 100 * 1024 * 1024 // 100MB

// Test that there's no leak during an async Flush.
t.Run("flush", func(t *testing.T) {
wg.Add(2)
s, err := makeCloudStorageSink(
ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle,
mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

var noKey []byte
for i := 1; i < 10; i++ {
newTopic := makeTopic(fmt.Sprintf(`t%d`, i))
byteSlice := make([]byte, sizeInBytes)
ts := hlc.Timestamp{WallTime: int64(i)}
_ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc)
}

// Flush the files and close the sink. Any leaks should be caught after the
// test by leaktest.
_ = s.Flush(ctx)
_ = s.Close()
})
// Test that there's no leak during an async flushTopicVersions.
t.Run("flushTopicVersions", func(t *testing.T) {
wg.Add(2)
s, err := makeCloudStorageSink(
ctx, sinkURI(t, 2*sizeInBytes), 1, settings, opts, timestampOracle,
mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

// Insert data into the same topic with different versions so that the rows
// end up in different files.
var noKey []byte
newTopic := makeTopic("test")
for i := 1; i < 10; i++ {
byteSlice := make([]byte, sizeInBytes)
ts := hlc.Timestamp{WallTime: int64(i)}
newTopic.Version++
_ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc)
}

// Flush the files and close the sink. Any leaks should be caught after the
// test by leaktest.
_ = s.(*cloudStorageSink).flushTopicVersions(ctx, newTopic.GetTableName(), int64(newTopic.GetVersion()))
_ = s.Close()
})
}

func testDir(t *testing.T) string {
return strings.ReplaceAll(t.Name(), "/", ";")
}

func sinkURI(t *testing.T, maxFileSize int64) sinkURL {
u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
require.NoError(t, err)
sink := sinkURL{URL: u}
if maxFileSize != unlimitedFileSize {
sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
}
return sink
}

// errorWriter always returns an error on writes.
type errorWriter struct{}

func (errorWriter) Write(_ []byte) (int, error) {
return 0, errors.New("write error")
}
func (errorWriter) Close() error { return nil }

// mockSinkStorage can be useful for testing to override the WriteCloser.
type mockSinkStorage struct {
writer func() io.WriteCloser
}

var _ cloud.ExternalStorage = &mockSinkStorage{}

func (n *mockSinkStorage) Close() error {
return nil
}

func (n *mockSinkStorage) Conf() cloudpb.ExternalStorage {
return cloudpb.ExternalStorage{Provider: cloudpb.ExternalStorageProvider_null}
}

func (n *mockSinkStorage) ExternalIOConf() base.ExternalIODirConfig {
return base.ExternalIODirConfig{}
}

func (n *mockSinkStorage) RequiresExternalIOAccounting() bool {
return false
}

func (n *mockSinkStorage) Settings() *cluster.Settings {
return nil
}

func (n *mockSinkStorage) ReadFile(
_ context.Context, _ string, _ cloud.ReadOptions,
) (ioctx.ReadCloserCtx, int64, error) {
return nil, 0, io.EOF
}

func (n *mockSinkStorage) Writer(_ context.Context, _ string) (io.WriteCloser, error) {
return n.writer(), nil
}

func (n *mockSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error {
return nil
}

func (n *mockSinkStorage) Delete(_ context.Context, _ string) error {
return nil
}

func (n *mockSinkStorage) Size(_ context.Context, _ string) (int64, error) {
return 0, nil
}
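
As a usage note, and assuming CockroachDB's standard Bazel-based dev tooling
(the exact invocation may vary by checkout), the new regression test can be
run on its own with:

./dev test pkg/ccl/changefeedccl --filter=TestCloudStorageSinkFastGzip

Because pgzip writers spawn background compression goroutines that only stop
once the writer is closed, a reintroduced leak shows up as leaked goroutines
reported by leaktest when the test exits.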
