Skip to content

Commit

Permalink
[fix](nereids)(create-table) fix bug that replication num is not set …
Browse files Browse the repository at this point in the history
…when create table with no property (apache#25651)

When executing create partitioned table with Nereids, and replication_num property is not set,
the replication number will be 0, so the tablet will has no replica.
  • Loading branch information
morningman authored Oct 21, 2023
1 parent 387a9c7 commit 13780e4
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
import org.apache.doris.analysis.IndexDef.IndexType;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
Expand All @@ -41,7 +39,6 @@
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.external.elasticsearch.EsUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -51,7 +48,6 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -559,7 +555,8 @@ public void analyze(Analyzer analyzer) throws UserException, AnalysisException {

if (engineName.equals("olap")) {
// before analyzing partition, handle the replication allocation info
properties = rewriteReplicaAllocationProperties(properties);
properties = PropertyAnalyzer.rewriteReplicaAllocationProperties(
tableName.getCtl(), tableName.getDb(), properties);
// analyze partition
if (partitionDesc != null) {
if (partitionDesc instanceof ListPartitionDesc || partitionDesc instanceof RangePartitionDesc
Expand Down Expand Up @@ -650,74 +647,6 @@ public void analyze(Analyzer analyzer) throws UserException, AnalysisException {
}
}

private Map<String, String> rewriteReplicaAllocationProperties(Map<String, String> properties)
throws AnalysisException {
if (Config.force_olap_table_replication_num <= 0) {
return rewriteReplicaAllocationPropertiesByDatabase(properties);
}
// if force_olap_table_replication_num is set, use this value to rewrite the replication_num or
// replication_allocation properties
Map<String, String> newProperties = properties;
if (newProperties == null) {
newProperties = Maps.newHashMap();
}
boolean rewrite = false;
if (newProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)) {
newProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
String.valueOf(Config.force_olap_table_replication_num));
rewrite = true;
}
if (newProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
newProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
new ReplicaAllocation((short) Config.force_olap_table_replication_num).toCreateStmt());
rewrite = true;
}
if (!rewrite) {
newProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
String.valueOf(Config.force_olap_table_replication_num));
}
return newProperties;
}

private Map<String, String> rewriteReplicaAllocationPropertiesByDatabase(Map<String, String> properties)
throws AnalysisException {
// if table contain `replication_allocation` or `replication_allocation`,not need rewrite by db
if (properties != null && (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)
|| properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM))) {
return properties;
}
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogNullable(tableName.getCtl());
if (catalog == null) {
return properties;
}
DatabaseIf db = catalog.getDbNullable(tableName.getDb());
if (db == null) {
return properties;
}
// if db not have properties,not need rewrite
if (db.getDbProperties() == null) {
return properties;
}
Map<String, String> dbProperties = db.getDbProperties().getProperties();
if (dbProperties == null) {
return properties;
}
if (properties == null) {
properties = Maps.newHashMap();
}
if (dbProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION) && StringUtils
.isNotEmpty(dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION))) {
properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION));
}
if (dbProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM) && StringUtils
.isNotEmpty(dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM))) {
properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM));
}
return properties;
}

private void analyzeEngineName() throws AnalysisException {
if (Strings.isNullOrEmpty(engineName)) {
engineName = "olap";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsResource;
import org.apache.doris.catalog.KeysType;
Expand All @@ -32,6 +33,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.StoragePolicy;
Expand All @@ -48,6 +50,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -1176,4 +1179,73 @@ public static void checkCatalogProperties(Map<String, String> properties, boolea
}
}
}

public static Map<String, String> rewriteReplicaAllocationProperties(
String ctl, String db, Map<String, String> properties) {
if (Config.force_olap_table_replication_num <= 0) {
return rewriteReplicaAllocationPropertiesByDatabase(ctl, db, properties);
}
// if force_olap_table_replication_num is set, use this value to rewrite the replication_num or
// replication_allocation properties
Map<String, String> newProperties = properties;
if (newProperties == null) {
newProperties = Maps.newHashMap();
}
boolean rewrite = false;
if (newProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)) {
newProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
String.valueOf(Config.force_olap_table_replication_num));
rewrite = true;
}
if (newProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
newProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
new ReplicaAllocation((short) Config.force_olap_table_replication_num).toCreateStmt());
rewrite = true;
}
if (!rewrite) {
newProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
String.valueOf(Config.force_olap_table_replication_num));
}
return newProperties;
}

private static Map<String, String> rewriteReplicaAllocationPropertiesByDatabase(
String ctl, String database, Map<String, String> properties) {
// if table contain `replication_allocation` or `replication_allocation`,not need rewrite by db
if (properties != null && (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)
|| properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM))) {
return properties;
}
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogNullable(ctl);
if (catalog == null) {
return properties;
}
DatabaseIf db = catalog.getDbNullable(database);
if (db == null) {
return properties;
}
// if db not have properties,not need rewrite
if (db.getDbProperties() == null) {
return properties;
}
Map<String, String> dbProperties = db.getDbProperties().getProperties();
if (dbProperties == null) {
return properties;
}
if (properties == null) {
properties = Maps.newHashMap();
}
if (dbProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION) && StringUtils
.isNotEmpty(dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION))) {
properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION));
}
if (dbProperties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM) && StringUtils
.isNotEmpty(dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM))) {
properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
dbProperties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM));
}
return properties;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1865,17 +1865,22 @@ private LogicalPlan plan(ParserRuleContext tree) {

@Override
public LogicalPlan visitCreateTable(CreateTableContext ctx) {
String ctlName = null;
String dbName = null;
String tableName;
String tableName = null;
List<String> nameParts = visitMultipartIdentifier(ctx.name);
// TODO: support catalog
if (nameParts.size() == 1) {
tableName = nameParts.get(0);
} else if (nameParts.size() == 2) {
dbName = nameParts.get(0);
tableName = nameParts.get(1);
} else if (nameParts.size() == 3) {
ctlName = nameParts.get(0);
dbName = nameParts.get(1);
tableName = nameParts.get(2);
} else {
throw new AnalysisException("nameParts in create table should be 1 or 2");
throw new AnalysisException("nameParts in create table should be [ctl.][db.]tbl");
}
KeysType keysType = null;
if (ctx.DUPLICATE() != null) {
Expand Down Expand Up @@ -1906,6 +1911,7 @@ public LogicalPlan visitCreateTable(CreateTableContext ctx) {
}
return new CreateTableCommand(Optional.empty(), new CreateTableInfo(
ctx.EXISTS() != null,
ctlName,
dbName,
tableName,
visitColumnDefs(ctx.columnDefs()),
Expand All @@ -1923,6 +1929,7 @@ public LogicalPlan visitCreateTable(CreateTableContext ctx) {
} else if (ctx.AS() != null) {
return new CreateTableCommand(Optional.of(visitQuery(ctx.query())), new CreateTableInfo(
ctx.EXISTS() != null,
ctlName,
dbName,
tableName,
ctx.ctasCols != null ? visitIdentifierList(ctx.ctasCols) : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Type;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand All @@ -57,6 +61,7 @@
*/
public class CreateTableInfo {
private final boolean ifNotExists;
private String ctlName;
private String dbName;
private final String tableName;
private List<ColumnDefinition> columns;
Expand All @@ -77,11 +82,13 @@ public class CreateTableInfo {
/**
* constructor for create table
*/
public CreateTableInfo(boolean ifNotExists, String dbName, String tableName, List<ColumnDefinition> columns,
List<IndexDefinition> indexes, String engineName, KeysType keysType, List<String> keys, String comment,
public CreateTableInfo(boolean ifNotExists, String ctlName, String dbName, String tableName,
List<ColumnDefinition> columns, List<IndexDefinition> indexes, String engineName,
KeysType keysType, List<String> keys, String comment,
String partitionType, List<String> partitionColumns, List<PartitionDefinition> partitions,
DistributionDescriptor distribution, List<RollupDefinition> rollups, Map<String, String> properties) {
this.ifNotExists = ifNotExists;
this.ctlName = ctlName;
this.dbName = dbName;
this.tableName = tableName;
this.ctasColumns = null;
Expand All @@ -102,11 +109,12 @@ public CreateTableInfo(boolean ifNotExists, String dbName, String tableName, Lis
/**
* constructor for create table as select
*/
public CreateTableInfo(boolean ifNotExists, String dbName, String tableName, List<String> cols,
public CreateTableInfo(boolean ifNotExists, String ctlName, String dbName, String tableName, List<String> cols,
String engineName, KeysType keysType, List<String> keys, String comment,
String partitionType, List<String> partitionColumns, List<PartitionDefinition> partitions,
DistributionDescriptor distribution, List<RollupDefinition> rollups, Map<String, String> properties) {
this.ifNotExists = ifNotExists;
this.ctlName = ctlName;
this.dbName = dbName;
this.tableName = tableName;
this.ctasColumns = cols;
Expand All @@ -128,6 +136,10 @@ public List<String> getCtasColumns() {
return ctasColumns;
}

public String getCtlName() {
return ctlName;
}

public String getDbName() {
return dbName;
}
Expand Down Expand Up @@ -163,20 +175,30 @@ public void validate(ConnectContext ctx) {

try {
FeNameFormat.checkTableName(tableName);
if (dbName != null) {
FeNameFormat.checkDbName(dbName);
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}

// analyze catalog name
if (Strings.isNullOrEmpty(ctlName)) {
if (ctx.getCurrentCatalog() != null) {
ctlName = ctx.getCurrentCatalog().getName();
} else {
ctlName = InternalCatalog.INTERNAL_CATALOG_NAME;
}
}

// analyze table name
if (dbName == null) {
if (Strings.isNullOrEmpty(dbName)) {
dbName = ClusterNamespace.getFullName(ctx.getClusterName(), ctx.getDatabase());
} else {
dbName = ClusterNamespace.getFullName(ctx.getClusterName(), dbName);
}

Preconditions.checkState(!Strings.isNullOrEmpty(ctlName), "catalog name is null or empty");
Preconditions.checkState(!Strings.isNullOrEmpty(dbName), "database name is null or empty");
properties = PropertyAnalyzer.rewriteReplicaAllocationProperties(ctlName, dbName, properties);

boolean enableDuplicateWithoutKeysByDefault = false;
if (properties != null) {
try {
Expand Down Expand Up @@ -367,14 +389,15 @@ public void validateCreateTableAsSelect(List<ColumnDefinition> columns, ConnectC
* check partitions types.
*/
private boolean checkPartitionsTypes() {
if (partitionType.equalsIgnoreCase("RANGE")) {
if (partitionType.equalsIgnoreCase(PartitionType.RANGE.name())) {
if (partitions.stream().allMatch(p -> p instanceof StepPartition)) {
return true;
}
return partitions.stream().allMatch(p -> (p instanceof LessThanPartition)
|| (p instanceof FixedRangePartition));
}
return partitionType.equalsIgnoreCase("LIST") && partitions.stream().allMatch(p -> p instanceof InPartition);
return partitionType.equalsIgnoreCase(PartitionType.LIST.name())
&& partitions.stream().allMatch(p -> p instanceof InPartition);
}

private void validatePartitionColumn(ColumnDefinition column, ConnectContext ctx) {
Expand All @@ -395,7 +418,7 @@ private void validatePartitionColumn(ColumnDefinition column, ConnectContext ctx
if (!ctx.getSessionVariable().isAllowPartitionColumnNullable() && column.isNullable()) {
throw new AnalysisException("The partition column must be NOT NULL");
}
if (partitionType.equalsIgnoreCase("LIST") && column.isNullable()) {
if (partitionType.equalsIgnoreCase(PartitionType.LIST.name()) && column.isNullable()) {
throw new AnalysisException("The list partition column must be NOT NULL");
}
}
Expand All @@ -415,7 +438,7 @@ public CreateTableStmt translateToLegacyStmt() {
List<AllPartitionDesc> partitionDescs = partitions.stream()
.map(PartitionDefinition::translateToCatalogStyle).collect(Collectors.toList());
try {
if (partitionType.equals("RANGE")) {
if (partitionType.equals(PartitionType.RANGE.name())) {
partitionDesc = new RangePartitionDesc(partitionColumns, partitionDescs);
} else {
partitionDesc = new ListPartitionDesc(partitionColumns, partitionDescs);
Expand Down
Loading

0 comments on commit 13780e4

Please sign in to comment.