From 518cfaddc0d4affa8a0b6a07472e6eb81e3ee128 Mon Sep 17 00:00:00 2001 From: Eero Lihavainen Date: Wed, 4 Oct 2023 17:52:04 +0300 Subject: [PATCH] GH-37917: [Parquet] Add OpenAsync for FileSource (#37918) ### Rationale for this change Improves performance of file reads with an expensive Open operation. ### What changes are included in this PR? ### Are these changes tested? ### Are there any user-facing changes? No * Closes: #37917 Authored-by: Eero Lihavainen Signed-off-by: Benjamin Kietzman --- cpp/src/arrow/dataset/file_base.cc | 14 ++++++++ cpp/src/arrow/dataset/file_base.h | 1 + cpp/src/arrow/dataset/file_parquet.cc | 50 +++++++++++++++------------ 3 files changed, 43 insertions(+), 22 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 2fcd57d2f3622..6a97b51cf2815 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -81,6 +81,20 @@ Result> FileSource::Open() const { return custom_open_(); } +Future> FileSource::OpenAsync() const { + if (filesystem_) { + return filesystem_->OpenInputFileAsync(file_info_); + } + + if (buffer_) { + return Future>::MakeFinished( + std::make_shared(buffer_)); + } + + // TODO(GH-37962): custom_open_ should not block + return Future>::MakeFinished(custom_open_()); +} + int64_t FileSource::Size() const { if (filesystem_) { return file_info_.size(); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index d33d88e9966fe..46fc8ebc40db0 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -115,6 +115,7 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable { /// \brief Get a RandomAccessFile which views this file source Result> Open() const; + Future> OpenAsync() const; /// \brief Get the size (in bytes) of the file or buffer /// If the file is compressed this should be the compressed (on-disk) size. diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 751937e93b937..3cad1ddd8321f 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -479,29 +479,35 @@ Future> ParquetFileFormat::GetReader default_fragment_scan_options)); auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), options->pool); - ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); - // TODO(ARROW-12259): workaround since we have Future<(move-only type)> - auto reader_fut = parquet::ParquetFileReader::OpenAsync( - std::move(input), std::move(properties), metadata); - auto path = source.path(); + auto self = checked_pointer_cast(shared_from_this()); - return reader_fut.Then( - [=](const std::unique_ptr&) mutable - -> Result> { - ARROW_ASSIGN_OR_RAISE(std::unique_ptr reader, - reader_fut.MoveResult()); - std::shared_ptr metadata = reader->metadata(); - auto arrow_properties = - MakeArrowReaderProperties(*this, *metadata, *options, *parquet_scan_options); - std::unique_ptr arrow_reader; - RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader), - std::move(arrow_properties), - &arrow_reader)); - return std::move(arrow_reader); - }, - [path]( - const Status& status) -> Result> { - return WrapSourceError(status, path); + + return source.OpenAsync().Then( + [=](const std::shared_ptr& input) mutable { + return parquet::ParquetFileReader::OpenAsync(input, std::move(properties), + metadata) + .Then( + [=](const std::unique_ptr& reader) mutable + -> Result> { + auto arrow_properties = MakeArrowReaderProperties( + *self, *reader->metadata(), *options, *parquet_scan_options); + + std::unique_ptr arrow_reader; + RETURN_NOT_OK(parquet::arrow::FileReader::Make( + options->pool, + // TODO(ARROW-12259): workaround since we have Future<(move-only + // type)> It *wouldn't* be safe to const_cast reader except that + // here we know there are no other waiters on the reader. + std::move(const_cast&>( + reader)), + std::move(arrow_properties), &arrow_reader)); + + return std::move(arrow_reader); + }, + [path = source.path()](const Status& status) + -> Result> { + return WrapSourceError(status, path); + }); }); }