Skip to content

Commit

Permalink
GH-38438: [C++] Dataset: Trying to fix the async bug in Parquet datas…
Browse files Browse the repository at this point in the history
…et (#38466)

### Rationale for this change

Origin mentioned #38438

1. When PreBuffer is enabled by default, the code in `RowGroupGenerator::FetchNext` switches to async mode. This makes the state handling more complex.
2. In `RowGroupGenerator::FetchNext`, `[this]` is captured without `shared_from_this`. This is not bad by itself; however, `this->executor_` may point to an invalid address if `this` has already been destructed.

This patch also fixes a lifetime issue I found in CSV handling.

### What changes are included in this PR?

1. Fix handling in `cpp/src/parquet/arrow/reader.cc` as I talked above
2. Fix a lifetime problem in CSV

### Are these changes tested?

I tested it locally, but I don't know how to write a unit test for this. Feel free to help.

### Are there any user-facing changes?

Bugfix

* Closes: #38438

Authored-by: mwish <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
mapleFU authored Nov 17, 2023
1 parent 26cf0e0 commit 951d92a
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 13 deletions.
21 changes: 11 additions & 10 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1113,16 +1113,17 @@ class AsyncThreadedTableReader
/// \brief Asynchronously fetch the first CSV buffer, strip/process the
/// header from it, and initialize the column builders.
/// \return a future resolving to the first buffer with the header consumed,
///         or Status::Invalid for an empty file.
Future<std::shared_ptr<Buffer>> ProcessFirstBuffer() {
  // First block
  auto first_buffer_future = buffer_generator_();
  // Capture a shared_ptr to this reader instead of a raw `this`: the
  // continuation may run after the caller drops its last reference, and a
  // raw `this` would then dangle (the GH-38438 class of lifetime bug).
  // NOTE: the diff rendering had left the old `[this]`-capturing lambda in
  // place ahead of this one, making it an unreachable duplicate; only the
  // corrected version is kept.
  return first_buffer_future.Then(
      [self = shared_from_this()](const std::shared_ptr<Buffer>& first_buffer)
          -> Result<std::shared_ptr<Buffer>> {
        if (first_buffer == nullptr) {
          return Status::Invalid("Empty CSV file");
        }
        std::shared_ptr<Buffer> first_buffer_processed;
        RETURN_NOT_OK(self->ProcessHeader(first_buffer, &first_buffer_processed));
        RETURN_NOT_OK(self->MakeColumnBuilders());
        return first_buffer_processed;
      });
}

Executor* cpu_executor_;
Expand Down
69 changes: 69 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -834,5 +834,74 @@ TEST(TestParquetStatistics, NullMax) {
EXPECT_EQ(stat_expression->ToString(), "(x >= 1)");
}

// A BufferReader whose ReadAsync services each request on the IO executor
// only after a one-second delay, widening the race windows that async scan
// state machines must tolerate.
class DelayedBufferReader : public ::arrow::io::BufferReader {
 public:
  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
      : ::arrow::io::BufferReader(buffer) {}

  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
      const ::arrow::io::IOContext& io_context, int64_t position,
      int64_t nbytes) override {
    read_async_count.fetch_add(1);
    // Hold a strong reference inside the deferred task so this reader cannot
    // be destroyed while the IO is still pending.
    auto strong_self =
        std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
    auto delayed_read = [strong_self, position,
                         nbytes]() -> Result<std::shared_ptr<Buffer>> {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      return strong_self->DoReadAt(position, nbytes);
    };
    return DeferNotOk(
        ::arrow::io::internal::SubmitIO(io_context, std::move(delayed_read)));
  }

  // Number of ReadAsync invocations observed.
  std::atomic<int> read_async_count{0};
};

TEST_F(TestParquetFileFormat, MultithreadedScanRegression) {
// GH-38438: Similar to MultithreadedScan, but uses a dedicated single-thread
// IO executor and a DelayedBufferReader to mock slow async I/O, driving the
// scan state machine through more interleavings.
auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100);

ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));

// Futures from the extra ReadAsync calls below; waited on at the end of the
// test before the IO pools go away.
std::vector<Future<>> completes;
// Keeps each iteration's IO thread pool alive past the loop body, since the
// delayed reads may still be running on it.
std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;

for (int idx = 0; idx < 2; ++idx) {
auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
auto fragment = MakeFragment(*source);
std::shared_ptr<Scanner> scanner;

{
auto options = std::make_shared<ScanOptions>();
// Single-threaded pool: serializes the (artificially slow) IO tasks.
ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
pools.emplace_back(thread_pool);
options->io_context =
::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get());
auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
// Pre-buffering selects the Parquet reader's async code path — the path
// the original bug lived in.
fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);

options->fragment_scan_options = fragment_scan_options;
ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);

ASSERT_OK(builder.UseThreads(true));
ASSERT_OK(builder.BatchSize(10000));
ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
}

ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
[[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
// Random ReadAsync calls, generate some futures to make the state machine
// more complex.
for (int yy = 0; yy < 16; yy++) {
completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001));
}
// Drop the scanner while async reads may still be in flight; the fix under
// test means this must not crash or use freed state.
scanner = nullptr;
}

for (auto& f : completes) {
f.Wait();
}
}

} // namespace dataset
} // namespace arrow
6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1123,10 +1123,10 @@ class RowGroupGenerator {
auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
row_group_read =
ready.Then([this, reader, row_group,
ready.Then([cpu_executor = cpu_executor_, reader, row_group,
column_indices = std::move(
column_indices)]() -> ::arrow::Future<RecordBatchGenerator> {
return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
return ReadOneRowGroup(cpu_executor, reader, row_group, column_indices);
});
}
in_flight_reads_.push({std::move(row_group_read), num_rows});
Expand Down Expand Up @@ -1182,7 +1182,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
int64_t rows_to_readahead) {
RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
if (rows_to_readahead < 0) {
return Status::Invalid("rows_to_readahead must be > 0");
return Status::Invalid("rows_to_readahead must be >= 0");
}
if (reader_properties_.pre_buffer()) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
Expand Down

0 comments on commit 951d92a

Please sign in to comment.