Skip to content

Commit

Permalink
[enhancement](schema-change) Support light schema change on hash colu…
Browse files Browse the repository at this point in the history
…mns and agg key columns with varchar type to change length (apache#39319) (apache#40236)

## Proposed changes

1. Schema change should rebuild the distribution info after modifying
columns, especially distribution columns. Otherwise dynamic partitioning
may fail when it checks the distribution columns for equality.
2. Support light schema change on hash key columns. For unique key or
duplicate key columns, it cannot be enabled for now due to historical
reasons. See apache#39798 .
  • Loading branch information
TangSiyang2001 authored Sep 9, 2024
1 parent e0b22b5 commit d373ca7
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -591,13 +591,7 @@ private boolean processModifyColumn(ModifyColumnClause alterClause, OlapTable ol
if (columnPos == null && col.getDataType() == PrimitiveType.VARCHAR
&& modColumn.getDataType() == PrimitiveType.VARCHAR) {
col.checkSchemaChangeAllowed(modColumn);
// If col and modColumn is not key, it allow light schema change,
// of course, olapTable has been enable light schema change
if (modColumn.isKey() || col.isKey()) {
lightSchemaChange = false;
} else {
lightSchemaChange = olapTable.getEnableLightSchemaChange();
}
lightSchemaChange = olapTable.getEnableLightSchemaChange();
}
if (col.isClusterKey()) {
throw new DdlException("Can not modify cluster key column: " + col.getName());
Expand Down Expand Up @@ -2938,6 +2932,7 @@ public void updateBaseIndexSchema(OlapTable olapTable, Map<Long, LinkedList<Colu
}
olapTable.setIndexes(indexes);
olapTable.rebuildFullSchema();
olapTable.rebuildDistributionInfo();
}

public void replayModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ private void onFinished(OlapTable tbl) {
}
// rebuild table's full schema
tbl.rebuildFullSchema();
tbl.rebuildDistributionInfo();

// update bloom filter
if (hasBfChange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ public void checkSchemaChangeAllowed(Column other) throws DdlException {
}
}

if (this.aggregationType != other.aggregationType) {
if (!Objects.equals(this.aggregationType, other.aggregationType)) {
throw new DdlException("Can not change aggregation type");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,8 @@ public String toString() {
// Converts this hash distribution into a random distribution with the same
// bucket count; the hash distribution columns are intentionally dropped.
public RandomDistributionInfo toRandomDistributionInfo() {
    return new RandomDistributionInfo(bucketNum);
}

/**
 * Replaces the hash distribution columns, e.g. after a schema change has
 * rebuilt the column definitions.
 *
 * <p>The list is stored as-is (no defensive copy): callers such as
 * {@code OlapTable.rebuildDistributionInfo()} deliberately share one rebuilt
 * list across the table default and every partition's distribution info.
 *
 * @param columns the new distribution columns, in hashing order
 */
public void setDistributionColumns(List<Column> columns) {
    this.distributionColumns = columns;
}
}
24 changes: 24 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,30 @@ public void rebuildFullSchema() {
}
}

/**
 * Refreshes the hash distribution info so each distribution column points at
 * its current (possibly modified) definition in the base schema. Must be
 * called after a schema change rewrites columns, otherwise dynamic partition
 * scheduling may fail its distribution-column equality check.
 *
 * <p>No-op for non-hash (random) distributions, which have no columns.
 */
public void rebuildDistributionInfo() {
    if (!Objects.equals(defaultDistributionInfo.getType(), DistributionInfoType.HASH)) {
        return;
    }
    HashDistributionInfo distributionInfo = (HashDistributionInfo) defaultDistributionInfo;

    // Rebuild in the ORIGINAL distribution-column order. Hash bucketing is
    // order-sensitive (DISTRIBUTED BY HASH(k2, k1) is legal), so we must not
    // inherit the base schema's ordering, which filtering the base schema
    // by a name set would do.
    List<Column> newDistributionColumns = distributionInfo.getDistributionColumns()
            .stream()
            .map(distColumn -> getBaseSchema()
                    .stream()
                    .filter(column -> column.getName().equals(distColumn.getName()))
                    .findFirst()
                    // Copy so later schema edits cannot mutate this info.
                    .map(Column::new)
                    // A distribution column should always exist in the base
                    // schema; keep the stale definition rather than silently
                    // shrinking the column list if one is ever missing.
                    .orElse(distColumn))
            .collect(Collectors.toList());
    distributionInfo.setDistributionColumns(newDistributionColumns);

    // Propagate the rebuilt list to every partition so per-partition
    // distribution info stays consistent with the table default.
    getPartitions()
            .stream()
            .map(Partition::getDistributionInfo)
            .forEach(info -> ((HashDistributionInfo) info).setDistributionColumns(newDistributionColumns));
}

public boolean deleteIndexInfo(String indexName) {
if (!indexNameToId.containsKey(indexName)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


import org.apache.doris.regression.suite.ClusterOptions

// Regression test: after modifying a hash-distribution key column (comment
// change on k1, varchar length widening on k2), dynamic partition scheduling
// must keep working -- including after an FE restart, i.e. the rebuilt
// distribution info must survive metadata replay.
suite("test_dynamic_partition_mod_distribution_key") {
    def options = new ClusterOptions()
    // Two FEs so a restart exercises metadata replay on a follower as well.
    options.setFeNum(2)
    docker(options) {
        // FIXME: for historical bugs, this case will fail if adding k2 as dup key or unique key
        // see in https://github.com/apache/doris/issues/39798
        def keys = ["DUPLICATE KEY (k1)", "UNIQUE KEY (k1)", "AGGREGATE KEY (k1, k2)"]
        // Parallel to `keys`: only the aggregate-key table needs an
        // aggregation type on the value column.
        def aggTypes = ["", "", "REPLACE"]
        for (i in 0..<3) {
            def key = keys.get(i)
            def aggType = aggTypes.get(i)
            def tableName = "test_dynamic_partition_mod_distribution_key"
            sql """ DROP TABLE IF EXISTS ${tableName} """

            // Dynamic partitions from -3 to +3 days => 7 partitions initially.
            sql """
                CREATE TABLE IF NOT EXISTS ${tableName} (
                k1 DATE NOT NULL,
                k2 VARCHAR(20) NOT NULL,
                v INT ${aggType}
                ) ${key}
                PARTITION BY RANGE(k1) ()
                DISTRIBUTED BY HASH(k1) BUCKETS 1
                PROPERTIES (
                "dynamic_partition.enable"="true",
                "dynamic_partition.end"="3",
                "dynamic_partition.buckets"="1",
                "dynamic_partition.start"="-3",
                "dynamic_partition.prefix"="p",
                "dynamic_partition.time_unit"="DAY",
                "dynamic_partition.create_history_partition"="true",
                "dynamic_partition.replication_allocation" = "tag.location.default: 1")
            """

            // Modify a distribution column (k1) and widen a varchar key (k2);
            // both must leave the distribution info usable afterwards.
            sql """ alter table ${tableName} modify column k1 comment 'new_comment_for_k1' """
            sql """ alter table ${tableName} modify column k2 varchar(255) """

            // Restart to force the modified table metadata through replay.
            cluster.restartFrontends()
            // NOTE(review): fixed 30s wait for FE to come back -- presumably
            // conservative; confirm if it can be shortened.
            sleep(30000)
            context.reconnectFe()

            sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """
            // Extending `end` to 5 (-3..+5 plus today) should grow the table
            // to 9 partitions once the scheduler runs.
            sql """ alter table ${tableName} set('dynamic_partition.end'='5') """
            result = sql "show partitions from ${tableName}"
            for (def retry = 0; retry < 10; retry++) { // at most wait ~10s (10 retries x 1s)
                if (result.size() == 9) {
                    break;
                }
                logger.info("wait dynamic partition scheduler, sleep 1s")
                sleep(1000) // sleep 1s
                result = sql "show partitions from ${tableName}"
            }
            assertEquals(9, result.size())
        }
    }
}

This file was deleted.

0 comments on commit d373ca7

Please sign in to comment.