Feat: Revive to use upstream arrow coalesce #17105
base: main
Conversation
FYI @alamb @Dandandan: I am trying to revive PR #16249, and we may want to run the benchmarks for this PR to see whether anything has changed since then, thanks!
I was just thinking about this PR last night -- thank you @zhuqi-lucas -- I'll kick off the benchmarks just to make sure.
🤖
Thank you @zhuqi-lucas -- this looks great
As long as the benchmarks look good (I expect no substantial change) I think we should merge this
Poll::Ready(Some(Ok(batch)))
};
Some(Ok(batch)) => {
    if self.coalescer.push_batch(batch)? {
I found this API to be somewhat confusing (the fact that a true return value means the limit was reached).
Maybe returning an enum would be clearer here.
I don't think this is a correctness issue, just a readability thing I noticed.
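For illustration only, here is a minimal sketch of the enum-based alternative; the names PushBatchStatus, Continue, LimitReached, and handle are hypothetical stand-ins, not the identifiers actually used in this PR.

// Sketch only: hypothetical names, not the PR's actual API.
// Returning an enum makes "limit reached" explicit at the call site
// instead of an unexplained bool.
enum PushBatchStatus {
    // More input can still be pushed into the coalescer.
    Continue,
    // The configured fetch/limit has been reached; stop pulling input.
    LimitReached,
}

// Hypothetical call site: the branch meaning is now self-describing.
fn handle(status: PushBatchStatus) -> bool {
    match status {
        PushBatchStatus::LimitReached => true, // emit remaining batches, then stop
        PushBatchStatus::Continue => false,    // keep polling the input stream
    }
}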
Thank you @alamb for the good suggestion; I agree this is a better way.
Addressed it in the latest revision of the PR; it returns an enum now.
Thank you @alamb for the review. My follow-up plan, correct me if I am wrong:
🤖: Benchmark completed
The ClickBench results look good, but tpch_mem seems to show some regression in the benchmark results. 🤔
Weird, I will rerun and see if we can see it again.
🤖
🤖: Benchmark completed
🤔 The new kernel seems to be slower. I wonder if the overhead of producing precisely sized output batches is causing the issue.
Good point @alamb, I agree that is the only difference. I can open a test PR so that upstream does not generate precisely sized output batches, but when we reserve capacity for the incrementally growing buffer, it seems we need to change the sizing, since we no longer keep the same target size with this change. The latest benchmark looks a little better.
Thanks @zhuqi-lucas -- what I was thinking about was something like the following:

let target_batch_size = 4;
let mut coalescer = BatchCoalescer::new(batch1.schema(), 4)
    .with_exact_size(false);

Before we spend a lot of time polishing / testing a PR for that, it would probably be good to hack up a POC and verify it actually improves performance.

Thank you for your willingness to help along with this project. It is something I have thought was important (but not critical) for a long time, and so having someone to help really makes a big difference.
Thank you @alamb for the good suggestion! It looks pretty cool to me, and a config for this is a very clever idea:

let target_batch_size = 4;
let mut coalescer = BatchCoalescer::new(batch1.schema(), 4)
    .with_exact_size(false);

I will try to address this in upstream first, so we can easily test it for DataFusion.
Which issue does this PR close?
- Revive Draft: Use upstream arrow coalesce kernel in DataFusion #16249
- Related to Optimize take/filter/concat from multiple input arrays to a single large output array arrow-rs#6692
- Related to Enable parquet filter pushdown (filter_pushdown) by default #3463

Rationale for this change
- Revive Draft: Use upstream arrow coalesce kernel in DataFusion #16249, and fix conflicts
- Related to Enable parquet filter pushdown (filter_pushdown) by default #3463

What changes are included in this PR?
This PR refactors the BatchCoalescer in DataFusion to use the proposed upstream API, to show that it:
- Can be used (the API is complete enough)
- Is not any slower

A rough sketch of the intended usage pattern is shown after this list.
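As a hedged illustration (not the exact code in this PR), the sketch below shows how an operator could drive the upstream arrow BatchCoalescer: push input batches in, drain completed target-sized batches out, then flush the remainder. It assumes BatchCoalescer is exported from arrow::compute with push_batch / next_completed_batch / finish_buffered_batch, as in recent arrow-rs releases; the coalesce_all function name is made up for this example.

// Sketch only (assumptions noted above): feed input batches through
// arrow's BatchCoalescer and collect completed output batches.
use arrow::compute::BatchCoalescer;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn coalesce_all(
    schema: SchemaRef,
    input: impl IntoIterator<Item = RecordBatch>,
    target_batch_size: usize,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut coalescer = BatchCoalescer::new(schema, target_batch_size);
    let mut output = Vec::new();
    for batch in input {
        // Buffer the incoming batch; once enough rows accumulate,
        // completed batches of roughly target_batch_size rows become available.
        coalescer.push_batch(batch)?;
        while let Some(completed) = coalescer.next_completed_batch() {
            output.push(completed);
        }
    }
    // Flush whatever is still buffered as a final (possibly smaller) batch.
    coalescer.finish_buffered_batch()?;
    while let Some(completed) = coalescer.next_completed_batch() {
        output.push(completed);
    }
    Ok(output)
}

In the actual CoalesceBatchesExec the same push/drain pattern would presumably be driven by the stream's poll loop rather than a plain iterator, with the limit handling layered on top.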
Are these changes tested?
Yes
Are there any user-facing changes?
No