Skip to content

Commit

Permalink
[feature](create-table) support setting replication num for creating …
Browse files Browse the repository at this point in the history
…table opertaion globally
  • Loading branch information
morningman committed Jul 16, 2023
1 parent e397112 commit 017aca7
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2028,11 +2028,13 @@ public class Config extends ConfigBase {

@ConfField(mutable = true, masterOnly = true, description = {
"用于强制设定内表的副本数,如果改参数大于零,则用户在建表时指定的副本数将被忽略,而使用本参数设置的值。"
+ "同时,建表语句中指定的副本标签等参数会被忽略。该参数建议仅用于测试环境",
"Used to force the number of replicas of the internal table. If the parameter is greater than zero, "
+ "同时,建表语句中指定的副本标签等参数会被忽略。该参数不影响包括创建分区、修改表属性的操作。该参数建议仅用于测试环境",
"Used to force the number of replicas of the internal table. If the config is greater than zero, "
+ "the number of replicas specified by the user when creating the table will be ignored, "
+ "and the value set by this parameter will be used. At the same time, the replica tags "
+ "and other parameters specified in the create table statement will be ignored. "
+ "This parameter is recommended to be used only in the test environment"})
+ "This config does not effect the operations including creating partitions "
+ "and modifying table properties. "
+ "This config is recommended to be used only in the test environment"})
public static int force_olap_table_replication_num = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
Expand Down Expand Up @@ -535,6 +536,8 @@ public void analyze(Analyzer analyzer) throws UserException, AnalysisException {
}

if (engineName.equals("olap")) {
// before analyzing partition, handle the replication allocation info
properties = rewriteReplicaAllocationProperties(properties);
// analyze partition
if (partitionDesc != null) {
if (partitionDesc instanceof ListPartitionDesc || partitionDesc instanceof RangePartitionDesc
Expand Down Expand Up @@ -624,6 +627,34 @@ public void analyze(Analyzer analyzer) throws UserException, AnalysisException {
}
}

private Map<String, String> rewriteReplicaAllocationProperties(Map<String, String> properties) {
if (Config.force_olap_table_replication_num <= 0) {
return properties;
}
// if force_olap_table_replication_num is set, use this value to rewrite the replication_num or
// replication_allocation properties
Map<String, String> newProperties = properties;
if (newProperties == null) {
newProperties = Maps.newHashMap();
}
boolean rewrite = false;
if (newProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)) {
newProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
String.valueOf(Config.force_olap_table_replication_num));
rewrite = true;
}
if (newProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
newProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
new ReplicaAllocation((short) Config.force_olap_table_replication_num).toCreateStmt());
rewrite = true;
}
if (!rewrite) {
newProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
String.valueOf(Config.force_olap_table_replication_num));
}
return newProperties;
}

private void analyzeEngineName() throws AnalysisException {
if (Strings.isNullOrEmpty(engineName)) {
engineName = "olap";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1867,17 +1867,6 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long
return partition;
}

private ReplicaAllocation getReplicaAllocForCreatingOlapTable(Map<String, String> properties) throws UserException {
if (Config.force_olap_table_replication_num > 0) {
return new ReplicaAllocation((short) Config.force_olap_table_replication_num);
}
ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
if (replicaAlloc.isNotSet()) {
replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
}
return replicaAlloc;
}

// Create olap table and related base index synchronously.
private void createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
String tableName = stmt.getTableName();
Expand All @@ -1896,7 +1885,10 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep
checkAutoIncColumns(baseSchema, keysType);

// analyze replica allocation
ReplicaAllocation replicaAlloc = getReplicaAllocForCreatingOlapTable(stmt.getProperties());
ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(stmt.getProperties(), "");
if (replicaAlloc.isNotSet()) {
replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
}

long bufferSize = IdGeneratorUtil.getBufferSizeForCreateTable(stmt, replicaAlloc);
IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.catalog;

import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.common.AnalysisException;
Expand All @@ -25,6 +26,7 @@
import org.apache.doris.common.ConfigException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.UtFrameUtils;

Expand Down Expand Up @@ -67,6 +69,11 @@ private static void createTable(String sql) throws Exception {
Env.getCurrentEnv().createTable(createTableStmt);
}

private static void alterTable(String sql) throws Exception {
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
Env.getCurrentEnv().alterTable(alterTableStmt);
}

@Test
public void testDuplicateCreateTable() throws Exception {
// test
Expand Down Expand Up @@ -739,4 +746,27 @@ public void testCreateTableWithStringLen() throws DdlException {
Assert.assertEquals(ScalarType.MAX_VARCHAR_LENGTH, tb.getColumn("k3").getStrLen());
Assert.assertEquals(10, tb.getColumn("k4").getStrLen());
}

@Test
public void testCreateTableWithForceReplica() throws DdlException {
Config.force_olap_table_replication_num = 1;
// no need to specify replication_num, the table can still be created.
ExceptionChecker.expectThrowsNoException(() -> {
createTable("create table test.test_replica\n" + "(k1 int, k2 int) partition by range(k1)\n" + "(\n"
+ "partition p1 values less than(\"10\"),\n" + "partition p2 values less than(\"20\")\n" + ")\n"
+ "distributed by hash(k2) buckets 1;");
});

// can still set replication_num manually.
ExceptionChecker.expectThrowsWithMsg(UserException.class, "Failed to find enough host with tag",
() ->
{ alterTable("alter table test.test_replica modify partition p1 set ('replication_num' = '3')");
});

Database db = Env.getCurrentInternalCatalog().getDbOrDdlException("default_cluster:test");
OlapTable tb = (OlapTable) db.getTableOrDdlException("test_replica");
Partition p1 = tb.getPartition("p1");
Assert.assertEquals(1, tb.getPartitionInfo().getReplicaAllocation(p1.getId()).getTotalReplicaNum());
Assert.assertEquals(1, tb.getTableProperty().getReplicaAllocation().getTotalReplicaNum());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.doris.resource.Tag;

import com.google.common.collect.Maps;
import org.apache.spark.sql.AnalysisException;
import org.junit.Assert;
import org.junit.Test;

Expand Down

0 comments on commit 017aca7

Please sign in to comment.