feat(connector): Support reading Iceberg split with equality deletes #11088
Conversation
@yingsu00 Can you please rebase again? There is a conflict with other changes.
@Yuhta Just rebased, appreciate your review again. Thanks!
cc: @liujiayi771 Would you like to review this PR? Thanks.
@liujiayi771 Can you take a look at the PR if possible? Should we add something on the Gluten side after this PR is merged? It's requested by a Gluten customer.
@FelixYBW Yes, Gluten needs to make some minor changes to accommodate this PR. However, Spark cannot produce equality delete files, so we need to use Flink to generate Iceberg tables with equality delete files for testing. I will perform some tests this week.
Thanks. Added some nits.
      nullAllowed = true;
    } else {
      if constexpr (std::is_same_v<U, Timestamp>) {
        values.emplace_back(simpleValues->valueAt(i).toMillis());
Do we need toMicros for Spark? cc: @liujiayi771
@rui-mo It will create a BigIntRange to filter data values. I'm not quite sure why we need to convert the Timestamp type to bigint here; shouldn't we be using TimestampRange instead?
@rui-mo @liujiayi771 this toValues() function is copied from velox/functions/prestosql/InPredicate.cpp. We could extract it to velox/common but there isn't a good folder yet. Any good ideas? cc @Yuhta
@yingsu00 I see. The InPredicate is also registered by Spark SQL and might need to be fixed by providing configurable behavior. For the one in 'hive/iceberg/FilterUtil.cpp', I assume a configuration 'kReadTimestampUnit' in the hiveConfig might help adapt to the different precisions used by Presto and Spark. I'm fine with leaving a TODO here and focusing on the primary implementation first. Thanks.
@rui-mo Is the issue here that the precision of a Timestamp in Spark is in microseconds, and converting it to milliseconds for comparison would result in a loss of precision? For example, Timestamp(1, 999000) and Timestamp(1, 998000) would be considered the same?
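To make the concern concrete, here is a minimal standalone sketch. The Timestamp struct below is a simplified stand-in for Velox's seconds-plus-nanoseconds Timestamp, not the real class: truncating to milliseconds collapses the two values from the example above, while microseconds keep them distinct.

    // Simplified stand-in for Velox's Timestamp (seconds + nanoseconds);
    // illustrates why toMillis() can lose precision that toMicros() keeps.
    #include <cassert>
    #include <cstdint>

    struct Timestamp {
      int64_t seconds;
      uint64_t nanos;

      int64_t toMillis() const {
        return seconds * 1'000 + static_cast<int64_t>(nanos) / 1'000'000;
      }
      int64_t toMicros() const {
        return seconds * 1'000'000 + static_cast<int64_t>(nanos) / 1'000;
      }
    };

    int main() {
      Timestamp a{1, 999'000};  // 1.000999 seconds
      Timestamp b{1, 998'000};  // 1.000998 seconds
      assert(a.toMillis() == b.toMillis());  // both truncate to 1000 ms
      assert(a.toMicros() != b.toMicros());  // 1000999 us vs 1000998 us
      return 0;
    }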
Overall this looks good. I raised one discussion about the bookkeeping of temporary scan spec nodes and filters; the rest is just wiring cleanup.
@@ -672,6 +672,7 @@ bool applyPartitionFilter(
     VELOX_FAIL(
         "Bad type {} for partition value: {}", type->kind(), partitionValue);
   }
+  return true;
Do not need this
@@ -87,6 +90,8 @@ class SplitReader {

   void resetSplit();
+
+  std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema();
This is only needed inside IcebergSplitReader
@@ -215,7 +215,10 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
       ioStats_,
       fileHandleFactory_,
       executor_,
-      scanSpec_);
+      scanSpec_,
+      remainingFilterExprSet_,
You are not really using this inside IcebergSplitReader (instead you have your own deleteExprSet_); let's avoid sharing and passing it.
    filters_.push_back(std::move(filter));
  }

  void updateFilter(std::unique_ptr<Filter> newFilter) {
Let's call it pushFilter and popFilter to make it more explicit that such filters are temporary.
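A minimal sketch of what such an API could look like. Filter here is a stub standing in for velox::common::Filter, and the pushFilter/popFilter names follow the suggestion above; this is not the actual ScanSpec code.

    #include <memory>
    #include <vector>

    struct Filter {};  // stub for velox::common::Filter

    class ScanSpecNode {
     public:
      // Installs a temporary (per-split) filter on top of the permanent one.
      void pushFilter(std::unique_ptr<Filter> filter) {
        tempFilters_.push_back(std::move(filter));
      }

      // Removes the most recently pushed temporary filter once the split
      // is done, leaving the permanent filter untouched.
      void popFilter() {
        tempFilters_.pop_back();
      }

     private:
      std::unique_ptr<Filter> permanentFilter_;
      std::vector<std::unique_ptr<Filter>> tempFilters_;
    };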
-  ScanSpec* getOrCreateChild(const Subfield& subfield);
+  ScanSpec* getOrCreateChild(const Subfield& subfield, bool isTempNode = false);

+  void deleteTempNodes();
Since all the temp nodes are top level, can we do it less intrusively by keeping a list of temp node/filter names inside IcebergSplitReader and removing them after we're done with them? Then we don't need the extra state of isTempNode_ and hasTempFilter_ here.
@Yuhta Thanks for the suggestion. Yes, in our last discussion we thought we could just add/remove top-level columns, but when I looked again I found it might still be necessary to mark the nodes with these temp tags, because the Iceberg delete file may contain subfields. For example, consider this query:
-- create table t1 (c_row(c1 integer, c2 integer, c3 integer), c_char char);
-- insert some rows
select c_char
from t1
where c_row.c2 > 2;
The base ScanSpec would contain the c_row child, which in turn has three children for c1, c2 and c3. In this case, all three subfields would be null constants, but only the c2 node would have the filter c_row.c2 > 2, while the c1 and c3 nodes don't have any filters. Now, if the Iceberg equality delete file contains the predicate c_row.c1 = 1 && c_row.c2 IN {2, 3}, then we need to remove these values by adding a filter c_row.c1 <> 1 to the c_row.c1 node, and merge the filter c_row.c2 <> 2 && c_row.c2 <> 3 with the existing filter c_row.c2 > 2 on the c_row.c2 node, so it becomes c_row.c2 > 3. When this split finishes, we need to remove the filter c_row.c1 <> 1 from c_row.c1 and restore the filter for c_row.c2 to its original state, but not delete the whole c_row column from the ScanSpec. Therefore we need to tag the nodes in the ScanSpec to check whether a node is a temp node and has temp filters. Do you think this makes sense?
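As an illustration of the save-and-restore bookkeeping described above (all types and names below are simplified stand-ins, not the PR's code): the permanent filter on c_row.c2 is swapped out for the merged filter while the split is read, then restored when the split finishes.

    #include <memory>
    #include <string>
    #include <utility>

    // Stub types standing in for velox::common::Filter and a ScanSpec node.
    struct Filter {
      std::string expr;
    };

    struct ScanSpecNode {
      std::unique_ptr<Filter> filter;
    };

    int main() {
      ScanSpecNode c2;
      c2.filter = std::make_unique<Filter>(Filter{"c_row.c2 > 2"});

      // Split starts: save the permanent filter and install the merge of
      // the permanent filter with the equality-delete predicate.
      std::unique_ptr<Filter> saved = std::move(c2.filter);
      c2.filter = std::make_unique<Filter>(
          Filter{"(" + saved->expr + ") AND c_row.c2 <> 2 AND c_row.c2 <> 3"});

      // ... read the split with the merged filter ...

      // Split ends: restore the permanent filter. A node that existed only
      // for the delete predicate (e.g. c_row.c1) would be removed instead.
      c2.filter = std::move(saved);
      return 0;
    }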
velox/expression/Expr.h (outdated)
@@ -722,6 +724,12 @@ class ExprSet {
       core::ExecCtx* execCtx,
       bool enableConstantFolding = true);

+  ExprSet(
It seems these two are no longer needed now that the delete expr is separated from the remaining filter expr.
This PR introduces EqualityDeleteFileReader, which is used to read Iceberg splits with equality delete files.

Co-authored-by: Naveen Kumar Mahadevuni <[email protected]>
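For readers new to the feature, a minimal sketch of what reading a split with an equality delete file amounts to. The hash-set approach below is illustrative only; as discussed above, the actual reader builds Velox Filter objects (e.g. value ranges) from the delete values and pushes them into the ScanSpec.

    #include <cstdint>
    #include <iostream>
    #include <unordered_set>
    #include <vector>

    int main() {
      // Values read from the equality delete file for one column: any base
      // file row whose column value matches one of these is deleted.
      std::vector<int64_t> deletedValues = {2, 3};
      std::unordered_set<int64_t> excluded(
          deletedValues.begin(), deletedValues.end());

      // Scan the base file column and keep only rows that survive.
      std::vector<int64_t> baseFileColumn = {1, 2, 3, 4, 5};
      for (int64_t v : baseFileColumn) {
        if (excluded.count(v) == 0) {
          std::cout << v << '\n';  // prints 1, 4, 5
        }
      }
      return 0;
    }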