-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: parallel parquet can underflow when max_record_batch_rows < execution.batch_size #9737
fix: parallel parquet can underflow when max_record_batch_rows < execution.batch_size #9737
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @devinjdangelo -- this code looks good to me. I think there are some small issues with the test to fix, but otherwise I think this is good to go.
🙏
I ran the test without the changes in this PR and it fails like
b.rs (target/debug/deps/datafusion-4cbfc61ad6017be4)
attempt to subtract with overflow
thread 'dataframe::parquet::tests::write_parquet_with_small_rg_size' panicked at datafusion/core/src/datasource/file_format/parquet.rs:885:33:
attempt to subtract with overflow
stack backtrace:
} | ||
let output_path = "file://local/test.parquet"; | ||
|
||
for rg_size in (1..7).step_by(5) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My reading of the docs and my playground experiments suggests this is the same as [1, 6]
-- is that the intent? Or did you mean 1, 5, 10, 15, 20, 25, 30, 35
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, [1, 6] is all I meant and would be clearer... I originally wanted to loop over more rg sizes but the test was slow. If we streamline the test, we could actually range over more values here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This now loops over 0..10 with datafusion.execution.batch_size set to 10.
.await?; | ||
|
||
// Check that file actually used the correct rg size | ||
let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling into_path
here I think means the file won't be cleaned up
I think calling path()
would ensure the file is cleaned up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fixed this in the new test and a preexisting one.
let mut test_df = test_util::test_table().await?; | ||
// make the test data larger so there are multiple batches | ||
for _ in 0..7 { | ||
test_df = test_df.clone().union(test_df)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I ran this test it takes more than 22 seconds on my laptop. I wonder if we really need to generate so much data -- maybe we can try slicing up the batch (or else maybe use larger rg_sizes)
$ cargo test --lib -p datafusion -- write_parquet_with_small_rg_size
...
Finished test [unoptimized + debuginfo] target(s) in 0.16s
Running unittests src/lib.rs (target/debug/deps/datafusion-4cbfc61ad6017be4)
running 1 test
test dataframe::parquet::tests::write_parquet_with_small_rg_size ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 651 filtered out; finished in 22.31s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be able to trigger the issue with less data by lowering execution.batch_size
to something small like 10 rows.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test with a batch size of 10 still panics on main and passes in this PR, but runs in 0.41 seconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @devinjdangelo -- I also verified the test now runs quickly and still panics
Thank you so much
@@ -150,7 +152,7 @@ mod tests { | |||
.await?; | |||
|
|||
// Check that file actually used the specified compression | |||
let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?; | |||
let file = std::fs::File::open(tmp_dir.path().join("test.parquet"))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you for the driveby cleanup
Which issue does this PR close?
Closes #9736
Rationale for this change
See issue
What changes are included in this PR?
Parallel parquet writer can now handle the case when max_record_batch_rows < execution.batch_size by iteratively splitting the record batch rather than assuming it only needs to be split once.
Are these changes tested?
Yes, added new test that would panic prior to this PR
Are there any user-facing changes?
Just bugfix