[fix](nereids) fix cte filter pushdown if the filters can be aggregated (apache#24489)

The current CTE common-filter extraction does not work when the filters above the consumers can be aggregated, so the common filter cannot be pushed down into the CTE. Consider the following case:
with main as (select c1 from t1) select * from (select m1.* from main m1, main m2 where m1.c1 = m2.c1) abc where c1 = 1;
Here the common filter c1 = 1 cannot be pushed down.

This PR changes the container used by the extraction logic from a set to a list so that the logic works. It also makes the pattern found in TPC-DS queries 4 and 11 work correctly.
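The set-versus-list difference can be illustrated with a minimal sketch. It is not the Doris code: plain strings stand in for Expression objects, the class and method names are hypothetical, and it assumes that the filter sets above the two consumers compare equal. It also assumes that the match count is later compared against the number of consumers (the filterSize variable in the diff), which happens outside the lines shown in this commit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CteFilterCollectDemo {
    // Hypothetical stand-in for getConsumerIdToFilters():
    // consumer id -> filters above that consumer, written as strings.
    static Map<Integer, Set<String>> sampleFilters() {
        Map<Integer, Set<String>> filters = new LinkedHashMap<>();
        filters.put(1, new HashSet<>(Collections.singletonList("c1 = 1")));
        filters.put(2, new HashSet<>(Collections.singletonList("c1 = 1")));
        return filters;
    }

    // Old behaviour: equal filter sets collapse into one element.
    static int entriesAsSet(Map<Integer, Set<String>> filters) {
        Set<Set<String>> collected = filters.values().stream().collect(Collectors.toSet());
        return collected.size();
    }

    // New behaviour: one element per consumer is kept, duplicates included.
    static List<Set<String>> entriesAsList(Map<Integer, Set<String>> filters) {
        return new ArrayList<>(filters.values().stream().collect(Collectors.toList()));
    }

    // Counts how many consumers carry exactly the same filter set as the first one,
    // starting from 0 as in the patched loop.
    static int matchCount(List<Set<String>> perConsumer) {
        Set<String> someone = perConsumer.get(0);
        int count = 0;
        for (Set<String> another : perConsumer) {
            if (another.equals(someone)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> filters = sampleFilters();
        System.out.println(entriesAsSet(filters));              // prints 1
        System.out.println(entriesAsList(filters).size());      // prints 2
        System.out.println(matchCount(entriesAsList(filters))); // prints 2
    }
}
```

With the set, only one entry survives for two consumers, so a count of matching entries can never reach the number of consumers. With the list, every consumer contributes an entry and the count reaches 2.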
xzj7019 authored Sep 18, 2023
1 parent 932b639 commit 7a8e3a6
Showing 3 changed files with 94 additions and 3 deletions.
@@ -145,22 +145,23 @@ private LogicalPlan tryToConstructFilter(CascadesContext cascadesContext, CTEId
         Set<RelationId> consumerIds = cascadesContext.getCteIdToConsumers().get(cteId).stream()
                 .map(LogicalCTEConsumer::getRelationId)
                 .collect(Collectors.toSet());
-        Set<Set<Expression>> filtersAboveEachConsumer = cascadesContext.getConsumerIdToFilters().entrySet().stream()
+        List<Set<Expression>> filtersAboveEachConsumer = cascadesContext.getConsumerIdToFilters().entrySet().stream()
                 .filter(kv -> consumerIds.contains(kv.getKey()))
                 .map(Entry::getValue)
-                .collect(Collectors.toSet());
+                .collect(Collectors.toList());
         Set<Expression> someone = filtersAboveEachConsumer.stream().findFirst().orElse(null);
         if (someone == null) {
             return child;
         }
         int filterSize = cascadesContext.getCteIdToConsumers().get(cteId).size();
         Set<Expression> conjuncts = new HashSet<>();
         for (Expression f : someone) {
-            int matchCount = 1;
+            int matchCount = 0;
             Set<SlotReference> slots = f.collect(e -> e instanceof SlotReference);
             Set<Expression> mightBeJoined = new HashSet<>();
             for (Set<Expression> another : filtersAboveEachConsumer) {
                 if (another.equals(someone)) {
+                    matchCount++;
                     continue;
                 }
                 Set<Expression> matched = new HashSet<>();
43 changes: 43 additions & 0 deletions regression-test/data/nereids_p0/cte/test_cte_filter_pushdown.out
@@ -0,0 +1,43 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !cte_filter_pushdown_1 --
PhysicalCteAnchor ( cteId=CTEId#0 )
--PhysicalCteProducer ( cteId=CTEId#0 )
----PhysicalWindow
------PhysicalQuickSort
--------PhysicalProject
----------filter((main.k1 = 1))
------------PhysicalOlapScan[test]
--PhysicalResultSink
----PhysicalDistribute
------PhysicalProject
--------hashJoin[INNER_JOIN](m1.k1 = m2.k1)
----------PhysicalDistribute
------------filter((temp.k1 = 1))
--------------PhysicalCteConsumer ( cteId=CTEId#0 )
----------PhysicalDistribute
------------PhysicalProject
--------------filter((m2.k1 = 1))
----------------PhysicalCteConsumer ( cteId=CTEId#0 )

-- !cte_filter_pushdown_2 --
PhysicalCteAnchor ( cteId=CTEId#0 )
--PhysicalCteProducer ( cteId=CTEId#0 )
----PhysicalProject
------filter((main.k1 = 1))
--------PhysicalWindow
----------PhysicalQuickSort
------------PhysicalDistribute
--------------PhysicalProject
----------------PhysicalOlapScan[test]
--PhysicalResultSink
----PhysicalDistribute
------PhysicalProject
--------hashJoin[INNER_JOIN](m1.k1 = m2.k1)
----------PhysicalDistribute
------------filter((temp.k1 = 1))
--------------PhysicalCteConsumer ( cteId=CTEId#0 )
----------PhysicalDistribute
------------PhysicalProject
--------------filter((m2.k1 = 1))
----------------PhysicalCteConsumer ( cteId=CTEId#0 )

@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_cte_filter_pushdown") {
sql "SET enable_nereids_planner=true"
sql "SET enable_pipeline_engine=true"
sql "SET enable_fallback_to_original_planner=false"

// CTE filter pushing down with the same filter
qt_cte_filter_pushdown_1 """
explain shape plan
with main AS (
select k1, row_number() over (partition by k1) rn
from nereids_test_query_db.test
)
select * from (
select m1.* from main m1, main m2
where m1.k1 = m2.k1
) temp
where k1 = 1;
"""
qt_cte_filter_pushdown_2 """
explain shape plan
with main AS (
select k1, row_number() over (partition by k2) rn
from nereids_test_query_db.test
)
select * from (
select m1.* from main m1, main m2
where m1.k1 = m2.k1
) temp
where k1 = 1;
"""
}
