From 7a8e3a65876c6dbadf33b916f4337e38dc13add3 Mon Sep 17 00:00:00 2001 From: xzj7019 <131111794+xzj7019@users.noreply.github.com> Date: Mon, 18 Sep 2023 11:26:55 +0800 Subject: [PATCH] [fix](nereids) fix cte filter pushdown if the filters can be aggregated (#24489) The current CTE common filter extraction doesn't work if the filters can be aggregated, which prevents the common filter from being pushed down inside the CTE. Consider the following case: with main as (select c1 from t1) select * from (select m1.* from main m1, main m2 where m1.c1 = m2.c1) abc where c1 = 1; The common c1=1 filter can't be pushed down. This PR changes the extraction logic to collect the per-consumer filter sets into a list instead of a set, so that equal filter sets are no longer deduplicated and each one is counted when matching; this also makes the pattern in TPC-DS query4/11 work. --- .../rules/rewrite/RewriteCteChildren.java | 7 +-- .../cte/test_cte_filter_pushdown.out | 43 +++++++++++++++++ .../cte/test_cte_filter_pushdown.groovy | 47 +++++++++++++++++++ 3 files changed, 94 insertions(+), 3 deletions(-) create mode 100644 regression-test/data/nereids_p0/cte/test_cte_filter_pushdown.out create mode 100644 regression-test/suites/nereids_p0/cte/test_cte_filter_pushdown.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java index d88ef62e314eaf..5aa286e67f9c27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java @@ -145,10 +145,10 @@ private LogicalPlan tryToConstructFilter(CascadesContext cascadesContext, CTEId Set consumerIds = cascadesContext.getCteIdToConsumers().get(cteId).stream() .map(LogicalCTEConsumer::getRelationId) .collect(Collectors.toSet()); - Set> filtersAboveEachConsumer = cascadesContext.getConsumerIdToFilters().entrySet().stream() + List> filtersAboveEachConsumer = 
cascadesContext.getConsumerIdToFilters().entrySet().stream() .filter(kv -> consumerIds.contains(kv.getKey())) .map(Entry::getValue) - .collect(Collectors.toSet()); + .collect(Collectors.toList()); Set someone = filtersAboveEachConsumer.stream().findFirst().orElse(null); if (someone == null) { return child; @@ -156,11 +156,12 @@ private LogicalPlan tryToConstructFilter(CascadesContext cascadesContext, CTEId int filterSize = cascadesContext.getCteIdToConsumers().get(cteId).size(); Set conjuncts = new HashSet<>(); for (Expression f : someone) { - int matchCount = 1; + int matchCount = 0; Set slots = f.collect(e -> e instanceof SlotReference); Set mightBeJoined = new HashSet<>(); for (Set another : filtersAboveEachConsumer) { if (another.equals(someone)) { + matchCount++; continue; } Set matched = new HashSet<>(); diff --git a/regression-test/data/nereids_p0/cte/test_cte_filter_pushdown.out b/regression-test/data/nereids_p0/cte/test_cte_filter_pushdown.out new file mode 100644 index 00000000000000..0c632f4fc29d01 --- /dev/null +++ b/regression-test/data/nereids_p0/cte/test_cte_filter_pushdown.out @@ -0,0 +1,43 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !cte_filter_pushdown_1 -- +PhysicalCteAnchor ( cteId=CTEId#0 ) +--PhysicalCteProducer ( cteId=CTEId#0 ) +----PhysicalWindow +------PhysicalQuickSort +--------PhysicalProject +----------filter((main.k1 = 1)) +------------PhysicalOlapScan[test] +--PhysicalResultSink +----PhysicalDistribute +------PhysicalProject +--------hashJoin[INNER_JOIN](m1.k1 = m2.k1) +----------PhysicalDistribute +------------filter((temp.k1 = 1)) +--------------PhysicalCteConsumer ( cteId=CTEId#0 ) +----------PhysicalDistribute +------------PhysicalProject +--------------filter((m2.k1 = 1)) +----------------PhysicalCteConsumer ( cteId=CTEId#0 ) + +-- !cte_filter_pushdown_2 -- +PhysicalCteAnchor ( cteId=CTEId#0 ) +--PhysicalCteProducer ( cteId=CTEId#0 ) +----PhysicalProject +------filter((main.k1 = 1)) +--------PhysicalWindow +----------PhysicalQuickSort +------------PhysicalDistribute +--------------PhysicalProject +----------------PhysicalOlapScan[test] +--PhysicalResultSink +----PhysicalDistribute +------PhysicalProject +--------hashJoin[INNER_JOIN](m1.k1 = m2.k1) +----------PhysicalDistribute +------------filter((temp.k1 = 1)) +--------------PhysicalCteConsumer ( cteId=CTEId#0 ) +----------PhysicalDistribute +------------PhysicalProject +--------------filter((m2.k1 = 1)) +----------------PhysicalCteConsumer ( cteId=CTEId#0 ) + diff --git a/regression-test/suites/nereids_p0/cte/test_cte_filter_pushdown.groovy b/regression-test/suites/nereids_p0/cte/test_cte_filter_pushdown.groovy new file mode 100644 index 00000000000000..8f08721f6cd3a5 --- /dev/null +++ b/regression-test/suites/nereids_p0/cte/test_cte_filter_pushdown.groovy @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_cte_filter_pushdown") { + sql "SET enable_nereids_planner=true" + sql "SET enable_pipeline_engine=true" + sql "SET enable_fallback_to_original_planner=false" + + // CTE filter pushing down with the same filter + qt_cte_filter_pushdown_1 """ + explain shape plan + with main AS ( + select k1, row_number() over (partition by k1) rn + from nereids_test_query_db.test + ) + select * from ( + select m1.* from main m1, main m2 + where m1.k1 = m2.k1 + ) temp + where k1 = 1; + """ + qt_cte_filter_pushdown_2 """ + explain shape plan + with main AS ( + select k1, row_number() over (partition by k2) rn + from nereids_test_query_db.test + ) + select * from ( + select m1.* from main m1, main m2 + where m1.k1 = m2.k1 + ) temp + where k1 = 1; + """ +}