Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-38438: [C++] Dataset: Trying to fix the async bug in Parquet dataset #38466

Merged
merged 5 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1113,16 +1113,17 @@ class AsyncThreadedTableReader
Future<std::shared_ptr<Buffer>> ProcessFirstBuffer() {
// First block
auto first_buffer_future = buffer_generator_();
return first_buffer_future.Then([this](const std::shared_ptr<Buffer>& first_buffer)
-> Result<std::shared_ptr<Buffer>> {
if (first_buffer == nullptr) {
return Status::Invalid("Empty CSV file");
}
std::shared_ptr<Buffer> first_buffer_processed;
RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer_processed));
RETURN_NOT_OK(MakeColumnBuilders());
return first_buffer_processed;
});
return first_buffer_future.Then(
[self = shared_from_this()](const std::shared_ptr<Buffer>& first_buffer)
-> Result<std::shared_ptr<Buffer>> {
if (first_buffer == nullptr) {
return Status::Invalid("Empty CSV file");
}
std::shared_ptr<Buffer> first_buffer_processed;
RETURN_NOT_OK(self->ProcessHeader(first_buffer, &first_buffer_processed));
RETURN_NOT_OK(self->MakeColumnBuilders());
return first_buffer_processed;
});
}

Executor* cpu_executor_;
Expand Down
67 changes: 67 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -834,5 +834,72 @@ TEST(TestParquetStatistics, NullMax) {
EXPECT_EQ(stat_expression->ToString(), "(x >= 1)");
}

// Test helper: a BufferReader whose ReadAsync is deliberately slow.
//
// Every ReadAsync call bumps `read_async_count`, then submits a task through
// SubmitIO on the supplied IOContext. The task sleeps for one second and only
// then performs the read via DoReadAt(position, nbytes).
class DelayedBufferReader : public ::arrow::io::BufferReader {
 public:
  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
      : ::arrow::io::BufferReader(buffer) {}

  // Counts the call and schedules a read of `nbytes` bytes starting at
  // `position`; the read itself is delayed by one second inside the task.
  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
      const ::arrow::io::IOContext& io_context, int64_t position,
      int64_t nbytes) override {
    ++read_async_count;
    // The task holds its own shared_ptr to this reader, so the submitted
    // lambda never dereferences a raw `this`.
    auto keep_alive =
        std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
    return DeferNotOk(::arrow::io::internal::SubmitIO(
        io_context,
        [keep_alive, position, nbytes]() -> Result<std::shared_ptr<Buffer>> {
          // Artificial latency before the actual read.
          std::this_thread::sleep_for(std::chrono::seconds(1));
          return keep_alive->DoReadAt(position, nbytes);
        }));
  }

  // Number of ReadAsync invocations observed so far.
  std::atomic<int> read_async_count{0};
};

TEST_F(TestParquetFileFormat, MultithreadedScanUnsafe) {
auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100);

ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));

std::vector<Future<>> completes;
std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;

for (int idx = 0; idx < 2; ++idx) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using 2 iterations to make the state machine a bit more complex, and also to make sure we use another Executor.

auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
auto fragment = MakeFragment(*source);
std::shared_ptr<Scanner> scanner;

{
auto options = std::make_shared<ScanOptions>();
ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
pools.emplace_back(thread_pool);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pool is for the case mentioned in the description.

options->io_context =
::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get());
auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);

options->fragment_scan_options = fragment_scan_options;
options->use_threads = true;
ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);

ASSERT_OK(builder.UseThreads(true));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Threads are necessary for this test.

ASSERT_OK(builder.BatchSize(10000));
ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
}

ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
[[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
// Random ReadAsync calls, generate some futures to make the state machine
// more complex.
for (int yy = 0; yy < 16; yy++) {
completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001));
}
scanner = nullptr;
}

for (auto& f : completes) {
f.Wait();
}
}

} // namespace dataset
} // namespace arrow
6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1123,10 +1123,10 @@ class RowGroupGenerator {
auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
row_group_read =
ready.Then([this, reader, row_group,
ready.Then([cpu_executor = cpu_executor_, reader, row_group,
column_indices = std::move(
column_indices)]() -> ::arrow::Future<RecordBatchGenerator> {
return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
return ReadOneRowGroup(cpu_executor, reader, row_group, column_indices);
});
}
in_flight_reads_.push({std::move(row_group_read), num_rows});
Expand Down Expand Up @@ -1182,7 +1182,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
int64_t rows_to_readahead) {
RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
if (rows_to_readahead < 0) {
return Status::Invalid("rows_to_readahead must be > 0");
return Status::Invalid("rows_to_readahead must be >= 0");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be 0 here.

}
if (reader_properties_.pre_buffer()) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
Expand Down
Loading