diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 26abbc72257f..676d243c6c7f 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -659,10 +659,8 @@ func (s *cloudStorageSink) flushTopicVersions( } return err == nil }) - - // Files need to be cleared after the flush completes, otherwise file resources - for _, v := range toRemove { - s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v}) + if err != nil { + return err } // Allow synchronization with the async flusher to happen. @@ -670,6 +668,21 @@ func (s *cloudStorageSink) flushTopicVersions( s.testingKnobs.AsyncFlushSync() } + // Wait for the async flush to complete before clearing files. + // Note that if waitAsyncFlush returns an error some successfully + // flushed files may not be removed from s.files. This is ok, since + // the error will trigger the sink to be closed, and we will only use + // s.files to ensure that the codecs are closed before deallocating it. + err = s.waitAsyncFlush(ctx) + if err != nil { + return err + } + + // Files need to be cleared after the flush completes, otherwise file + // resources may be leaked. + for _, v := range toRemove { + s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v}) + } return err } @@ -689,14 +702,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error { if err != nil { return err } - s.files.Clear(true /* addNodesToFreeList */) // Allow synchronization with the async flusher to happen. if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil { s.testingKnobs.AsyncFlushSync() } - s.setDataFileTimestamp() - return s.waitAsyncFlush(ctx) + + // Note that if waitAsyncFlush returns an error some successfully + // flushed files may not be removed from s.files. This is ok, since + // the error will trigger the sink to be closed, and we will only use + // s.files to ensure that the codecs are closed before deallocating it. + err = s.waitAsyncFlush(ctx) + if err != nil { + return err + } + // Files need to be cleared after the flush completes, otherwise file resources + // may not be released properly when closing the sink. + s.files.Clear(true /* addNodesToFreeList */) + return nil } func (s *cloudStorageSink) setDataFileTimestamp() { diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 1cf52f3c106d..ab03cccbf2bc 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -844,6 +845,7 @@ func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp { func TestCloudStorageSinkFastGzip(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRace(t, "#130651") ctx := context.Background() settings := cluster.MakeTestingClusterSettings()