Add Iceberg Filter Pushdown Optimizer Rule for execution with Velox #20501
Conversation
@yingsu00 As we discussed, this PR includes the changes required for implementing the new Filter Pushdown optimizer rule compatible with Prestissimo/Velox.
Force-pushed from 065ce0f to e65a80e
I realize this is a draft, but one early piece of feedback: I wouldn't find it advisable to encode the worker type directly in the connector. If anything, this is an engine concept, not an Iceberg connector concept.
One design point of Aria scan that I wish we had addressed is that the connector isn't really the proper place to extract expressions for filter pushdown. This is something that ought to be handled in the engine. In lieu of that major change, I wonder if there is any way to extract common logic from the Hive connector so it doesn't need to be completely redone from scratch.
Yes, sure, this was one of the questions I was going to ask. I was also planning to include this in a common config class and system session properties.
return workerType;
}

@Config("iceberg.execution.worker.type")
I think it would be better to add "pushdown-filter-enabled" instead of testing the worker type. If it's enabled, then the rule will be executed and the TableLayout will contain the relevant information.
I thought it might be better to add worker type, since there might be other worker-type-specific changes we add later. With the current changes, filter pushdown will only work with native workers; to support it for Java workers, we would need to make additional changes. So if we add a pushdown-filter-enabled config now, I think it might be a little confusing until we also support filter pushdown with Java workers.
To add some detail: the filter pushdown changes in this PR may not work as-is with Java workers, since no corresponding changes have been made to the Java-side Iceberg connector worker code for the filter pushdown feature.
Yes, and we need to make sure this pushdown-filter-enabled is set to false by default
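For reference, a minimal sketch of such a config property, assuming the airlift-style @Config binding the Hive connector uses. The property name, description, and class shape here are illustrative, not the final implementation:

```java
// Hypothetical sketch only: mirrors Hive's pushdown-filter-enabled config,
// defaulting to false so existing Java-worker deployments are unaffected.
public class IcebergConfig
{
    private boolean pushdownFilterEnabled; // boolean fields default to false

    public boolean isPushdownFilterEnabled()
    {
        return pushdownFilterEnabled;
    }

    @Config("iceberg.pushdown-filter-enabled")
    @ConfigDescription("Experimental: enable filter pushdown (currently effective only with native workers)")
    public IcebergConfig setPushdownFilterEnabled(boolean pushdownFilterEnabled)
    {
        this.pushdownFilterEnabled = pushdownFilterEnabled;
        return this;
    }
}
```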
private final RowExpression remainingPredicate;
private final Map<String, IcebergColumnHandle> predicateColumns;
private final TupleDomain<ColumnHandle> partitionColumnPredicate;
private final Optional<Set<IcebergColumnHandle>> requestedColumns;
private final IcebergTableHandle table;
private final TupleDomain<ColumnHandle> tupleDomain;
We need to either remove the original tupleDomain or subclass. This is used by Iceberg to filter files. Can you check how Hive does it when filter pushdown is set to false (in HiveMetadata.java, line 2742)? One possible way is to reconstruct it from the new fields when filtering the files. We just need to make sure the transformation is equivalent.
@yingsu00 Yes we can remove the original tupleDomain. It can be constructed by transforming domainPredicate field.
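As a rough illustration of the equivalence argument (not the actual Presto types): if the domain predicate is keyed by subfield path, the entire-column TupleDomain can be recovered by keeping only the top-level entries. The map-based stand-in below is hypothetical and only sketches the transformation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DomainPredicateSketch {
    // Stand-in for TupleDomain: column/subfield path -> allowed values.
    // Entries containing '.' represent nested subfields (e.g. "address.zip");
    // only entire-column entries contribute to the reconstructed tupleDomain
    // used for file filtering.
    static Map<String, Set<Integer>> toEntireColumnDomain(Map<String, Set<Integer>> domainPredicate) {
        Map<String, Set<Integer>> entireColumns = new HashMap<>();
        for (Map.Entry<String, Set<Integer>> entry : domainPredicate.entrySet()) {
            if (!entry.getKey().contains(".")) {
                entireColumns.put(entry.getKey(), entry.getValue());
            }
        }
        return entireColumns;
    }
}
```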
Yes, I agree. This should be a "pushdown-filter-enabled" property, not worker type.
@tdcmeehan I think there was some debate over this. On one hand, it makes sense to handle this in the engine, but on the other hand, the engine does not know what the connectors can do. If we want the engine to handle this, the engine needs to ask each connector: can you do this? can you do that? There would have to be a pre-defined superset of operations covering what all connectors can do, and the engine logic becomes complex. I think this is what Trino chose to do. But Presto didn't do it this way; instead, it allows the connector to modify the plan shape because it knows what it can and cannot do. This way is more flexible and has better separation of concerns. Both ways have their pros and cons, and so far the Presto implementation seems to be working well.
We actually talked about this a while ago, but @imjalpreet found some hurdles to just calling directly into the Hive rule. Jalpreet, can you remind us what the issue was?
.transform(subfield -> !isEntireColumn(subfield) ? subfield : null)
.getDomains()
.orElse(ImmutableMap.of()))
.build());
Why is this part not present?
if (currentLayoutHandle.isPresent()) {
entireColumnDomain = entireColumnDomain.intersect(((HiveTableLayoutHandle) (currentLayoutHandle.get())).getPartitionColumnPredicate());
}
This will be added as part of the changes that I am working on for partitioned tables since this is only needed for partitioned tables.
}
}

org.apache.iceberg.Table icebergTable;
icebergTable is not used?
I might have included this change by mistake; it is part of the changes for partitioned tables, where icebergTable will be used to get partition columns.
I will remove this variable from the current commit, and it will be added back as part of the partitioned table changes.
ConstraintEvaluator evaluator = new ConstraintEvaluator(rowExpressionService, session, columnHandles, deterministicPredicate);
constraint = new Constraint<>(entireColumnDomain, evaluator::isCandidate);
}
}
Most of the lines are the same between Hive and Iceberg. Consider extracting them out as one or several utility functions in Hive.
@@ -78,6 +78,7 @@ public final class IcebergSessionProperties
private static final String NESSIE_REFERENCE_HASH = "nessie_reference_hash";
public static final String READ_MASKED_VALUE_ENABLED = "read_null_masked_parquet_encrypted_value_enabled";
public static final String PARQUET_DEREFERENCE_PUSHDOWN_ENABLED = "parquet_dereference_pushdown_enabled";
public static final String WORKER_TYPE = "worker_type";
Use PUSHDOWN_FILTER_ENABLED, similar to Hive.
public static final String PUSHDOWN_FILTER_ENABLED = "pushdown_filter_enabled";
Done, as requested by you.
Force-pushed from 6e56595 to 975b59b
Yes, we saw some issues in our initial implementation. But the current implementation can be refactored by extracting common logic between the Hive and Iceberg implementations. It's currently in progress in one of the draft commits in this PR.
Force-pushed from 1147612 to 8934ae5
@imjalpreet Is this PR ready for review?
I see you still have the commit "Add iceberg.execution.worker.type config property to Iceberg Connector". Could you please sort out the commits so that each one handles a sub-problem? For example, the above commit should be removed, or squashed with the later updates.
@yingsu00 Yes, it is ready for review. I had planned to refine and re-order the commits after this review. For now I kept separate commits so it would be easier to review the changes done since your last review. Yes, the commit you mentioned will get squashed and removed. If you want me to re-order and squash unnecessary commits before the review, I can do that; please let me know.
@imjalpreet Yes, please re-order and squash unnecessary commits before the review. The commits and their messages are part of the review targets.
@imjalpreet Can you please also resolve the conflicts?
Force-pushed from 8934ae5 to d27662e
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java
RowExpression filter,
Optional<ConnectorTableLayoutHandle> currentLayoutHandle)
{
Result result = checkConstantBooleanExpression(rowExpressionService, functionResolution, filter, currentLayoutHandle, metadata, session);
Lines 127 to 146 are still duplicated from HiveFilterPushdown. Is it possible to move them to the util class after the partitioned table support is added?
Sure, I will try to see if it can be extracted out to the util class in the partitioned table PR.
@@ -0,0 +1,532 @@
/*
@imjalpreet I actually think it's better to subclass HiveFilterPushdown than to create a util class. The functions in this util class are all static, so you had to pass a lot of parameters to them, which were member fields of HiveFilterPushdown. If we subclass HiveFilterPushdown, these common fields can be used by the derived IcebergFilterPushdown, and the code can be a lot smaller. For that, we can change the metadata parameter to the ConnectorMetadata type. What do you think?
@yingsu00 I had thought about it as well, but I saw a few drawbacks. HiveFilterPushdown has some class fields like HiveTransactionManager, HivePartitionManager, etc. which are not required in IcebergFilterPushdown, since it has its own versions like IcebergTransactionManager. Also, in HiveFilterPushdown's pushdownFilter method we have HiveMetadata, whereas in IcebergFilterPushdown we have implementations of IcebergAbstractMetadata.
IMO directly extending the HiveFilterPushdown class might not be ideal. If we want, I think we can try creating a new base class extracting the common fields between Hive and Iceberg and then derive both HiveFilterPushdown and IcebergFilterPushdown from that base class.
Let me know what you think.
@imjalpreet Making a base class is ok. I think most of the content can be in the base class and only very minimal difference between the two derived classes.
In filterPushdown(), metadata is only used at two places: metadata.getColumnHandles(session, tableHandle) and metadata.getTableLayout(...) and both are on ConnectorMetadata. So it's ok to change the metadata parameter from HiveMetadata and IcebergAbstractMetadata to ConnectorMetadata.
Now let's look at transactionManager and icebergTransactionManager. They are only used to get the Metadata in getMetadata(TableHandle tableHandle) method:
protected IcebergAbstractMetadata getMetadata(TableHandle tableHandle)
{
ConnectorMetadata metadata = icebergTransactionManager.get(tableHandle.getTransaction());
checkState(metadata instanceof IcebergAbstractMetadata, "metadata must be IcebergAbstractMetadata");
return (IcebergAbstractMetadata) metadata;
}
The downcast was only to make sure the metadata is IcebergAbstractMetadata, but icebergTransactionManager would always return an IcebergXXXMetadata, so returning IcebergAbstractMetadata instead of ConnectorMetadata is not really necessary. If we make it just return a ConnectorMetadata, then the getMetadata() method can have the same signature and be moved to the base class.
partitionManager was also only used once.
HivePartitionResult hivePartitionResult = partitionManager.getPartitions(metastore, tableHandle, constraint, session);
Similar to getMetadata(), we can have an abstract method in the base class, depending on how you express the Iceberg partition result. If that way doesn't work we can also break filterPushdown into multiple sections, with most sections having the same content.
So please go ahead and extract common base class.
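A minimal sketch of the proposed extraction, with the shared flow in an abstract base and each connector overriding only the metadata lookup. The class names and the String-based plumbing below are placeholders, not the actual Presto SPI:

```java
import java.util.Map;

// Illustrative template-method sketch: the common pushdown flow lives once
// in the base class; each connector resolves metadata from its own
// transaction manager.
abstract class BaseFilterPushdown {
    // Shared flow, identical for Hive and Iceberg.
    public final String pushdownFilter(String tableName, String filter) {
        Object metadata = getMetadata(tableName); // ConnectorMetadata in Presto
        return metadata + ": pushed down [" + filter + "]";
    }

    // The only connector-specific hook.
    protected abstract Object getMetadata(String tableName);
}

class IcebergFilterPushdown extends BaseFilterPushdown {
    private final Map<String, Object> icebergTransactionManager;

    IcebergFilterPushdown(Map<String, Object> transactions) {
        this.icebergTransactionManager = transactions;
    }

    @Override
    protected Object getMetadata(String tableName) {
        return icebergTransactionManager.get(tableName);
    }
}
```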
@yingsu00 I have refactored the filter pushdown and introduced a new abstract class.
presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
presto-hive/src/main/java/com/facebook/presto/hive/util/FilterPushdownUtils.java
@yingsu00 I have pushed all the changes. Update: additional test cases have also been added.
Force-pushed from e177934 to 2e689fe
Force-pushed from 2e689fe to 4d98884
protected final IcebergResourceFactory resourceFactory;
protected final HdfsEnvironment hdfsEnvironment;
protected final TypeManager typeManager;
protected IcebergTransactionManager icebergTransactionManager;
Why are these fields protected?
Suggested change (make icebergTransactionManager final as well):
protected final IcebergResourceFactory resourceFactory;
protected final HdfsEnvironment hdfsEnvironment;
protected final TypeManager typeManager;
protected final IcebergTransactionManager icebergTransactionManager;
My bad, just realized we can keep them private as well. I have made the change for all the protected fields.
protected final IcebergResourceFactory resourceFactory;
protected final HdfsEnvironment hdfsEnvironment;
protected final TypeManager typeManager;
protected IcebergTransactionManager icebergTransactionManager;
Why protected?
This has been removed
protected final IcebergResourceFactory resourceFactory;
protected final HdfsEnvironment hdfsEnvironment;
protected final TypeManager typeManager;
protected IcebergTransactionManager icebergTransactionManager;
It seems icebergTransactionManager doesn't need to be persisted as a class member.
We would need to persist it here since we need to pass it to SubfieldExtractionRewriter
protected final IcebergResourceFactory resourceFactory;
protected final HdfsEnvironment hdfsEnvironment;
protected final TypeManager typeManager;
protected IcebergTransactionManager icebergTransactionManager;
Ditto. icebergTransactionManager doesn't need to be persisted as a class member.
I agree, I have removed the class member here.
…riter Co-authored-by: Tim Meehan <[email protected]>
Filter Pushdown is only supported with Native Worker
…Filter Pushdown
1. Add new fields in IcebergTableLayoutHandle and IcebergColumnHandle required for Filter Pushdown
2. Remove tupleDomain from IcebergTableLayoutHandle and instead use domainPredicate
3. Refactor IcebergTableLayoutHandle and IcebergColumnHandle to extend the Base classes
4. Add new utility methods for computing partition columns
Force-pushed from 4d98884 to 89edaf4
A couple of other nits, otherwise LGTM.
{
Table icebergTable;
if (metadata instanceof IcebergHiveMetadata) {
ExtendedHiveMetastore metastore = ((IcebergHiveMetadata) metadata).getMetastore();
Please add upfront validation that these casts are correct via checkArgument
Added a validation for both metadata and tableHandle
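The validation pattern looks roughly like the following. The types below are stand-ins for the real Presto/Iceberg classes, and checkArgument mirrors Guava's Preconditions:

```java
// Hypothetical stand-ins for the real Presto/Iceberg types.
class ConnectorMetadata {}
class IcebergHiveMetadata extends ConnectorMetadata {}

class CastValidation {
    static void checkArgument(boolean ok, String message) {
        if (!ok) {
            throw new IllegalArgumentException(message);
        }
    }

    // Fail fast with a descriptive message before downcasting, instead of
    // letting a ClassCastException surface deep inside the optimizer rule.
    static IcebergHiveMetadata asIcebergHiveMetadata(ConnectorMetadata metadata) {
        checkArgument(metadata instanceof IcebergHiveMetadata,
                "metadata must be IcebergHiveMetadata but is " + metadata.getClass().getSimpleName());
        return (IcebergHiveMetadata) metadata;
    }
}
```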
presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java
private final IcebergResourceFactory resourceFactory;
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final IcebergTransactionManager icebergTransactionManager;
icebergTransactionManager is still here. Is it not removable?
@yingsu00 yes, it is being used in the method com.facebook.presto.iceberg.optimizer.IcebergFilterPushdown#optimize
Co-authored-by: Reetika Agrawal <[email protected]> Co-authored-by: Tim Meehan <[email protected]>
Force-pushed from 89edaf4 to c1ec0ff
@yingsu00 @tdcmeehan Folks, we are seeing build failures in Meta, and after fixing these we see crashes. Wondering if you could summarize the changes to help us figure out what's going on.
@mbasmanova the idea behind this change is we need similar filter pushdown logic for the Iceberg catalog as what we currently have in Hive. Because the code is rather large in the Hive connector, the idea is to extract it so it may be used for other Hive-adjacent connectors in the future, such as Delta and Hudi (the alternative would be to do such filter extraction and pushdown in the engine). Can you share a more detailed error message?
@tdcmeehan Tim, thank you for these additional details. Here is all I have in terms of an error message:
The last lines seem to repeat a million times.
Would it be possible to share the implementation of
@tdcmeehan Sure. Let me know if you want to see the whole file.
Seems like
@tdcmeehan This is a good point. CC: @shrinidhijoshi |
Co-authors: @imjalpreet, @agrawalreetika and @tdcmeehan
Description
This PR introduces the following changes in the Iceberg Connector in an ongoing effort to make it compatible with execution on Velox:
Motivation and Context
facebookincubator/velox#5977
Impact
Test Plan
These changes have been tested with Iceberg Catalog type Hive/Glue and Hadoop.
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.