Skip to content

Commit

Permalink
[fix](nereids) fix runtime filter expr order (apache#21480)
Browse files Browse the repository at this point in the history
Current runtime filter pushing down to cte internal, we construct the runtime filter expr_order with incremental number, which is not correct. For cte internal rf pushing down, the join node will be always different, the expr_order should be fixed as 0 without incrementation, otherwise, it will lead the checking for expr_order and probe_expr_size illegal or wrong query result.

This pr will revert 2827bc1 temporarily, it will break the cte rf pushing down plan pattern.
  • Loading branch information
xzj7019 authored Jul 5, 2023
1 parent f02bec8 commit f9bc433
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,6 @@ private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {
continue;
}
Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = entry.getValue();
int exprOrder = 0;
for (Map.Entry<EqualTo, PhysicalHashJoin> innerEntry : equalCondToJoinMap.entrySet()) {
EqualTo equalTo = innerEntry.getKey();
PhysicalHashJoin join = innerEntry.getValue();
Expand All @@ -568,15 +567,14 @@ private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {
}
EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
equalTo, join.child(0).getOutputSet()));
doPushDownIntoCTEProducerInternal(join, ctx, newEqualTo, type, exprOrder++, cteProducer);
doPushDownIntoCTEProducerInternal(join, ctx, newEqualTo, type, cteProducer);
}
ctx.getPushedDownCTE().add(cteProducer.getCteId());
}
}

private void doPushDownIntoCTEProducerInternal(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder,
PhysicalCTEProducer cteProducer) {
RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, PhysicalCTEProducer cteProducer) {
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
Slot unwrappedSlot = checkTargetChild(equalTo.left());
Expand Down Expand Up @@ -617,8 +615,9 @@ private void doPushDownIntoCTEProducerInternal(PhysicalHashJoin<? extends Plan,
ctx.setTargetsOnScanNode(scan.getId(), targetSlot);
}
// build multi-target runtime filter
// since always on different join, set the expr_order as 0
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
equalTo.right(), targetList, type, 0, join, buildSideNdv);
for (Slot slot : targetList) {
ctx.setTargetExprIdToFilter(slot.getExprId(), filter);
}
Expand Down Expand Up @@ -662,12 +661,12 @@ private Map<Slot, PhysicalOlapScan> getPushDownBasicTablesInfos(PhysicalPlan roo
if (equalTo instanceof EqualTo) {
SlotReference leftSlot = (SlotReference) ((EqualTo) equalTo).left();
SlotReference rightSlot = (SlotReference) ((EqualTo) equalTo).right();
if (leftSlot.getExprId() == exprId) {
if (leftSlot.getExprId() == exprId && aliasTransferMap.get(rightSlot) != null) {
PhysicalOlapScan rightTable = (PhysicalOlapScan) aliasTransferMap.get(rightSlot).first;
if (rightTable != null) {
basicTableInfos.put(rightSlot, rightTable);
}
} else if (rightSlot.getExprId() == exprId) {
} else if (rightSlot.getExprId() == exprId && aliasTransferMap.get(leftSlot) != null) {
PhysicalOlapScan leftTable = (PhysicalOlapScan) aliasTransferMap.get(leftSlot).first;
if (leftTable != null) {
basicTableInfos.put(leftSlot, leftTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public Statistics updateRowCountOnly(double rowCount) {
ColumnStatistic columnStatistic = entry.getValue();
ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder(columnStatistic);
columnStatisticBuilder.setNdv(Math.min(columnStatistic.ndv, rowCount));
double numNulls = Math.min(columnStatistic.numNulls, rowCount - columnStatistic.ndv);
columnStatisticBuilder.setNumNulls(numNulls);
double nullFactor = (rowCount - columnStatistic.numNulls) / rowCount;
columnStatisticBuilder.setNumNulls(nullFactor * rowCount);
columnStatisticBuilder.setCount(rowCount);
statistics.addColumnStats(entry.getKey(), columnStatisticBuilder.build());
}
Expand Down
41 changes: 21 additions & 20 deletions regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ CteAnchor[cteId= ( CTEId#3=] )
----PhysicalTopN
------PhysicalProject
--------hashAgg[GLOBAL]
----------PhysicalDistribute
------------hashAgg[LOCAL]
----------hashAgg[LOCAL]
------------PhysicalDistribute
--------------PhysicalProject
----------------hashJoin[INNER_JOIN](ws1.ws_ship_date_sk = date_dim.d_date_sk)
------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = ws_wh.ws_order_number)
--------------------PhysicalDistribute
----------------------PhysicalProject
------------------------CteConsumer[cteId= ( CTEId#3=] )
------------------PhysicalProject
--------------------filter((date_dim.d_date >= 1999-02-01)(cast(d_date as DATETIMEV2(0)) <= cast(days_add(cast('1999-2-01' as DATEV2), INTERVAL 60 DAY) as DATETIMEV2(0))))
----------------------PhysicalOlapScan[date_dim]
------------------PhysicalDistribute
--------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = web_returns.wr_order_number)
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN](web_returns.wr_order_number = ws_wh.ws_order_number)
Expand All @@ -32,20 +32,21 @@ CteAnchor[cteId= ( CTEId#3=] )
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_returns]
----------------------PhysicalDistribute
------------------------hashJoin[INNER_JOIN](ws1.ws_web_site_sk = web_site.web_site_sk)
--------------------------hashJoin[INNER_JOIN](ws1.ws_ship_addr_sk = customer_address.ca_address_sk)
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_sales]
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------filter((cast(ca_state as VARCHAR(*)) = 'NC'))
----------------------------------PhysicalOlapScan[customer_address]
------------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = ws_wh.ws_order_number)
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------filter((cast(web_company_name as VARCHAR(*)) = 'pri'))
--------------------------------PhysicalOlapScan[web_site]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------filter((date_dim.d_date >= 1999-02-01)(cast(d_date as DATETIMEV2(0)) <= cast(days_add(cast('1999-2-01' as DATEV2), INTERVAL 60 DAY) as DATETIMEV2(0))))
------------------------PhysicalOlapScan[date_dim]
------------------------------CteConsumer[cteId= ( CTEId#3=] )
--------------------------PhysicalDistribute
----------------------------hashJoin[INNER_JOIN](ws1.ws_web_site_sk = web_site.web_site_sk)
------------------------------hashJoin[INNER_JOIN](ws1.ws_ship_addr_sk = customer_address.ca_address_sk)
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[web_sales]
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------filter((cast(ca_state as VARCHAR(*)) = 'NC'))
--------------------------------------PhysicalOlapScan[customer_address]
------------------------------PhysicalDistribute
--------------------------------PhysicalProject
----------------------------------filter((cast(web_company_name as VARCHAR(*)) = 'pri'))
------------------------------------PhysicalOlapScan[web_site]

0 comments on commit f9bc433

Please sign in to comment.