GH-37917: [Parquet] Add OpenAsync for FileSource #37918
I'm not sure about the details of the implementation, but I got it to compile and it seems to work as expected.
The Windows failure is unrelated, but I think here we need a
you can just
General LGTM
  return Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(std::make_shared<io::BufferReader>(buffer_));
}

return Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(custom_open_());
Oh, `custom_open_` might still be blocking here?
Yes, I wasn't sure how to deal with it. Would FileSource need its own `IOContext` instance to run it asynchronously?
Could you leave a comment here and keep the code as it is? I think that, at least, opening via the filesystem could benefit, and `custom_open_` won't be any worse.
👍 left a todo
I think this TODO is valuable, but it should be linked to a new issue where we extend `custom_open_` to return `Future<std::shared_ptr<io::RandomAccessFile>>`. That would be necessary to enable a custom but non-blocking open.
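The difference between the two callback shapes can be sketched with the standard library alone. This is only an illustration under stated assumptions: `FakeFile`, `BlockingOpen`, `AsyncOpen`, `OpenViaBlocking`, `OpenViaAsync`, `MakeBlockingOpen` and `IsFinished` are hypothetical names, and `std::future` stands in for Arrow's `Future`, which has a different interface.

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <string>

// Hypothetical stand-in for io::RandomAccessFile.
struct FakeFile {
  std::string name;
};

using FileFuture = std::future<std::shared_ptr<FakeFile>>;
// Current shape: the callback blocks until the file is open.
using BlockingOpen = std::function<std::shared_ptr<FakeFile>()>;
// Proposed shape: the callback hands back a future right away.
using AsyncOpen = std::function<FileFuture()>;

// Adapts a blocking callback. The open runs on the calling thread, so the
// returned future is already finished when the caller receives it.
FileFuture OpenViaBlocking(const BlockingOpen& open) {
  std::promise<std::shared_ptr<FakeFile>> promise;
  promise.set_value(open());  // this is where the caller blocks
  return promise.get_future();
}

// With a future-returning callback there is nothing to adapt: the caller gets
// a future that may still be pending.
FileFuture OpenViaAsync(const AsyncOpen& open) { return open(); }

// Example blocking callback (hypothetical).
BlockingOpen MakeBlockingOpen(std::string name) {
  return [name] { return std::make_shared<FakeFile>(FakeFile{name}); };
}

// Non-blocking readiness check.
bool IsFinished(const FileFuture& fut) {
  return fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

With the blocking shape the future is always finished on return, so any latency in the callback is paid by whoever calls the open function. With the future-returning shape the caller can receive a pending future and continue.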
Created a new issue (#37962) and updated the comment.
[path](
    const Status& status) -> Result<std::unique_ptr<parquet::ParquetFileReader>> {
  return WrapSourceError(status, path);
});
Would `WrapSourceError` be called twice, since L516-L519 already call it?
Good point, I removed the callback
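A small sketch of why wrapping twice is undesirable. `WrapSourceErrorSketch` and its message format are assumptions made for illustration, not the actual helper, and `CountOccurrences` is a hypothetical utility used only to inspect the result.

```cpp
#include <cstddef>
#include <string>

// Hypothetical stand-in for the helper: prefixes an error message with the
// path of the source that failed to open. The wording is an assumption.
std::string WrapSourceErrorSketch(const std::string& message, const std::string& path) {
  return "Could not open Parquet input source '" + path + "': " + message;
}

// Counts non-overlapping occurrences of `needle` in `text`.
int CountOccurrences(const std::string& text, const std::string& needle) {
  int count = 0;
  std::size_t pos = text.find(needle);
  while (pos != std::string::npos) {
    ++count;
    pos = text.find(needle, pos + needle.size());
  }
  return count;
}
```

If both an inner and an outer error callback apply the wrapper, the path prefix appears twice in the final message, which is why removing one of the callbacks is the simpler fix.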
Force-pushed: c9b99f2 to 7e2d91c
auto rfut = parquet::ParquetFileReader::OpenAsync(
    std::move(input), std::move(properties), metadata);
ARROW_ASSIGN_OR_RAISE(auto reader, rfut.MoveResult());
return reader;
Can we just return `rfut` here?
The return value needs to be of type `Result`, so the future needs to be unwrapped. I'm not sure this is the right way to do it, but it seems to work.
For a continuation (a callback passed to `Future::Then`), the return type can also be a `Future`. Please change it to `Future`; then you can return `rfut` directly instead of blocking in a continuation.
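The idea of returning a future from a continuation, rather than waiting on it inside the continuation, can be sketched with a toy future type. Everything below is hypothetical (`MiniFuture`, `Demo`, `MakeDemo`); it is single-threaded, has no error handling, only supports continuations that return another `MiniFuture`, and is not how Arrow implements `Future::Then`.

```cpp
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Toy callback-based future. Copies share the same state.
template <typename T>
class MiniFuture {
 public:
  MiniFuture() : state_(std::make_shared<State>()) {}

  bool is_finished() const { return state_->value.has_value(); }
  const T& result() const { return *state_->value; }

  // Stores the value and runs every callback registered so far.
  void MarkFinished(T value) {
    state_->value = std::move(value);
    auto callbacks = std::move(state_->callbacks);
    state_->callbacks.clear();
    for (auto& cb : callbacks) cb(*state_->value);
  }

  // Runs `cb` now if the value is available, otherwise when it arrives.
  void AddCallback(std::function<void(const T&)> cb) {
    if (is_finished()) {
      cb(*state_->value);
    } else {
      state_->callbacks.push_back(std::move(cb));
    }
  }

  // `next` must return a MiniFuture<U>. The future returned here finishes
  // when the future produced by `next` finishes; nothing ever blocks.
  template <typename F>
  auto Then(F next) -> std::invoke_result_t<F, const T&> {
    using Out = std::invoke_result_t<F, const T&>;
    Out out;
    AddCallback([out, next](const T& value) mutable {
      next(value).AddCallback([out](const auto& inner) mutable { out.MarkFinished(inner); });
    });
    return out;
  }

 private:
  struct State {
    std::optional<T> value;
    std::vector<std::function<void(const T&)>> callbacks;
  };
  std::shared_ptr<State> state_;
};

// Two chained asynchronous steps: "open the file", then "open the reader".
struct Demo {
  MiniFuture<int> open;
  MiniFuture<std::string> read;
  MiniFuture<std::string> chained;
};

Demo MakeDemo() {
  Demo d;
  MiniFuture<std::string> read = d.read;  // shares state with d.read
  // The continuation returns the second future instead of waiting on it.
  d.chained = d.open.Then([read](const int&) { return read; });
  return d;
}
```

In this sketch `chained` stays pending until both steps have finished, and no thread is ever parked while waiting for the second step.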
Ah, I see, thanks. For some reason I get `note: copy constructor is implicitly deleted because 'unique_ptr<parquet::ParquetFileReader>' has a user-declared move constructor` if I try to return the future. Any idea what I'm missing?
Hmm, I noticed that execution freezes when I set `io_thread_count <= fragment_readahead` (testing with PyArrow). Probably a deadlock? I wonder whether it's introduced by this change, or whether it's an existing bug that only manifests now because of higher concurrency. I think I observed such a freeze with a recent PyArrow release, but couldn't reproduce it.
input_fut.Then([=](const std::shared_ptr<arrow::io::RandomAccessFile>&) mutable
    -> Result<std::unique_ptr<parquet::ParquetFileReader>> {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
                        input_fut.MoveResult());
Instead of implicitly copying `input_fut` by keeping it in the closure, I think it'd be clearer to use the value passed to the callback. Suggested change, replacing:
input_fut.Then([=](const std::shared_ptr<arrow::io::RandomAccessFile>&) mutable
    -> Result<std::unique_ptr<parquet::ParquetFileReader>> {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
                        input_fut.MoveResult());
with:
source.OpenAsync().Then([=](std::shared_ptr<arrow::io::RandomAccessFile> input)
    -> Result<std::unique_ptr<parquet::ParquetFileReader>> {
(The `TODO(ARROW-12259)` only applies to futures of move-only types, as returned by `ParquetFileReader::OpenAsync()`, and doesn't apply to the return value of `source.OpenAsync()`.)
Sorry, let me give a more complete suggestion
Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
const FileSource& source, const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<parquet::FileMetaData>& metadata) const {
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
return source.OpenAsync().Then([=](const std::shared_ptr<io::RandomAccessFile>& input) mutable {
return parquet::ParquetFileReader::OpenAsync(input, std::move(properties), metadata)
.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>& reader) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
auto arrow_properties = MakeArrowReaderProperties(
*self, *reader->metadata(), *options, *parquet_scan_options);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(
options->pool,
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
// It *wouldn't* be safe to const_cast reader except that here we know
// there are no other waiters on the reader.
std::move(
const_cast<std::unique_ptr<parquet::ParquetFileReader>&>(reader)),
std::move(arrow_properties), &arrow_reader));
return std::move(arrow_reader);
},
[path = source.path()](const Status& status)
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
return WrapSourceError(status, path);
});
});
}
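The `const_cast` in the suggestion above exists because the continuation only receives a `const` reference to a move-only value. A minimal sketch of that workaround in isolation, with hypothetical names (`FakeReader`, `StealFromConstRef`) standing in for the real types:

```cpp
#include <memory>
#include <utility>

// Hypothetical stand-in for parquet::ParquetFileReader.
struct FakeReader {
  int id;
};

// The caller only hands us a const reference, so ownership can only be taken
// by casting away constness. This is valid only if the referenced unique_ptr
// was not declared const, and it is only safe if nobody else reads the
// original afterwards (the "no other waiters" condition in the comment above).
std::unique_ptr<FakeReader> StealFromConstRef(const std::unique_ptr<FakeReader>& reader) {
  return std::move(const_cast<std::unique_ptr<FakeReader>&>(reader));
}
```

After the call the original pointer is empty, which is exactly why the workaround would be unsafe if another consumer of the same future result existed.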
@lidavidm re my alternative ARROW-12259 workaround
a bit yikes, but if it works
Thank you very much. I updated the code to this and it seems to work. The deadlock issue I mentioned earlier is also gone.
@eeroel https://github.com/apache/arrow/pull/37918/files#r1340188707 I think the problem is that we make a blocking call during async execution. It's a bit like the problem I fixed in #37514. Blocking inside an async function that runs on the same Executor is bad behavior and can cause a deadlock.
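A minimal sketch of this failure mode, assuming a toy executor with a single worker thread. `SingleThreadExecutor` and `BlockingWaitSucceeds` are hypothetical names, not Arrow APIs. The outer task submits a nested task to the same executor and then waits for it. A timeout is used so the example terminates; with an untimed wait the outer task would hang forever.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Toy executor with exactly one worker thread (a stand-in for a saturated pool).
class SingleThreadExecutor {
 public:
  SingleThreadExecutor() : worker_([this] { Run(); }) {}

  ~SingleThreadExecutor() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stop_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  // Queues `fn` and returns a future for its result.
  template <typename F>
  auto Submit(F fn) -> std::future<decltype(fn())> {
    using R = decltype(fn());
    auto task = std::make_shared<std::packaged_task<R()>>(std::move(fn));
    std::future<R> fut = task->get_future();
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push_back([task] { (*task)(); });
    }
    cv_.notify_one();
    return fut;
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> job;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return stop_ || !queue_.empty(); });
        if (queue_.empty()) return;  // stop requested and nothing left to run
        job = std::move(queue_.front());
        queue_.pop_front();
      }
      job();  // runs outside the lock so tasks can submit more work
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  bool stop_ = false;
  std::thread worker_;  // declared last so the other members exist before it starts
};

// Returns true if the nested task finished while the outer task was waiting.
bool BlockingWaitSucceeds(std::chrono::milliseconds timeout) {
  SingleThreadExecutor executor;
  auto outer = executor.Submit([&executor, timeout] {
    auto inner = executor.Submit([] { return 42; });
    // The only worker is busy running this task, so `inner` cannot start.
    return inner.wait_for(timeout) == std::future_status::ready;
  });
  return outer.get();
}
```

The nested task only runs after the outer task gives up and returns, so the wait times out. The same reasoning applies to a pool with several workers once every worker is occupied by a task that is waiting on work queued behind it.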
OK, thanks for the explanation. It looks like this is fixed by @bkietz's suggestion.
LGTM now. Let's wait for a committer's review.
I think we need to clang-format the file.
✅
Sorry, I still had a formatting issue in file_base.cc; it should be fixed now.
CI passed. Should we merge or trigger more CI, @bkietz?
@eeroel could you please rebase to pick up the fix ( https://github.com/apache/arrow/pull/37867/files#diff-1bba462ab050e89360fd88110a689e85ee037749cea091a1848ab574381d3795R155 ) for several of the CI failures ( https://github.com/apache/arrow/actions/runs/6383550289/job/17363301059?pr=37918#step:6:1217 )?
Force-pushed: 290ba22 to 51bad89
👍 done
CI failure seems unrelated. +1, thanks!
After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit 02de3c1. There were no benchmark performance regressions. 🎉 The full Conbench report has more details. It also includes information about 2 possible false positives for unstable benchmarks that are known to sometimes produce them.
### Rationale for this change
Improves performance of file reads with an expensive Open operation.
### What changes are included in this PR?
### Are these changes tested?
### Are there any user-facing changes?
No
* Closes: apache#37917
Authored-by: Eero Lihavainen <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>