Skip to content

Commit

Permalink
GH-37487: [C++][Parquet] Dataset: Implement sync `ParquetFileFormat::…
Browse files Browse the repository at this point in the history
…GetReader` (#37514)

### Rationale for this change

As described in #37487: when the thread count is 1, the thread may block in `ParquetFileFormat::GetReaderAsync`, because:

1. `ParquetFileFormat::CountRows` calls `EnsureCompleteMetadata` in `io_executor`.
2. `EnsureCompleteMetadata` calls `ParquetFileFormat::GetReader`, which dispatches the real request to the async path.
3. The async request is executed in `io_executor`.

Steps 1 and 3 run in the same fixed-size executor, causing a deadlock.

### What changes are included in this PR?

Implement sync `ParquetFileFormat::GetReader`.

### Are these changes tested?

Currently not

### Are there any user-facing changes?

Bugfix

* Closes: #37487

Authored-by: mwish <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
mapleFU authored Sep 19, 2023
1 parent 64ad8e5 commit 76c4a6e
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 12 deletions.
61 changes: 49 additions & 12 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
return properties;
}

/// \brief Build Arrow reader properties for a scan of a single Parquet file.
///
/// Starts from the properties derived from `format` and `metadata`, then layers
/// the per-scan settings on top: batch size and threading come from `options`,
/// while pre-buffering, cache options and the IO context are copied from
/// `parquet_scan_options.arrow_reader_properties`.
parquet::ArrowReaderProperties MakeArrowReaderProperties(
    const ParquetFileFormat& format, const parquet::FileMetaData& metadata,
    const ScanOptions& options, const ParquetFragmentScanOptions& parquet_scan_options) {
  const auto& scan_reader_props = *parquet_scan_options.arrow_reader_properties;

  auto props = MakeArrowReaderProperties(format, metadata);

  // Settings taken from the generic scan options.
  props.set_batch_size(options.batch_size);
  props.set_use_threads(options.use_threads);

  // Pre-buffering must be configured here since the sync ScanTask handles
  // pre-buffering itself.
  props.set_pre_buffer(scan_reader_props.pre_buffer());
  props.set_cache_options(scan_reader_props.cache_options());
  props.set_io_context(scan_reader_props.io_context());

  return props;
}

template <typename M>
Result<std::shared_ptr<SchemaManifest>> GetSchemaManifest(
const M& metadata, const parquet::ArrowReaderProperties& properties) {
Expand Down Expand Up @@ -410,13 +426,42 @@ Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(

// Opens a Parquet Arrow reader for `source` using `options`, without any
// caller-supplied file metadata.
//
// NOTE(review): this text comes from a rendered diff, so the two `return`
// statements below appear to be the removed line (async call + `.result()`)
// immediately followed by its replacement (delegation to the three-argument
// overload with a null metadata pointer) — confirm against the real file.
Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
return GetReaderAsync(source, options, nullptr).result();
return GetReader(source, options, /*metadata=*/nullptr);
}

// Opens a Parquet Arrow reader for `source`.
//
// `options` supplies the memory pool and the scan settings; `metadata` is
// forwarded as-is to `parquet::ParquetFileReader::Open`. Failures while opening
// the Parquet file are returned through `WrapSourceError` together with
// `source.path()`.
Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<parquet::FileMetaData>& metadata) const {
// NOTE(review): this text comes from a rendered diff; the next line appears to
// be the removed async-based implementation, and everything after it the new
// synchronous body — confirm against the real file.
return GetReaderAsync(source, options, metadata).result();
// Resolve the Parquet-specific fragment scan options from `options`
// (presumably falling back to `default_fragment_scan_options` — TODO confirm).
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
// `parquet::ParquetFileReader::Open` will not wrap the exception as status,
// so using `open_parquet_file` to wrap it.
// The lambda captures by reference and moves `input` and `properties` into
// `Open`, so it is invoked exactly once below.
auto open_parquet_file = [&]() -> Result<std::unique_ptr<parquet::ParquetFileReader>> {
BEGIN_PARQUET_CATCH_EXCEPTIONS
auto reader = parquet::ParquetFileReader::Open(std::move(input),
std::move(properties), metadata);
return reader;
END_PARQUET_CATCH_EXCEPTIONS
};

auto reader_opt = open_parquet_file();
if (!reader_opt.ok()) {
// Route the failure through `WrapSourceError` along with the source path.
return WrapSourceError(reader_opt.status(), source.path());
}
auto reader = std::move(reader_opt).ValueOrDie();

// Build the Arrow-level properties from the metadata reported by the opened
// reader rather than from the `metadata` argument.
std::shared_ptr<parquet::FileMetaData> reader_metadata = reader->metadata();
auto arrow_properties =
MakeArrowReaderProperties(*this, *reader_metadata, *options, *parquet_scan_options);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(
options->pool, std::move(reader), std::move(arrow_properties), &arrow_reader));
// The unique_ptr is converted into the shared_ptr held by the returned Result.
return arrow_reader;
}

Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
Expand Down Expand Up @@ -445,16 +490,8 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> reader,
reader_fut.MoveResult());
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
auto arrow_properties = MakeArrowReaderProperties(*self, *metadata);
arrow_properties.set_batch_size(options->batch_size);
// Must be set here since the sync ScanTask handles pre-buffering itself
arrow_properties.set_pre_buffer(
parquet_scan_options->arrow_reader_properties->pre_buffer());
arrow_properties.set_cache_options(
parquet_scan_options->arrow_reader_properties->cache_options());
arrow_properties.set_io_context(
parquet_scan_options->arrow_reader_properties->io_context());
arrow_properties.set_use_threads(options->use_threads);
auto arrow_properties =
MakeArrowReaderProperties(*this, *metadata, *options, *parquet_scan_options);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
#include "arrow/dataset/file_parquet.h"

#include <memory>
#include <thread>
#include <utility>
#include <vector>

#include "arrow/compute/api_scalar.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/test_util_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/io/test_common.h"
#include "arrow/io/util_internal.h"
Expand Down Expand Up @@ -367,6 +369,29 @@ TEST_F(TestParquetFileFormat, MultithreadedScan) {
ASSERT_EQ(batches.size(), kNumRowGroups);
}

// Checks that CountRows completes when the IO thread pool has a capacity of one.
TEST_F(TestParquetFileFormat, SingleThreadExecutor) {
  // Remember the current IO thread pool capacity and put it back on scope exit.
  struct IoCapacityRestorer {
    int saved_capacity = io::GetIOThreadPoolCapacity();
    ~IoCapacityRestorer() { DCHECK_OK(io::SetIOThreadPoolCapacity(saved_capacity)); }
  } restore_capacity;
  ASSERT_OK(io::SetIOThreadPoolCapacity(1));

  // Write a single-column Parquet file into an in-memory buffer.
  auto batch_reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
  ASSERT_OK_AND_ASSIGN(auto parquet_buffer,
                       ParquetFormatHelper::Write(batch_reader.get()));

  auto input = std::make_shared<::arrow::io::BufferReader>(parquet_buffer);
  auto file_source =
      std::make_shared<FileSource>(std::move(input), parquet_buffer->size());
  auto scan_options = std::make_shared<ScanOptions>();

  {
    auto fragment = MakeFragment(*file_source);
    // Keep the future in a named variable while its result is moved out.
    auto row_count_future = fragment->CountRows(literal(true), scan_options);
    ASSERT_OK_AND_ASSIGN(auto row_count, row_count_future.MoveResult());
    ASSERT_EQ(expected_rows(), row_count);
  }
}

class TestParquetFileSystemDataset : public WriteFileSystemDatasetMixin,
public testing::Test {
public:
Expand Down

0 comments on commit 76c4a6e

Please sign in to comment.