From 8564034963c3f6e68eff95b204a2a1bde5d9be46 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 14 Nov 2023 23:51:01 +0800 Subject: [PATCH] add test for Async Scan --- cpp/src/arrow/dataset/file_parquet_test.cc | 67 ++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index c22cf33eb35f7..4cc7e4e529053 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -834,5 +834,72 @@ TEST(TestParquetStatistics, NullMax) { EXPECT_EQ(stat_expression->ToString(), "(x >= 1)"); } +class DelayedBufferReader : public ::arrow::io::BufferReader { + public: + explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer) + : ::arrow::io::BufferReader(buffer) {} + + ::arrow::Future> ReadAsync( + const ::arrow::io::IOContext& io_context, int64_t position, + int64_t nbytes) override { + read_async_count.fetch_add(1); + auto self = std::dynamic_pointer_cast(shared_from_this()); + return DeferNotOk(::arrow::io::internal::SubmitIO( + io_context, [self, position, nbytes]() -> Result> { + std::this_thread::sleep_for(std::chrono::seconds(1)); + return self->DoReadAt(position, nbytes); + })); + } + + std::atomic read_async_count{0}; +}; + +TEST_F(TestParquetFileFormat, MultithreadedScanUnsafe) { + auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100); + + ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get())); + + std::vector> completes; + std::vector> pools; + + for (int idx = 0; idx < 2; ++idx) { + auto buffer_reader = std::make_shared(buffer); + auto source = std::make_shared(buffer_reader, buffer->size()); + auto fragment = MakeFragment(*source); + std::shared_ptr scanner; + + { + auto options = std::make_shared(); + ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1)); + pools.emplace_back(thread_pool); + options->io_context = + ::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get()); + auto fragment_scan_options = std::make_shared(); + fragment_scan_options->arrow_reader_properties->set_pre_buffer(true); + + options->fragment_scan_options = fragment_scan_options; + options->use_threads = true; + ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options); + + ASSERT_OK(builder.UseThreads(true)); + ASSERT_OK(builder.BatchSize(10000)); + ASSERT_OK_AND_ASSIGN(scanner, builder.Finish()); + } + + ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000)); + [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync(); + // Random ReadAsync calls, generate some futures to make the state machine + // more complex. + for (int yy = 0; yy < 16; yy++) { + completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001)); + } + scanner = nullptr; + } + + for (auto& f : completes) { + f.Wait(); + } +} + } // namespace dataset } // namespace arrow