From e7ddd79bb756288371e719d6abe48866f7a1e2f8 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 26 Oct 2023 03:24:29 +0800 Subject: [PATCH 1/4] Trying to fix the async bug --- cpp/src/arrow/csv/reader.cc | 7 ++++--- cpp/src/parquet/arrow/reader.cc | 5 +++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index bf703b6c6ba28..ff2bb7608c835 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -1112,15 +1112,16 @@ class AsyncThreadedTableReader protected: Future> ProcessFirstBuffer() { // First block + auto self = shared_from_this(); auto first_buffer_future = buffer_generator_(); - return first_buffer_future.Then([this](const std::shared_ptr& first_buffer) + return first_buffer_future.Then([self](const std::shared_ptr& first_buffer) -> Result> { if (first_buffer == nullptr) { return Status::Invalid("Empty CSV file"); } std::shared_ptr first_buffer_processed; - RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer_processed)); - RETURN_NOT_OK(MakeColumnBuilders()); + RETURN_NOT_OK(self->ProcessHeader(first_buffer, &first_buffer_processed)); + RETURN_NOT_OK(self->MakeColumnBuilders()); return first_buffer_processed; }); } diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 99b8a9ccef1fa..0739b828a0afe 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1122,11 +1122,12 @@ class RowGroupGenerator { } else { auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); + auto cpu_executor = cpu_executor_; row_group_read = - ready.Then([this, reader, row_group, + ready.Then([cpu_executor, reader, row_group, column_indices = std::move( column_indices)]() -> ::arrow::Future { - return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); + return ReadOneRowGroup(cpu_executor, reader, row_group, column_indices); }); } in_flight_reads_.push({std::move(row_group_read), num_rows}); From 9ba33bf085cedcf17e0a1146f03a77ae2ba40826 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 26 Oct 2023 10:08:49 +0800 Subject: [PATCH 2/4] Resolve comment --- cpp/src/arrow/csv/reader.cc | 22 +++++++++++----------- cpp/src/parquet/arrow/reader.cc | 5 ++--- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index ff2bb7608c835..30fc0bc6aca44 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -1112,18 +1112,18 @@ class AsyncThreadedTableReader protected: Future> ProcessFirstBuffer() { // First block - auto self = shared_from_this(); auto first_buffer_future = buffer_generator_(); - return first_buffer_future.Then([self](const std::shared_ptr& first_buffer) - -> Result> { - if (first_buffer == nullptr) { - return Status::Invalid("Empty CSV file"); - } - std::shared_ptr first_buffer_processed; - RETURN_NOT_OK(self->ProcessHeader(first_buffer, &first_buffer_processed)); - RETURN_NOT_OK(self->MakeColumnBuilders()); - return first_buffer_processed; - }); + return first_buffer_future.Then( + [self = shared_from_this()](const std::shared_ptr& first_buffer) + -> Result> { + if (first_buffer == nullptr) { + return Status::Invalid("Empty CSV file"); + } + std::shared_ptr first_buffer_processed; + RETURN_NOT_OK(self->ProcessHeader(first_buffer, &first_buffer_processed)); + RETURN_NOT_OK(self->MakeColumnBuilders()); + return first_buffer_processed; + }); } Executor* cpu_executor_; diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 0739b828a0afe..d6ad7c25bc7c1 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1122,9 +1122,8 @@ class RowGroupGenerator { } else { auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); - auto cpu_executor = cpu_executor_; row_group_read = - ready.Then([cpu_executor, reader, row_group, + ready.Then([cpu_executor = cpu_executor_, reader, row_group, column_indices = std::move( column_indices)]() -> ::arrow::Future { return ReadOneRowGroup(cpu_executor, reader, row_group, column_indices); @@ -1183,7 +1182,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, int64_t rows_to_readahead) { RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices)); if (rows_to_readahead < 0) { - return Status::Invalid("rows_to_readahead must be > 0"); + return Status::Invalid("rows_to_readahead must be >= 0"); } if (reader_properties_.pre_buffer()) { BEGIN_PARQUET_CATCH_EXCEPTIONS From 8564034963c3f6e68eff95b204a2a1bde5d9be46 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 14 Nov 2023 23:51:01 +0800 Subject: [PATCH 3/4] add test for Async Scan --- cpp/src/arrow/dataset/file_parquet_test.cc | 67 ++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index c22cf33eb35f7..4cc7e4e529053 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -834,5 +834,72 @@ TEST(TestParquetStatistics, NullMax) { EXPECT_EQ(stat_expression->ToString(), "(x >= 1)"); } +class DelayedBufferReader : public ::arrow::io::BufferReader { + public: + explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer) + : ::arrow::io::BufferReader(buffer) {} + + ::arrow::Future> ReadAsync( + const ::arrow::io::IOContext& io_context, int64_t position, + int64_t nbytes) override { + read_async_count.fetch_add(1); + auto self = std::dynamic_pointer_cast(shared_from_this()); + return DeferNotOk(::arrow::io::internal::SubmitIO( + io_context, [self, position, nbytes]() -> Result> { + std::this_thread::sleep_for(std::chrono::seconds(1)); + return self->DoReadAt(position, nbytes); + })); + } + + std::atomic read_async_count{0}; +}; + +TEST_F(TestParquetFileFormat, MultithreadedScanUnsafe) { + auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100); + + ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get())); + + std::vector> completes; + std::vector> pools; + + for (int idx = 0; idx < 2; ++idx) { + auto buffer_reader = std::make_shared(buffer); + auto source = std::make_shared(buffer_reader, buffer->size()); + auto fragment = MakeFragment(*source); + std::shared_ptr scanner; + + { + auto options = std::make_shared(); + ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1)); + pools.emplace_back(thread_pool); + options->io_context = + ::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get()); + auto fragment_scan_options = std::make_shared(); + fragment_scan_options->arrow_reader_properties->set_pre_buffer(true); + + options->fragment_scan_options = fragment_scan_options; + options->use_threads = true; + ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options); + + ASSERT_OK(builder.UseThreads(true)); + ASSERT_OK(builder.BatchSize(10000)); + ASSERT_OK_AND_ASSIGN(scanner, builder.Finish()); + } + + ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000)); + [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync(); + // Random ReadAsync calls, generate some futures to make the state machine + // more complex. + for (int yy = 0; yy < 16; yy++) { + completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001)); + } + scanner = nullptr; + } + + for (auto& f : completes) { + f.Wait(); + } +} + } // namespace dataset } // namespace arrow From a93ea704d8c2de55f85476cd0a17f2cb77007fa9 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 15 Nov 2023 10:46:34 +0800 Subject: [PATCH 4/4] update comment --- cpp/src/arrow/dataset/file_parquet_test.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 4cc7e4e529053..84d4342a25e20 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -854,7 +854,10 @@ class DelayedBufferReader : public ::arrow::io::BufferReader { std::atomic read_async_count{0}; }; -TEST_F(TestParquetFileFormat, MultithreadedScanUnsafe) { +TEST_F(TestParquetFileFormat, MultithreadedScanRegression) { + // GH-38438: This test is similar to MultithreadedScan, but it try to use self + // designed Executor and DelayedBufferReader to mock async execution to make + // the state machine more complex. auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100); ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get())); @@ -878,7 +881,6 @@ TEST_F(TestParquetFileFormat, MultithreadedScanUnsafe) { fragment_scan_options->arrow_reader_properties->set_pre_buffer(true); options->fragment_scan_options = fragment_scan_options; - options->use_threads = true; ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options); ASSERT_OK(builder.UseThreads(true));