Skip to content

Commit

Permalink
[feat](mtmv)external table support partition rewrite (apache#44998)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Previously, transparent rewriting of a query on an external table was
all-or-nothing: either the whole query was rewritten or it was not rewritten at all.

This PR adds partition-level rewriting for external tables: some partitions
can be rewritten while the remaining partitions are read directly from the base table.

### Release note

MTMV partition rewrite now supports external tables.
  • Loading branch information
zddr authored Dec 5, 2024
1 parent 61df4e6 commit 6b4b3cb
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public SelectedPartitions initSelectedPartitions(Optional<MvccSnapshot> snapshot
* @param snapshot if not support mvcc, ignore this
* @return partitionName ==> PartitionItem
*/
protected Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return Collections.emptyMap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public boolean supportInternalPartitionPruned() {
}

@Override
protected Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return getNameToPartitionItems();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public List<Column> getPartitionColumns() {
}

@Override
protected Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
if (getPartitionColumns().isEmpty()) {
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,17 +461,14 @@ protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>>
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
}
// Collect the mv related base table partitions which query used
Map<BaseTableInfo, Set<Partition>> queryUsedBaseTablePartitions = new LinkedHashMap<>();
Map<BaseTableInfo, Set<String>> queryUsedBaseTablePartitions = new LinkedHashMap<>();
queryUsedBaseTablePartitions.put(relatedPartitionTable, new HashSet<>());
queryPlan.accept(new StructInfo.QueryScanPartitionsCollector(), queryUsedBaseTablePartitions);
// Bail out, not check invalid partition if not olap scan, support later
if (queryUsedBaseTablePartitions.isEmpty()) {
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
}
Set<String> queryUsedBaseTablePartitionNameSet = queryUsedBaseTablePartitions.get(relatedPartitionTable)
.stream()
.map(Partition::getName)
.collect(Collectors.toSet());
Set<String> queryUsedBaseTablePartitionNameSet = queryUsedBaseTablePartitions.get(relatedPartitionTable);

Collection<Partition> mvValidPartitions = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
cascadesContext.getConnectContext(), System.currentTimeMillis(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
Expand Down Expand Up @@ -51,6 +51,8 @@
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
Expand Down Expand Up @@ -731,22 +733,28 @@ public Plan visitLogicalOlapScan(LogicalOlapScan olapScan,
* Collect partitions on base table
*/
public static class QueryScanPartitionsCollector extends DefaultPlanVisitor<Plan,
Map<BaseTableInfo, Set<Partition>>> {
Map<BaseTableInfo, Set<String>>> {
@Override
public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
Map<BaseTableInfo, Set<Partition>> targetTablePartitionMap) {
Map<BaseTableInfo, Set<String>> targetTablePartitionMap) {
TableIf table = catalogRelation.getTable();
BaseTableInfo relatedPartitionTable = new BaseTableInfo(table);
if (!targetTablePartitionMap.containsKey(relatedPartitionTable)) {
return catalogRelation;
}
Set<String> tablePartitions = targetTablePartitionMap.get(relatedPartitionTable);
if (catalogRelation instanceof LogicalOlapScan) {
// Handle olap table
LogicalOlapScan logicalOlapScan = (LogicalOlapScan) catalogRelation;
Set<Partition> tablePartitions = targetTablePartitionMap.get(relatedPartitionTable);
for (Long partitionId : logicalOlapScan.getSelectedPartitionIds()) {
tablePartitions.add(logicalOlapScan.getTable().getPartition(partitionId));
tablePartitions.add(logicalOlapScan.getTable().getPartition(partitionId).getName());
}
} else if (catalogRelation instanceof LogicalFileScan
&& catalogRelation.getTable() instanceof ExternalTable
&& ((ExternalTable) catalogRelation.getTable()).supportInternalPartitionPruned()) {
LogicalFileScan logicalFileScan = (LogicalFileScan) catalogRelation;
SelectedPartitions selectedPartitions = logicalFileScan.getSelectedPartitions();
tablePartitions.addAll(selectedPartitions.selectedPartitions.keySet());
} else {
// todo Support other type partition table
// Not support to partition check now when query external catalog table, support later.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.analyzer.UnboundRelation;
Expand Down Expand Up @@ -306,16 +307,19 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
MTMVRelatedTableIf targetTable = (MTMVRelatedTableIf) table;
for (String partitionName : filterTableEntry.getValue()) {
Partition partition = targetTable.getPartition(partitionName);
if (!(targetTable instanceof OlapTable)) {
// check partition is have data or not, only support olap table
break;
}
if (!((OlapTable) targetTable).selectNonEmptyPartitionIds(
if (targetTable instanceof OlapTable && !((OlapTable) targetTable).selectNonEmptyPartitionIds(
Lists.newArrayList(partition.getId())).isEmpty()) {
// Add filter only when partition has data
// Add filter only when partition has data when olap table
partitionHasDataItems.add(
((OlapTable) targetTable).getPartitionInfo().getItem(partition.getId()));
}
if (targetTable instanceof ExternalTable) {
// Add filter only when partition has data when external table
// TODO: 2024/12/4 real snapshot
partitionHasDataItems.add(
((ExternalTable) targetTable).getNameToPartitionItems(Optional.empty())
.get(partitionName));
}
}
if (partitionHasDataItems.isEmpty()) {
predicates.setNeedAddFilter(false);
Expand Down
31 changes: 31 additions & 0 deletions regression-test/data/mtmv_p0/test_hive_rewrite_mtmv.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !refresh_one_partition --
20230101 3

-- !refresh_one_partition_rewrite --
20230101 3
20230102 3

-- !refresh_complete --
20230101 3
20230102 3

-- !refresh_all_partition_rewrite --
20230101 3
20230102 3

-- !refresh_one_partition --
20230101 3

-- !refresh_one_partition_rewrite --
20230101 3
20230102 3

-- !refresh_complete --
20230101 3
20230102 3

-- !refresh_all_partition_rewrite --
20230101 3
20230102 3

89 changes: 89 additions & 0 deletions regression-test/suites/mtmv_p0/test_hive_rewrite_mtmv.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite: builds a partitioned materialized view (MTMV) on top of a Hive
// external table and checks how the MV-defining query is planned and answered
// (a) after only one MV partition has been refreshed and (b) after a complete refresh.
// The whole scenario is repeated once per Hive metastore prefix ("hive2", "hive3").
suite("test_hive_rewrite_mtmv", "p0,external,hive,external_docker,external_docker_hive") {
// Skip the suite unless the "enableHiveTest" config entry is set to "true".
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("diable Hive test.")
return;
}
// The catalog and MV names are derived from the suite name.
String suiteName = "test_hive_rewrite_mtmv"
String catalogName = "${suiteName}_catalog"
String mvName = "${suiteName}_mv"
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
// NOTE(review): judging by its name, this session variable allows MV rewrite for
// queries that contain external tables -- confirm against the session variable docs.
sql """set materialized_view_rewrite_enable_contain_external_table=true;"""
// This query is used both as the MV definition and as the query that is explained/executed below.
String mvSql = "SELECT part_col,count(*) as num FROM ${catalogName}.`default`.mtmv_base1 group by part_col;";
for (String hivePrefix : ["hive2", "hive3"]) {
// Recreate the HMS catalog against the metastore port configured for this Hive prefix.
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
sql """drop catalog if exists ${catalogName}"""
sql """create catalog if not exists ${catalogName} properties (
"type"="hms",
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
);"""
// Collect statistics, then pin row_count for part_col to a fixed value.
// NOTE(review): presumably this keeps the optimizer's plan choice stable -- confirm.
sql """analyze table ${catalogName}.`default`.mtmv_base1 with sync"""
sql """alter table ${catalogName}.`default`.mtmv_base1 modify column part_col set stats ('row_count'='6');"""

// Create the MV with BUILD DEFERRED and manual refresh, partitioned by part_col.
sql """drop materialized view if exists ${mvName};"""
sql """
CREATE MATERIALIZED VIEW ${mvName}
BUILD DEFERRED REFRESH AUTO ON MANUAL
partition by(`part_col`)
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS
${mvSql}
"""
// The MV is expected to expose both partitions p_20230101 and p_20230102.
def showPartitionsResult = sql """show partitions from ${mvName}"""
logger.info("showPartitionsResult: " + showPartitionsResult.toString())
assertTrue(showPartitionsResult.toString().contains("p_20230101"))
assertTrue(showPartitionsResult.toString().contains("p_20230102"))

// refresh one partitions
sql """
REFRESH MATERIALIZED VIEW ${mvName} partitions(p_20230101);
"""
waitingMTMVTaskFinishedByMvName(mvName)
order_qt_refresh_one_partition "SELECT * FROM ${mvName}"

// With only p_20230101 refreshed, the plan must contain a union and a predicate on
// part_col = 20230102 (the partition that was not refreshed).
// NOTE(review): the "#4" slot id in the expected text is tied to the current plan output.
def explainOnePartition = sql """ explain ${mvSql} """
logger.info("explainOnePartition: " + explainOnePartition.toString())
assertTrue(explainOnePartition.toString().contains("VUNION"))
assertTrue(explainOnePartition.toString().contains("part_col[#4] = 20230102"))
order_qt_refresh_one_partition_rewrite "${mvSql}"

mv_rewrite_success("${mvSql}", "${mvName}")

//refresh complete
sql """
REFRESH MATERIALIZED VIEW ${mvName} complete
"""
waitingMTMVTaskFinishedByMvName(mvName)
order_qt_refresh_complete "SELECT * FROM ${mvName}"

// After a complete refresh the plan must contain an olap scan node reading 2 of 2 partitions.
def explainAllPartition = sql """ explain ${mvSql}; """
logger.info("explainAllPartition: " + explainAllPartition.toString())
assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
assertTrue(explainAllPartition.toString().contains("partitions=2/2"))
order_qt_refresh_all_partition_rewrite "${mvSql}"

mv_rewrite_success("${mvSql}", "${mvName}")

// Clean up so the next Hive prefix starts from a fresh MV and catalog.
sql """drop materialized view if exists ${mvName};"""
sql """drop catalog if exists ${catalogName}"""
}
}

0 comments on commit 6b4b3cb

Please sign in to comment.