diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 018602fafd3948..0cbe3eebbab9f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1788,7 +1788,6 @@ protected void computeFragmentHosts() throws Exception { } Pair pairNodes = findLeftmostNode(fragment.getPlanRoot()); - PlanNode fatherNode = pairNodes.first; PlanNode leftMostNode = pairNodes.second; /* @@ -1803,25 +1802,8 @@ protected void computeFragmentHosts() throws Exception { // (Case B) // there is no leftmost scan; we assign the same hosts as those of our // input fragment which has a higher instance_number - - int inputFragmentIndex = 0; - int maxParallelism = 0; - // If the fragment has three children, then the first child and the second child are - // the children(both exchange node) of shuffle HashJoinNode, - // and the third child is the right child(ExchangeNode) of broadcast HashJoinNode. - // We only need to pay attention to the maximum parallelism among - // the two ExchangeNodes of shuffle HashJoinNode. - int childrenCount = (fatherNode != null) ? fatherNode.getChildren().size() : 1; - for (int j = 0; j < childrenCount; j++) { - int currentChildFragmentParallelism - = fragmentExecParamsMap.get(fragment.getChild(j).getFragmentId()).instanceExecParams.size(); - if (currentChildFragmentParallelism > maxParallelism) { - maxParallelism = currentChildFragmentParallelism; - inputFragmentIndex = j; - } - } - - PlanFragmentId inputFragmentId = fragment.getChild(inputFragmentIndex).getFragmentId(); + int maxParallelFragmentIndex = findMaxParallelFragmentIndex(fragment); + PlanFragmentId inputFragmentId = fragment.getChild(maxParallelFragmentIndex).getFragmentId(); // AddAll() soft copy() int exchangeInstances = -1; if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { @@ -1978,6 +1960,27 @@ protected void computeFragmentHosts() throws Exception { } } + private int findMaxParallelFragmentIndex(PlanFragment fragment) { + Preconditions.checkState(!fragment.getChildren().isEmpty(), "fragment has no children"); + + // exclude broadcast join right side's child fragments + List childFragmentCandidates = fragment.getChildren().stream() + .filter(e -> e.getOutputPartition() != DataPartition.UNPARTITIONED) + .collect(Collectors.toList()); + + int maxParallelism = 0; + int maxParaIndex = 0; + for (int i = 0; i < childFragmentCandidates.size(); i++) { + PlanFragmentId childFragmentId = childFragmentCandidates.get(i).getFragmentId(); + int currentChildFragmentParallelism = fragmentExecParamsMap.get(childFragmentId).instanceExecParams.size(); + if (currentChildFragmentParallelism > maxParallelism) { + maxParallelism = currentChildFragmentParallelism; + maxParaIndex = i; + } + } + return maxParaIndex; + } + private TNetworkAddress getGroupCommitBackend(Map addressToBackendID) { // Used for Nereids planner Group commit insert BE select. TNetworkAddress execHostport = new TNetworkAddress(groupCommitBackend.getHost(), diff --git a/regression-test/suites/query_p0/union/test_union_instance.groovy b/regression-test/suites/query_p0/union/test_union_instance.groovy new file mode 100644 index 00000000000000..17a0d3ef1dd614 --- /dev/null +++ b/regression-test/suites/query_p0/union/test_union_instance.groovy @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_union_instance") { + multi_sql """ + drop table if exists t1; + drop table if exists t2; + drop table if exists t3; + drop table if exists t4; + CREATE TABLE `t1` ( + `c1` int NULL, + `c2` int NULL, + `c3` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c1`, `c2`, `c3`) + DISTRIBUTED BY HASH(`c1`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1"); + + insert into t1 values (1,1,1); + insert into t1 values (2,2,2); + insert into t1 values (3,3,3); + + CREATE TABLE `t2` ( + `year_week` varchar(45) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`year_week`) + DISTRIBUTED BY HASH(`year_week`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1"); + + CREATE TABLE `t3` ( + `unique_key` varchar(2999) NULL, + `brand_name` varchar(96) NULL, + `skc` varchar(150) NULL + ) ENGINE=OLAP + UNIQUE KEY(`unique_key`) + DISTRIBUTED BY HASH(`unique_key`) BUCKETS 20 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1"); + + CREATE TABLE `t4` ( + `skc_code` varchar(150) NULL + ) ENGINE=OLAP + UNIQUE KEY(`skc_code`) + DISTRIBUTED BY HASH(`skc_code`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1"); + + set parallel_pipeline_task_num=1; + set disable_nereids_rules='PRUNE_EMPTY_PARTITION'; + """ + explain { + sql """ + SELECT `t`.`year_week` AS `year_week` + ,'' AS `brand_name` + , '' AS `skc_code` + FROM `t1` a + CROSS JOIN `t2` t + union all + SELECT '1' AS `year_week` + ,`a`.`brand_name` AS `brand_name` + ,`a`.`skc` AS `skc_code` + FROM `t3` a + INNER JOIN[shuffle] `t4` b ON `a`.`skc` = `b`.`skc_code` + GROUP BY 1, 2, 3; + """ + contains "3:VNESTED LOOP JOIN" + contains "4:VEXCHANGE" + contains "8:VEXCHANGE" + contains "6:VEXCHANGE" + contains "11:VUNION" + } +}