Skip to content

Commit

Permalink
add test for Async Scan
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Nov 14, 2023
1 parent d38d982 commit 8564034
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -834,5 +834,72 @@ TEST(TestParquetStatistics, NullMax) {
EXPECT_EQ(stat_expression->ToString(), "(x >= 1)");
}

class DelayedBufferReader : public ::arrow::io::BufferReader {
public:
explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
: ::arrow::io::BufferReader(buffer) {}

::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
const ::arrow::io::IOContext& io_context, int64_t position,
int64_t nbytes) override {
read_async_count.fetch_add(1);
auto self = std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
return DeferNotOk(::arrow::io::internal::SubmitIO(
io_context, [self, position, nbytes]() -> Result<std::shared_ptr<Buffer>> {
std::this_thread::sleep_for(std::chrono::seconds(1));
return self->DoReadAt(position, nbytes);
}));
}

std::atomic<int> read_async_count{0};
};

TEST_F(TestParquetFileFormat, MultithreadedScanUnsafe) {
auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100);

ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));

std::vector<Future<>> completes;
std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;

for (int idx = 0; idx < 2; ++idx) {
auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
auto fragment = MakeFragment(*source);
std::shared_ptr<Scanner> scanner;

{
auto options = std::make_shared<ScanOptions>();
ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
pools.emplace_back(thread_pool);
options->io_context =
::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get());
auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);

options->fragment_scan_options = fragment_scan_options;
options->use_threads = true;
ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);

ASSERT_OK(builder.UseThreads(true));
ASSERT_OK(builder.BatchSize(10000));
ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
}

ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
[[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
// Random ReadAsync calls, generate some futures to make the state machine
// more complex.
for (int yy = 0; yy < 16; yy++) {
completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001));
}
scanner = nullptr;
}

for (auto& f : completes) {
f.Wait();
}
}

} // namespace dataset
} // namespace arrow

0 comments on commit 8564034

Please sign in to comment.