Skip to content

Commit

Permalink
feat: add openasync for filesource
Browse files Browse the repository at this point in the history
  • Loading branch information
eeroel committed Sep 28, 2023
1 parent e038498 commit 7e2d91c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
12 changes: 12 additions & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ Result<std::shared_ptr<io::RandomAccessFile>> FileSource::Open() const {
return custom_open_();
}

// Asynchronous counterpart of FileSource::Open().
//
// Returns a future that resolves to a RandomAccessFile viewing this source:
//  - filesystem-backed sources delegate to the filesystem's OpenInputFileAsync;
//  - buffer-backed sources wrap the buffer in an io::BufferReader;
//  - otherwise the custom open callback is invoked.
// In the last two cases the work happens before this function returns and the
// outcome is handed back through Future::MakeFinished.
Future<std::shared_ptr<io::RandomAccessFile>> FileSource::OpenAsync() const {
  using FileFuture = Future<std::shared_ptr<io::RandomAccessFile>>;

  if (!filesystem_) {
    if (!buffer_) {
      // Neither a filesystem nor a buffer: fall back to the custom opener and
      // forward whatever it produced (value or error) as the future's result.
      return FileFuture::MakeFinished(custom_open_());
    }
    // In-memory source: expose the buffer through a reader.
    return FileFuture::MakeFinished(std::make_shared<io::BufferReader>(buffer_));
  }

  // Filesystem-backed source: let the filesystem perform the open.
  return filesystem_->OpenInputFileAsync(file_info_);
}

int64_t FileSource::Size() const {
if (filesystem_) {
return file_info_.size();
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {

/// \brief Get a RandomAccessFile which views this file source
Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
/// \brief Asynchronous variant of Open()
///
/// Returns a Future which resolves to a RandomAccessFile viewing this file source.
/// Filesystem-backed sources are opened via the filesystem's OpenInputFileAsync;
/// buffer-backed and custom-open sources are opened before this call returns and
/// the result is returned through Future::MakeFinished.
Future<std::shared_ptr<io::RandomAccessFile>> OpenAsync() const;

/// \brief Get the size (in bytes) of the file or buffer
/// If the file is compressed this should be the compressed (on-disk) size.
Expand Down
16 changes: 13 additions & 3 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,21 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
auto input_fut = source.OpenAsync();
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
auto reader_fut = parquet::ParquetFileReader::OpenAsync(
std::move(input), std::move(properties), metadata);

auto path = source.path();
auto reader_fut = input_fut.Then(
[=](const std::shared_ptr<arrow::io::RandomAccessFile>&) mutable
-> Result<std::unique_ptr<parquet::ParquetFileReader>> {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
input_fut.MoveResult());
auto rfut = parquet::ParquetFileReader::OpenAsync(std::move(input), std::move(properties), metadata);
ARROW_ASSIGN_OR_RAISE(auto reader,
rfut.MoveResult());
return reader;
});

auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
return reader_fut.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
Expand Down

0 comments on commit 7e2d91c

Please sign in to comment.