Add dynamic filter (bounds) pushdown to HashJoinExec #16445
Conversation
// Compute min/max using ScalarValue's utilities
let mut min_val = ScalarValue::try_from_array(array, 0)?;
I think we should use an arrow kernel for this (this is slow).
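For illustration, a minimal sketch of what that could look like with arrow's vectorized aggregate kernels (Int32-only; `column_bounds` is a hypothetical helper, not code from this PR):

```rust
use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::compute::{max, min};
use datafusion_common::{DataFusionError, Result, ScalarValue};

/// Hypothetical helper: compute (min, max) bounds for one join-key column
/// with arrow's aggregate kernels instead of a row-by-row ScalarValue loop.
/// Only Int32 is handled here to keep the sketch short.
fn column_bounds(array: &ArrayRef) -> Result<(ScalarValue, ScalarValue)> {
    let ints = array
        .as_any()
        .downcast_ref::<Int32Array>()
        .ok_or_else(|| DataFusionError::Internal("expected an Int32 column".into()))?;
    // The kernels return None for empty or all-null arrays.
    let lo = min(ints).map(ScalarValue::from).unwrap_or(ScalarValue::Int32(None));
    let hi = max(ints).map(ScalarValue::from).unwrap_or(ScalarValue::Int32(None));
    Ok((lo, hi))
}
```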
Maybe we can re-use datafusion/functions-aggregate/src/min_max.rs? Seems like there's a lot of complexity there related to the types that we wouldn't want to re-implement.
Sadly min_batch is not public, and either way functions-aggregate is not a dependency of physical-plan.
Could we move these functions into functions-aggregate-common?
makes sense
I think we should also consider a heuristic for not evaluating the filter if it's not useful. Also, I think doing only the lookup is preferable to also computing / checking the bounds; the latter might create more overhead.
Sorry, misclicked a button.
My thought was that for some cases the bounds checks are going to be quite effective at pruning and they should always be cheap to compute and cheap to apply. I'm surprised you say that they might create a lot of overhead?
Maybe I should articulate it a bit more.
I think it also makes sense to think about a heuristic so that we use this pushdown only when we think it might be useful - e.g. the left side is much smaller than the right side, or we know (based on column statistics) it will filter out rows.
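For illustration, a rough sketch of such a heuristic (the 10% ratio and the reliance on row-count estimates are assumptions, not something this PR implements):

```rust
/// Hypothetical heuristic: only push the dynamic bounds filter down when the
/// build (left) side is estimated to be much smaller than the probe (right)
/// side. The 0.1 ratio is an arbitrary placeholder.
fn bounds_pushdown_looks_useful(
    left_rows: Option<usize>,
    right_rows: Option<usize>,
) -> bool {
    match (left_rows, right_rows) {
        (Some(l), Some(r)) if r > 0 => (l as f64) < 0.1 * (r as f64),
        // Without row-count estimates we can't tell; default to pushing down,
        // since the bounds check itself is cheap.
        _ => true,
    }
}
```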
I'm surprised that the hash table lookup, even if O(1), has such a small constant factor that it's ~ a couple of binary comparisons. That said, a reason to still do both is stats and filter caching: simple filters like
DataFusion is generally not great at these things: we often don't have enough stats / info to make decisions like this.
It's hard to say generally, but a hashtable lookup which fits into cache on a
I guess only benchmarks can tell. But I still think the scalar bounds are worth keeping for stats pruning reasons.
Do we have any metrics to record how much data is filtered by the dynamic join filter?
@@ -433,6 +433,117 @@ async fn test_topk_dynamic_filter_pushdown() {
    );
}

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown() {
Can we add some tests for multiple joins? Such as the shape below (a sketch follows the diagram):

Join (t1.a = t2.b)
   /          \
  t1     Join (t2.c = t3.d)
             /      \
           t3        t2
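For illustration, a hedged sketch of that query shape driven through SessionContext (table and column names follow the diagram; this is not the test that was added in the PR):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

/// Run the two-level join from the diagram: t1 joins the result of
/// (t3 joined with t2). With dynamic filter pushdown, the bounds built on
/// each join's left side should reach the corresponding right-side scan.
async fn multi_join_shape(ctx: &SessionContext) -> Result<()> {
    let df = ctx
        .sql(
            "SELECT *
             FROM t1
             JOIN (t3 JOIN t2 ON t2.c = t3.d) ON t1.a = t2.b",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```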
Such a test can check that:
- dynamic filters are pushed down to the right scan node
- dynamic filters aren't missed during pushdown
I've added a test that I think matches your suggestion
@@ -353,6 +353,18 @@ impl FilterDescription {
    }
}

pub fn with_child_pushdown(
More APIs 🤮. I really need to circle back to doing some whiteboard design for these. It's complex and won't be pretty but I'm sure it can be better than it is right now.
To share some experience, we recently added some similar pushdown for HashJoinExec (at Coralogix) using sharing of
I was originally planning on keeping this PR smaller but it's been growing so I might as well add the
Feel free to PR it however you like ;)
@Dandandan any chance you'd be willing to contribute your implementation of sharing
@alamb I'd be interested to see what benchmarks say if you don't mind kicking them off?
IIRC, the optimization will speed up the TPC-H benchmark, so we may run it directly. Or directly construct a small table and probe a big table to see the effect.
Part of #7955.
My goal here is to lay the groundwork for pushing down joins.
I am only implementing bounds pushdown because I am sure that is cheap and it will probably be quite effective in many cases. And it will be ~ easy to push down a reference to the whole hash table in a followup PR.
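As a rough sketch of the idea (an assumed helper, not the code in this PR), the collected build-side bounds can be turned into a simple `key >= min AND key <= max` predicate for the probe side:

```rust
use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical helper: build `key >= min AND key <= max` from the bounds
/// collected while building the hash table. How the expression is then fed
/// into the probe side's dynamic filter is omitted here.
fn bounds_predicate(key: &Column, min: ScalarValue, max: ScalarValue) -> Arc<dyn PhysicalExpr> {
    let ge: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(key.clone()),
        Operator::GtEq,
        Arc::new(Literal::new(min)),
    ));
    let le: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(key.clone()),
        Operator::LtEq,
        Arc::new(Literal::new(max)),
    ));
    // Combine into `key >= min AND key <= max`.
    Arc::new(BinaryExpr::new(ge, Operator::And, le))
}
```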
Another followup that can be done is to enable parent filter pushdown through HashJoinExec. Similar to FilterExec, this requires adjusting parent filters for the join's projection, but we also need to check which columns each filter refers to in order to push it into the correct child (or not push it at all if it refers to columns from both children and can't be split).