Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.2.3-rc: changefeedccl: fix memory leak in cloud storage sink with fast gzip #130625

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ go_test(
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/cloudpb",
"//pkg/cloud/impl:cloudimpl",
"//pkg/internal/sqlsmith",
"//pkg/jobs",
Expand Down Expand Up @@ -324,6 +325,7 @@ go_test(
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/intsets",
"//pkg/util/ioctx",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down Expand Up @@ -352,6 +354,7 @@ go_test(
"@com_github_golang_mock//gomock",
"@com_github_ibm_sarama//:sarama",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_klauspost_compress//gzip",
"@com_github_lib_pq//:pq",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
46 changes: 44 additions & 2 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,27 @@ func (s *cloudStorageSink) flushTopicVersions(
}
return err == nil
})
if err != nil {
return err
}

// Allow synchronization with the async flusher to happen.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}

// Wait for the async flush to complete before clearing files.
// Note that if waitAsyncFlush returns an error some successfully
// flushed files may not be removed from s.files. This is ok, since
// the error will trigger the sink to be closed, and we will only use
// s.files to ensure that the codecs are closed before deallocating it.
err = s.waitAsyncFlush(ctx)
if err != nil {
return err
}

// Files need to be cleared after the flush completes, otherwise file
// resources may be leaked.
for _, v := range toRemove {
s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v})
}
Expand All @@ -681,9 +702,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
if err != nil {
return err
}
s.files.Clear(true /* addNodesToFreeList */)
// Allow synchronization with the async flusher to happen.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}
s.setDataFileTimestamp()
return s.waitAsyncFlush(ctx)

// Note that if waitAsyncFlush returns an error some successfully
// flushed files may not be removed from s.files. This is ok, since
// the error will trigger the sink to be closed, and we will only use
// s.files to ensure that the codecs are closed before deallocating it.
err = s.waitAsyncFlush(ctx)
if err != nil {
return err
}
// Files need to be cleared after the flush completes, otherwise file resources
// may not be released properly when closing the sink.
s.files.Clear(true /* addNodesToFreeList */)
return nil
}

func (s *cloudStorageSink) setDataFileTimestamp() {
Expand Down Expand Up @@ -816,6 +852,12 @@ func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error {
continue
}

// Allow synchronization with the flushing routine to happen between getting
// the flush request from the channel and completing the flush.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}

// flush file to storage.
flushDone := s.metrics.recordFlushRequestCallback()
err := req.file.flushToStorage(ctx, s.es, req.dest, s.metrics)
Expand Down
200 changes: 183 additions & 17 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package changefeedccl

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
Expand All @@ -21,6 +20,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand All @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -38,14 +39,20 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/errors"
"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/require"
)

// unlimitedFileSize is the sentinel passed as a maximum file size to request
// no explicit limit: sinkURI only adds the file size parameter to the sink URL
// when the requested size differs from this value.
const unlimitedFileSize int64 = math.MaxInt64

func makeTopic(name string) *tableDescriptorTopic {
id, _ := strconv.ParseUint(name, 36, 64)
desc := tabledesc.NewBuilder(&descpb.TableDescriptor{Name: name, ID: descpb.ID(id)}).BuildImmutableTable()
Expand Down Expand Up @@ -89,10 +96,6 @@ func TestCloudStorageSink(t *testing.T) {
return decompressed
}

testDir := func(t *testing.T) string {
return strings.ReplaceAll(t.Name(), "/", ";")
}

listLeafDirectories := func(t *testing.T) []string {
absRoot := filepath.Join(externalIODir, testDir(t))

Expand Down Expand Up @@ -156,7 +159,6 @@ func TestCloudStorageSink(t *testing.T) {
return files
}

const unlimitedFileSize int64 = math.MaxInt64
var noKey []byte
settings := cluster.MakeTestingClusterSettings()
settings.ExternalIODir = externalIODir
Expand Down Expand Up @@ -184,16 +186,6 @@ func TestCloudStorageSink(t *testing.T) {

user := username.RootUserName()

sinkURI := func(t *testing.T, maxFileSize int64) sinkURL {
u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
require.NoError(t, err)
sink := sinkURL{URL: u}
if maxFileSize != unlimitedFileSize {
sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
}
return sink
}

testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) {
t.Helper()
testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) {
Expand Down Expand Up @@ -276,7 +268,7 @@ func TestCloudStorageSink(t *testing.T) {
require.Equal(t, []string(nil), slurpDir(t))

// Emitting rows and flushing should write them out in one file per table. Note
// the ordering among these two files is non deterministic as either of them could
// the ordering among these two files is non-deterministic as either of them could
// be flushed first (and thus be assigned fileID 0).
var pool testAllocPool
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), pool.alloc()))
Expand Down Expand Up @@ -841,3 +833,177 @@ type explicitTimestampOracle hlc.Timestamp
func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp {
return hlc.Timestamp(o)
}

// TestCloudStorageSinkFastGzip is a regression test for #129947.
// The original issue was a memory leak from pgzip, the library used for fast
// gzip compression for cloud storage. The leak was caused by a race condition
// between Flush and the async flusher: if the Flush clears files before the
// async flusher closes the compression codec as part of flushing the files,
// and the flush returns an error, the compression codec will not be closed
// properly. This test uses some test-only synchronization points in the cloud
// storage sink to test for the regression.
//
// The test makes no explicit assertions about the flush results; it relies on
// leaktest (installed below) to report anything left behind once the sink has
// been flushed and closed.
func TestCloudStorageSinkFastGzip(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// Skipped under the race detector; see #130651.
skip.UnderRace(t, "#130651")

ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()

// Turn on both fast gzip and async flushing: the regression involves the
// interaction between the two.
useFastGzip.Override(context.Background(), &settings.SV, true)
enableAsyncFlush.Override(context.Background(), &settings.SV, true)

// Compression is set to gzip so that the sink uses a compression codec.
opts := changefeedbase.EncodingOptions{
Format: changefeedbase.OptFormatJSON,
Envelope: changefeedbase.OptEnvelopeWrapped,
KeyInValue: true,
Compression: "gzip",
}

testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
sf, err := span.MakeFrontier(testSpan)
require.NoError(t, err)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}

// Force the storage sink to always return an error.
getErrorWriter := func() io.WriteCloser {
return errorWriter{}
}
// mockStorageSink ignores all of its arguments and hands back a mock external
// storage whose writers are errorWriters.
mockStorageSink := func(_ context.Context, _ string, _ username.SQLUsername, _ ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
return &mockSinkStorage{writer: getErrorWriter}, nil
}

// The cloud storage sink calls the AsyncFlushSync function in two different
// goroutines: once in Flush(), and once in the async flusher. By waiting for
// the two goroutines to both reach those points, we can trigger the original
// issue, which was caused by a race condition between the two goroutines
// leading to leaked compression library resources.
wg := sync.WaitGroup{}
// waiter acts as a rendezvous point: each caller first records its arrival
// (Done) and then blocks (Wait) until the counter reaches zero. Every subtest
// re-arms the counter with wg.Add(2) before creating its sink.
waiter := func() {
wg.Done()
wg.Wait()
}
testingKnobs := &TestingKnobs{AsyncFlushSync: waiter}
// sizeInBytes is the size of the value payload emitted for each row below.
const sizeInBytes = 100 * 1024 * 1024 // 100MB

// Test that there's no leak during an async Flush.
t.Run("flush", func(t *testing.T) {
// Arm the rendezvous for the two expected AsyncFlushSync callers.
wg.Add(2)
s, err := makeCloudStorageSink(
ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle,
mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

// Emit one row to each of nine distinct topics (t1..t9). Errors from EmitRow
// are deliberately ignored here.
var noKey []byte
for i := 1; i < 10; i++ {
newTopic := makeTopic(fmt.Sprintf(`t%d`, i))
byteSlice := make([]byte, sizeInBytes)
ts := hlc.Timestamp{WallTime: int64(i)}
_ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc)
}

// Flush the files and close the sink. Any leaks should be caught after the
// test by leaktest.
// The returned errors are deliberately ignored: the mock storage's writers
// always fail, and this test only checks for leaks.
_ = s.Flush(ctx)
_ = s.Close()
})
// Test that there's no leak during an async flushTopicVersions.
t.Run("flushTopicVersions", func(t *testing.T) {
// Arm the rendezvous for the two expected AsyncFlushSync callers.
wg.Add(2)
s, err := makeCloudStorageSink(
ctx, sinkURI(t, 2*sizeInBytes), 1, settings, opts, timestampOracle,
mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

// Insert data to the same topic with different versions so that they are
// in different files.
// Errors from EmitRow are deliberately ignored here.
var noKey []byte
newTopic := makeTopic("test")
for i := 1; i < 10; i++ {
byteSlice := make([]byte, sizeInBytes)
ts := hlc.Timestamp{WallTime: int64(i)}
newTopic.Version++
_ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc)
}

// Flush the files and close the sink. Any leaks should be caught after the
// test by leaktest.
// The returned errors are deliberately ignored: the mock storage's writers
// always fail, and this test only checks for leaks.
_ = s.(*cloudStorageSink).flushTopicVersions(ctx, newTopic.GetTableName(), int64(newTopic.GetVersion()))
_ = s.Close()
})
}

// testDir derives a directory name from the running test's name by swapping
// every "/" (the subtest separator in t.Name()) for ";", so the result
// contains no "/" characters.
func testDir(t *testing.T) string {
	segments := strings.Split(t.Name(), "/")
	return strings.Join(segments, ";")
}

// sinkURI builds the nodelocal sink URL used by a test. The URL has the form
// nodelocal://1/<dir>, where <dir> is testDir(t).
//
// When maxFileSize is anything other than unlimitedFileSize, it is attached to
// the URL as the changefeedbase.SinkParamFileSize parameter, formatted as a
// base-10 integer. With unlimitedFileSize no file size parameter is added.
//
// The test fails immediately if the URL cannot be parsed.
func sinkURI(t *testing.T, maxFileSize int64) sinkURL {
	// Mark this function as a helper so that a failure of the assertion below
	// is attributed to the calling test rather than to this line.
	t.Helper()

	u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
	require.NoError(t, err)

	sink := sinkURL{URL: u}
	if maxFileSize != unlimitedFileSize {
		sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
	}
	return sink
}

// errorWriter is a stateless writer that rejects every write. It provides
// both Write and Close, so it can stand in wherever a writer that always
// fails is needed.
type errorWriter struct{}

// Write ignores p and always fails with a "write error", reporting that zero
// bytes were written.
func (errorWriter) Write(p []byte) (int, error) {
	var written int
	return written, errors.New("write error")
}

// Close does nothing and always succeeds.
func (errorWriter) Close() error {
	return nil
}

// mockSinkStorage can be useful for testing to override the WriteCloser.
type mockSinkStorage struct {
writer func() io.WriteCloser
}

var _ cloud.ExternalStorage = &mockSinkStorage{}

func (n *mockSinkStorage) Close() error {
return nil
}

func (n *mockSinkStorage) Conf() cloudpb.ExternalStorage {
return cloudpb.ExternalStorage{Provider: cloudpb.ExternalStorageProvider_null}
}

func (n *mockSinkStorage) ExternalIOConf() base.ExternalIODirConfig {
return base.ExternalIODirConfig{}
}

func (n *mockSinkStorage) RequiresExternalIOAccounting() bool {
return false
}

func (n *mockSinkStorage) Settings() *cluster.Settings {
return nil
}

func (n *mockSinkStorage) ReadFile(
_ context.Context, _ string, _ cloud.ReadOptions,
) (ioctx.ReadCloserCtx, int64, error) {
return nil, 0, io.EOF
}

func (n *mockSinkStorage) Writer(_ context.Context, _ string) (io.WriteCloser, error) {
return n.writer(), nil
}

func (n *mockSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error {
return nil
}

func (n *mockSinkStorage) Delete(_ context.Context, _ string) error {
return nil
}

func (n *mockSinkStorage) Size(_ context.Context, _ string) (int64, error) {
return 0, nil
}
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ type TestingKnobs struct {

// OverrideExecCfg returns a modified ExecutorConfig to use under tests.
OverrideExecCfg func(actual *sql.ExecutorConfig) *sql.ExecutorConfig

// AsyncFlushSync is called in async flush goroutines as a way to provide synchronization between them.
AsyncFlushSync func()
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down