first pass at implementing predicate pushdown, seems to work #16
Conversation
Okay, so the relatively small amount of new code here doesn't reflect how long I spent on this haha, it's admittedly more complicated than I initially thought it would be. But I think this works in its current state. It's a WIP because I want to revisit how I applied filter pushdowns in the reader.

When I looked at arrow-rs, specifically at the parquet implementation, my understanding was that if you take in a predicate, you need to produce a record batch that completely satisfies it. However, looking into how it works in datafusion, it seems that's not the case: you can mark a predicate as "inexact", in which case datafusion will take the record batches that were partially "filtered" and remove any remaining row that doesn't satisfy the predicate.

For parquet files, specifically when compression is not supported, there's value in trying to skip rows when reading, but we can't do that for (compressed) zarr data, so my somewhat complicated implementation of "row filtering" for zarr doesn't add any value. I want to simplify it and focus on what does add value -- skipping whole chunks when no values in the predicate's chunks satisfy the predicate. I will then leave the "exact" filtering to datafusion, which will make things much cleaner.
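For reference, a minimal sketch of what marking pushed-down filters as inexact could look like on the datafusion side. This is not the code in this PR, just an illustration of the body of `TableProvider::supports_filters_pushdown`; the exact method name, signature, and import paths depend on the datafusion version.

```rust
use datafusion::error::Result as DataFusionResult;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};

// Sketch only: report every pushed-down filter as Inexact, so datafusion
// re-applies the predicate on the record batches the zarr reader returns.
// The reader then only has to skip whole chunks, not individual rows.
fn supports_filters_pushdown(
    filters: &[&Expr],
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
    Ok(filters
        .iter()
        .map(|_| TableProviderFilterPushDown::Inexact)
        .collect())
}
```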
```rust
// Below is all the logic necessary (I think) to convert a PhysicalExpr into a ZarrChunkFilter.
// The logic is mostly copied from datafusion, and is simplified here for the zarr use case.
pub struct ZarrFilterCandidate {
```
So in the end, this is still mostly index-based. I'll make a few comments below to clarify the logic (I should actually write those comments in code, I'll do that before we merge), but column names are basically just used as an intermediate step.
```rust
fn pre_visit(&mut self, node: &Arc<dyn PhysicalExpr>) -> DataFusionResult<RewriteRecursion> {
    if let Some(column) = node.as_any().downcast_ref::<Column>() {
        if let Ok(idx) = self.file_schema.index_of(column.name()) {
            self.required_column_indices.insert(idx);
```
So first, we accumulate the indices of the columns required by a given predicate. These indices represent the position of the column in the file schema (which will eventually be the table schema, for now we don't have that distinction), e.g. if the predicate requires `lat, lon` and the file schema is `float_data, lat, lon`, we will end up setting the projection to `[1, 2]`. Since the set is ordered, I think even if the order in the predicate was `lon, lat`, we'd end up with `[1, 2]` as the projection.
```rust
impl ZarrDatafusionArrowPredicate {
    pub fn new(candidate: ZarrFilterCandidate, schema: &Schema) -> DataFusionResult<Self> {
        let cols: Vec<_> = candidate
```
This is where we convert the indices to the column names, e.g. `[1, 2]` -> `[lat, lon]`. See below for how that's used.
```rust
            .map(|idx| schema.field(*idx).name().to_string())
            .collect();

        let schema = Arc::new(schema.project(&candidate.projection)?);
```
Here we go from the file schema to the predicate schema, e.g. `float_data, lat, lon` -> `lat, lon`.
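In arrow terms, that step is just a projection of the schema. A minimal illustration using the example columns from this thread (`float_data`, `lat`, `lon`) and arrow's `Schema::project`:

```rust
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;

// Project the file schema [float_data, lat, lon] down to the predicate
// schema [lat, lon] using the projection [1, 2] built earlier.
fn main() -> Result<(), ArrowError> {
    let file_schema = Schema::new(vec![
        Field::new("float_data", DataType::Float64, false),
        Field::new("lat", DataType::Float64, false),
        Field::new("lon", DataType::Float64, false),
    ]);
    let predicate_schema = file_schema.project(&[1, 2])?;
    assert_eq!(predicate_schema.field(0).name(), "lat");
    assert_eq!(predicate_schema.field(1).name(), "lon");
    Ok(())
}
```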
```rust
            .collect();

        let schema = Arc::new(schema.project(&candidate.projection)?);
        let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
```
If I understand correctly, the physical expression holds the name of each column as well as an index for each. Since it was first created off of an `Expr`, using the file schema, the indices for each column don't necessarily match what they will be in the record batch we pass to the physical expression. Assuming we will pass the physical expression a record batch that only contains the columns it needs, we need to remap the column indices, e.g. we go from `(lat, 1), (lon, 2)` to `(lat, 0), (lon, 1)`.
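A conceptual sketch of that remapping, using datafusion's physical `Column` expression (this is an illustration only, not the PR code, and the import path can vary across datafusion versions):

```rust
use datafusion::physical_plan::expressions::Column;

// A physical column reference carries a name and an index. The expression was
// built against the file schema (lat = 1, lon = 2), but the batches handed to
// the predicate only contain [lat, lon], so the indices must be rewritten to
// (lat = 0, lon = 1); that rewrite is what reassign_predicate_columns does.
fn main() {
    let in_file_schema = Column::new("lat", 1);
    let in_predicate_batch = Column::new("lat", 0);
    assert_eq!(in_file_schema.name(), in_predicate_batch.name());
    assert_ne!(in_file_schema.index(), in_predicate_batch.index());
}
```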
```rust
    }

    fn evaluate(&mut self, batch: &RecordBatch) -> Result<BooleanArray, ArrowError> {
        let index_projection = self
```
This is the bit that depends on the column names. Here, the incoming record batch can have any number of columns, it doesn't matter, as long as it contains at least the columns the predicate needs. In the parquet implementation, again if I understood correctly, the batch is expected to come in with only the required columns, but by using column names here, that's not required: we figure out the indices in the record batch based on the column names, and re-project it before passing it to the physical expression. The re-projection does still happen in the parquet implementation, I think to handle different column orderings, but here we use it to also drop unnecessary columns. That way, for example, if the predicate only requires the `lon` column, we can re-use a record batch that contains `lat, lon`.
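A sketch of that name-based re-projection (illustration only; `project_for_predicate` is a made-up helper name, the column names are the examples from this thread):

```rust
use std::sync::Arc;

use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Look up the needed columns by name in whatever batch comes in, then
// re-project so the predicate sees exactly the columns it expects, in order.
fn project_for_predicate(batch: &RecordBatch, needed: &[&str]) -> Result<RecordBatch, ArrowError> {
    let indices: Vec<usize> = needed
        .iter()
        .map(|name| batch.schema().index_of(name))
        .collect::<Result<_, _>>()?;
    batch.project(&indices)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("lat", DataType::Float64, false),
        Field::new("lon", DataType::Float64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Float64Array::from(vec![1.0, 2.0])),
            Arc::new(Float64Array::from(vec![10.0, 20.0])),
        ],
    )?;

    // The predicate only needs `lon`, but we can reuse the `lat, lon` batch.
    let projected = project_for_predicate(&batch, &["lon"])?;
    assert_eq!(projected.num_columns(), 1);
    assert_eq!(projected.schema().field(0).name(), "lon");
    Ok(())
}
```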
Just scanning through, leaving some comments. I'll come back to this later after I've let it stew for a bit.
```rust
        }
    }

    pub fn build(mut self) -> DataFusionResult<Option<ZarrFilterCandidate>> {
```
Since there aren't any builder methods on this, maybe `TryFrom`? Also, it's not super clear to me why it's an `Option`?
Good question. So I think we need a builder struct because the way the column indices get "extracted" from the predicate is through the call to `rewrite`, which takes a mut reference to self, on the line below. That function requires the `TreeNodeRewriter` trait on its argument, and you can see that as `pre_visit` is called, the indices get progressively filled in. To be honest, I didn't dig all the way down into how this works, I just followed the steps they follow for parquet since I didn't want to risk breaking something.

Regarding the `Option`, you're right that it's not clear from the code here. I believe it's like that because of the code here, https://github.com/datafusion-contrib/arrow-zarr/pull/16/files#diff-d61c0a121604c7680df3d272638903a3fc21fee9ac3381e34b5285c02b9deaf0R202-R213, specifically because the `else` statement returns `None`. Since the type of `candidates` is `Vec<ZarrFilterCandidate>`, I think the call to `collect` coerces the options into the inner type (or skips the value if it's `None`)? And that means the type of `candidate` must be `Option<...>`, so that the return types of the `if` and `else` statements match. Again, I mostly followed the parquet implementation.

I know that just following someone else's code and replicating it somewhat naively is not the best excuse haha, but like I said I wanted to minimize the risk of messing things up here, since I'm not yet comfortable with the code base. Overall, does this all make sense?
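On the `collect` point, a general Rust illustration (not the exact code from this PR; `Candidate` and `build` are just stand-ins): an iterator of `Option`s can be run through `filter_map` (or `.flatten()`) before collecting, which drops the `None`s and is what lets a `Vec<ZarrFilterCandidate>` come out of builders that each return an `Option`.

```rust
// Stand-in for ZarrFilterCandidate, just for the illustration.
#[derive(Debug, PartialEq)]
struct Candidate(usize);

// Stand-in for the builder's build(): some inputs yield no candidate.
fn build(i: usize) -> Option<Candidate> {
    (i % 2 == 1).then(|| Candidate(i))
}

fn main() {
    // filter_map drops the None values, so the collected Vec only
    // contains the candidates that were actually produced.
    let candidates: Vec<Candidate> = (0..4).filter_map(build).collect();
    assert_eq!(candidates, vec![Candidate(1), Candidate(3)]);
}
```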
Also, when we start handling hive-style partitions and the logic gets more complicated, we might need to return an `Ok(None)` from `build` in some situations. I'm following the parquet logic but also simplifying it a lot (for now), so that might lead to code that looks a bit weird, temporarily.
Cool cool, yeah that all sounds good to me to get started with. If we notice some perf issues w/ the cloning + rewriting we can reassess later.
Yeah, I haven't been paying too much attention to everything that could impact performance so far, I'm thinking I'll revisit later when we have something fully functional.
Okay, I removed the row filtering logic in the readers, and the predicate pushdowns are now marked as `Inexact`, should be good to go.
I think this seems good to me to get going on predicate pushdowns. Sometime this week I hope to play around with some of my data so that'll hopefully yield some insights into how this works for that use case.
Great, I'm looking forward to running all this against real data. I'm sure we'll find some issues, or at least things to improve, that would be hard to figure out with only the dummy test data I've been generating. In theory, I think most zarr features are supported with what's there; the main one I'm not supporting yet is missing chunks + fill values.