From 78df341c26c9284acfa3b2d6f9c2fd23004b70e3 Mon Sep 17 00:00:00 2001 From: Eero Lihavainen Date: Wed, 27 Sep 2023 22:41:53 +0300 Subject: [PATCH 1/7] feat: add openasync for filesource --- cpp/src/arrow/dataset/file_base.cc | 12 ++++++++++++ cpp/src/arrow/dataset/file_base.h | 1 + cpp/src/arrow/dataset/file_parquet.cc | 16 +++++++++++++--- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 2fcd57d2f3622..eac45e4906866 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -81,6 +81,18 @@ Result> FileSource::Open() const { return custom_open_(); } +Future> FileSource::OpenAsync() const { + if (filesystem_) { + return filesystem_->OpenInputFileAsync(file_info_); + } + + if (buffer_) { + return Future>::MakeFinished(std::make_shared(buffer_)); + } + + return Future>::MakeFinished(custom_open_()); +} + int64_t FileSource::Size() const { if (filesystem_) { return file_info_.size(); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index d33d88e9966fe..46fc8ebc40db0 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -115,6 +115,7 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable { /// \brief Get a RandomAccessFile which views this file source Result> Open() const; + Future> OpenAsync() const; /// \brief Get the size (in bytes) of the file or buffer /// If the file is compressed this should be the compressed (on-disk) size. diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 751937e93b937..46cdb50a56b5d 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -479,11 +479,21 @@ Future> ParquetFileFormat::GetReader default_fragment_scan_options)); auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), options->pool); - ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); + auto input_fut = source.OpenAsync(); // TODO(ARROW-12259): workaround since we have Future<(move-only type)> - auto reader_fut = parquet::ParquetFileReader::OpenAsync( - std::move(input), std::move(properties), metadata); + auto path = source.path(); + auto reader_fut = input_fut.Then( + [=](const std::shared_ptr&) mutable + -> Result> { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr input, + input_fut.MoveResult()); + auto rfut = parquet::ParquetFileReader::OpenAsync(std::move(input), std::move(properties), metadata); + ARROW_ASSIGN_OR_RAISE(auto reader, + rfut.MoveResult()); + return reader; + }); + auto self = checked_pointer_cast(shared_from_this()); return reader_fut.Then( [=](const std::unique_ptr&) mutable From d14ffe888bc65b202ff406f96a66f0da263e1e3e Mon Sep 17 00:00:00 2001 From: Eero Lihavainen Date: Thu, 28 Sep 2023 11:26:54 +0300 Subject: [PATCH 2/7] add todo --- cpp/src/arrow/dataset/file_base.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index eac45e4906866..fd880c916d2cd 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -90,6 +90,7 @@ Future> FileSource::OpenAsync() const { return Future>::MakeFinished(std::make_shared(buffer_)); } + // TODO(GH-37917): custom_open_ should not block return Future>::MakeFinished(custom_open_()); } From b36912e5d999b33eafe67965b163246f4289ead1 Mon Sep 17 00:00:00 2001 From: Eero Lihavainen Date: Thu, 28 Sep 2023 11:27:54 +0300 Subject: [PATCH 3/7] formatting --- cpp/src/arrow/dataset/file_parquet.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 46cdb50a56b5d..091bf2ed4631a 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -483,17 +483,17 @@ Future> ParquetFileFormat::GetReader // TODO(ARROW-12259): workaround since we have Future<(move-only type)> auto path = source.path(); - auto reader_fut = input_fut.Then( - [=](const std::shared_ptr&) mutable - -> Result> { + auto reader_fut = + input_fut.Then([=](const std::shared_ptr&) mutable + -> Result> { ARROW_ASSIGN_OR_RAISE(std::shared_ptr input, input_fut.MoveResult()); - auto rfut = parquet::ParquetFileReader::OpenAsync(std::move(input), std::move(properties), metadata); - ARROW_ASSIGN_OR_RAISE(auto reader, - rfut.MoveResult()); + auto rfut = parquet::ParquetFileReader::OpenAsync( + std::move(input), std::move(properties), metadata); + ARROW_ASSIGN_OR_RAISE(auto reader, rfut.MoveResult()); return reader; }); - + auto self = checked_pointer_cast(shared_from_this()); return reader_fut.Then( [=](const std::unique_ptr&) mutable From 6ee9a0e494ea9e8ce64dfde6c3ebb27e86ff703b Mon Sep 17 00:00:00 2001 From: Eero Lihavainen Date: Sat, 30 Sep 2023 18:23:31 +0300 Subject: [PATCH 4/7] new version --- cpp/src/arrow/dataset/file_parquet.cc | 58 ++++++++++++--------------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 091bf2ed4631a..5d62bc31304bb 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -479,40 +479,34 @@ Future> ParquetFileFormat::GetReader default_fragment_scan_options)); auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), options->pool); - auto input_fut = source.OpenAsync(); - // TODO(ARROW-12259): workaround since we have Future<(move-only type)> - - auto path = source.path(); - auto reader_fut = - input_fut.Then([=](const std::shared_ptr&) mutable - -> Result> { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr input, - input_fut.MoveResult()); - auto rfut = parquet::ParquetFileReader::OpenAsync( - std::move(input), std::move(properties), metadata); - ARROW_ASSIGN_OR_RAISE(auto reader, rfut.MoveResult()); - return reader; - }); auto self = checked_pointer_cast(shared_from_this()); - return reader_fut.Then( - [=](const std::unique_ptr&) mutable - -> Result> { - ARROW_ASSIGN_OR_RAISE(std::unique_ptr reader, - reader_fut.MoveResult()); - std::shared_ptr metadata = reader->metadata(); - auto arrow_properties = - MakeArrowReaderProperties(*this, *metadata, *options, *parquet_scan_options); - std::unique_ptr arrow_reader; - RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader), - std::move(arrow_properties), - &arrow_reader)); - return std::move(arrow_reader); - }, - [path]( - const Status& status) -> Result> { - return WrapSourceError(status, path); - }); + + return source.OpenAsync().Then([=](const std::shared_ptr& input) mutable { + return parquet::ParquetFileReader::OpenAsync(input, std::move(properties), metadata) + .Then( + [=](const std::unique_ptr& reader) mutable + -> Result> { + auto arrow_properties = MakeArrowReaderProperties( + *self, *reader->metadata(), *options, *parquet_scan_options); + + std::unique_ptr arrow_reader; + RETURN_NOT_OK(parquet::arrow::FileReader::Make( + options->pool, + // TODO(ARROW-12259): workaround since we have Future<(move-only type)> + // It *wouldn't* be safe to const_cast reader except that here we know + // there are no other waiters on the reader. + std::move( + const_cast&>(reader)), + std::move(arrow_properties), &arrow_reader)); + + return std::move(arrow_reader); + }, + [path = source.path()](const Status& status) + -> Result> { + return WrapSourceError(status, path); + }); + }); } struct SlicingGenerator { From 6d4d2178992ac1b0c645a0bc711429b705f7c34e Mon Sep 17 00:00:00 2001 From: Eero Lihavainen Date: Sat, 30 Sep 2023 18:33:19 +0300 Subject: [PATCH 5/7] update todo issue --- cpp/src/arrow/dataset/file_base.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index fd880c916d2cd..824365c38755f 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -90,7 +90,7 @@ Future> FileSource::OpenAsync() const { return Future>::MakeFinished(std::make_shared(buffer_)); } - // TODO(GH-37917): custom_open_ should not block + // TODO(GH-37962): custom_open_ should not block return Future>::MakeFinished(custom_open_()); } From d90f24d711454e9eaa364b43c9c8175cfedf9ab1 Mon Sep 17 00:00:00 2001 From: Eero Lihavainen Date: Mon, 2 Oct 2023 18:43:44 +0300 Subject: [PATCH 6/7] formatting --- cpp/src/arrow/dataset/file_parquet.cc | 50 ++++++++++++++------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 5d62bc31304bb..3cad1ddd8321f 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -482,31 +482,33 @@ Future> ParquetFileFormat::GetReader auto self = checked_pointer_cast(shared_from_this()); - return source.OpenAsync().Then([=](const std::shared_ptr& input) mutable { - return parquet::ParquetFileReader::OpenAsync(input, std::move(properties), metadata) - .Then( - [=](const std::unique_ptr& reader) mutable - -> Result> { - auto arrow_properties = MakeArrowReaderProperties( - *self, *reader->metadata(), *options, *parquet_scan_options); - - std::unique_ptr arrow_reader; - RETURN_NOT_OK(parquet::arrow::FileReader::Make( - options->pool, - // TODO(ARROW-12259): workaround since we have Future<(move-only type)> - // It *wouldn't* be safe to const_cast reader except that here we know - // there are no other waiters on the reader. - std::move( - const_cast&>(reader)), - std::move(arrow_properties), &arrow_reader)); - - return std::move(arrow_reader); - }, - [path = source.path()](const Status& status) + return source.OpenAsync().Then( + [=](const std::shared_ptr& input) mutable { + return parquet::ParquetFileReader::OpenAsync(input, std::move(properties), + metadata) + .Then( + [=](const std::unique_ptr& reader) mutable -> Result> { - return WrapSourceError(status, path); - }); - }); + auto arrow_properties = MakeArrowReaderProperties( + *self, *reader->metadata(), *options, *parquet_scan_options); + + std::unique_ptr arrow_reader; + RETURN_NOT_OK(parquet::arrow::FileReader::Make( + options->pool, + // TODO(ARROW-12259): workaround since we have Future<(move-only + // type)> It *wouldn't* be safe to const_cast reader except that + // here we know there are no other waiters on the reader. + std::move(const_cast&>( + reader)), + std::move(arrow_properties), &arrow_reader)); + + return std::move(arrow_reader); + }, + [path = source.path()](const Status& status) + -> Result> { + return WrapSourceError(status, path); + }); + }); } struct SlicingGenerator { From 51bad89ee1e7ecc80db637112911d00253ba6e28 Mon Sep 17 00:00:00 2001 From: Eero Lihavainen Date: Mon, 2 Oct 2023 20:38:44 +0300 Subject: [PATCH 7/7] more formatting --- cpp/src/arrow/dataset/file_base.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 824365c38755f..6a97b51cf2815 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -85,9 +85,10 @@ Future> FileSource::OpenAsync() const { if (filesystem_) { return filesystem_->OpenInputFileAsync(file_info_); } - + if (buffer_) { - return Future>::MakeFinished(std::make_shared(buffer_)); + return Future>::MakeFinished( + std::make_shared(buffer_)); } // TODO(GH-37962): custom_open_ should not block