Skip to content

Commit

Permalink
[Feature](statistics)Support external table analyze partition (apache…
Browse files Browse the repository at this point in the history
…#24154)

Enable collect partition level stats for hive external table.
  • Loading branch information
Jibing-Li authored Sep 18, 2023
1 parent 1153907 commit b4432ce
Show file tree
Hide file tree
Showing 20 changed files with 420 additions and 99 deletions.
6 changes: 3 additions & 3 deletions docs/en/docs/query-acceleration/statistics.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Column statistics collection syntax:

```SQL
ANALYZE < TABLE | DATABASE table_name | db_name >
[ PARTITIONS (partition_name [, ...]) ]
[ PARTITIONS [(*) | (partition_name [, ...]) | WITH RECENT COUNT] ]
[ (column_name [, ...]) ]
[ [ WITH SYNC ] [ WITH INCREMENTAL ] [ WITH SAMPLE PERCENT | ROWS ] [ WITH PERIOD ]]
[ PROPERTIES ("key" = "value", ...) ];
Expand All @@ -78,7 +78,7 @@ ANALYZE < TABLE | DATABASE table_name | db_name >
Explanation:

- Table_name: The target table for the specified. It can be a `db_name.table_name` form.
- partition_name: The specified target partitions(for hive external table only)。Must be partitions exist in `table_name`. Multiple partition names are separated by commas. e.g. (nation=US/city=Washington)
- partition_name: The specified target partitions(for hive external table only)。Must be partitions exist in `table_name`. Multiple partition names are separated by commas. e.g. for single level partition: PARTITIONS(`event_date=20230706`), for multi level partition: PARTITIONS(`nation=US/city=Washington`). PARTITIONS(*) specifies all partitions, PARTITIONS WITH RECENT 30 specifies the latest 30 partitions.
- Column_name: The specified target column. Must be `table_name` a column that exists in. Multiple column names are separated by commas.
- Sync: Synchronizes the collection of statistics. Return after collection. If not specified, it will be executed asynchronously and the job ID will be returned.
- Incremental: Incrementally gather statistics.
Expand Down Expand Up @@ -673,4 +673,4 @@ When executing ANALYZE, statistical data is written to the internal table __inte

### ANALYZE Failure for Large Tables

Due to the strict resource limitations for ANALYZE, the ANALYZE operation for large tables might experience timeouts or exceed the memory limit of BE. In such cases, it's recommended to use ANALYZE ... WITH SAMPLE.... Additionally, for scenarios involving dynamic partitioned tables, it's highly recommended to use ANALYZE ... WITH INCREMENTAL.... This statement processes only the partitions with updated data incrementally, avoiding redundant computations and improving efficiency.
Due to the strict resource limitations for ANALYZE, the ANALYZE operation for large tables might experience timeouts or exceed the memory limit of BE. In such cases, it's recommended to use ANALYZE ... WITH SAMPLE.... Additionally, for scenarios involving dynamic partitioned tables, it's highly recommended to use ANALYZE ... WITH INCREMENTAL.... This statement processes only the partitions with updated data incrementally, avoiding redundant computations and improving efficiency.
4 changes: 2 additions & 2 deletions docs/zh-CN/docs/query-acceleration/statistics.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Doris 查询优化器使用统计信息来确定查询最有效的执行计划

```SQL
ANALYZE < TABLE | DATABASE table_name | db_name >
[ PARTITIONS (partition_name [, ...]) ]
[ PARTITIONS [(*) | (partition_name [, ...]) | WITH RECENT COUNT] ]
[ (column_name [, ...]) ]
[ [ WITH SYNC ] [WITH INCREMENTAL] [ WITH SAMPLE PERCENT | ROWS ] [ WITH PERIOD ] ]
[ PROPERTIES ("key" = "value", ...) ];
Expand All @@ -75,7 +75,7 @@ ANALYZE < TABLE | DATABASE table_name | db_name >
其中:

- table_name: 指定的的目标表。可以是  `db_name.table_name`  形式。
- partition_name: 指定的目标分区(目前只针对Hive外表)。必须是  `table_name`  中存在的分区,多个列名称用逗号分隔。分区名样例:event_date=20230706, nation=CN/city=Beijing
- partition_name: 指定的目标分区(目前只针对Hive外表)。必须是  `table_name`  中存在的分区,多个列名称用逗号分隔。分区名样例: 单层分区PARTITIONS(`event_date=20230706`),多层分区PARTITIONS(`nation=CN/city=Beijing`)。PARTITIONS (*)指定所有分区,PARTITIONS WITH RECENT 100指定最新的100个分区。
- column_name: 指定的目标列。必须是  `table_name`  中存在的列,多个列名称用逗号分隔。
- sync:同步收集统计信息。收集完后返回。若不指定则异步执行并返回任务 ID。
- period:周期性收集统计信息。单位为秒,指定后会定期收集相应的统计信息。
Expand Down
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ terminal String
KW_QUOTA,
KW_RANDOM,
KW_RANGE,
KW_RECENT,
KW_READ,
KW_REBALANCE,
KW_RECOVER,
Expand Down Expand Up @@ -5900,6 +5901,14 @@ partition_names ::=
{:
RESULT = new PartitionNames(true, Lists.newArrayList(partName));
:}
| KW_PARTITIONS LPAREN STAR RPAREN
{:
RESULT = new PartitionNames(true);
:}
| KW_PARTITIONS KW_WITH KW_RECENT INTEGER_LITERAL:count
{:
RESULT = new PartitionNames(count);
:}
;

opt_table_sample ::=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class AnalyzeTblStmt extends AnalyzeStmt {

private final TableName tableName;
private List<String> columnNames;
private List<String> partitionNames;
private PartitionNames partitionNames;
private boolean isAllColumns;

// after analyzed
Expand All @@ -97,7 +97,7 @@ public AnalyzeTblStmt(TableName tableName,
AnalyzeProperties properties) {
super(properties);
this.tableName = tableName;
this.partitionNames = partitionNames == null ? null : partitionNames.getPartitionNames();
this.partitionNames = partitionNames;
this.columnNames = columnNames;
this.analyzeProperties = properties;
this.isAllColumns = columnNames == null;
Expand Down Expand Up @@ -240,14 +240,34 @@ public Set<String> getColumnNames() {
}

public Set<String> getPartitionNames() {
Set<String> partitions = partitionNames == null ? table.getPartitionNames() : Sets.newHashSet(partitionNames);
if (isSamplingPartition()) {
int partNum = ConnectContext.get().getSessionVariable().getExternalTableAnalyzePartNum();
partitions = partitions.stream().limit(partNum).collect(Collectors.toSet());
if (partitionNames == null || partitionNames.getPartitionNames() == null) {
return null;
}
Set<String> partitions = Sets.newHashSet();
partitions.addAll(partitionNames.getPartitionNames());
/*
if (isSamplingPartition()) {
int partNum = ConnectContext.get().getSessionVariable().getExternalTableAnalyzePartNum();
partitions = partitions.stream().limit(partNum).collect(Collectors.toSet());
}
*/
return partitions;
}

public boolean isAllPartitions() {
if (partitionNames == null) {
return false;
}
return partitionNames.isAllPartitions();
}

public long getPartitionCount() {
if (partitionNames == null) {
return 0;
}
return partitionNames.getCount();
}

public boolean isPartitionOnly() {
return partitionNames != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,37 @@ public class PartitionNames implements ParseNode, Writable {
// true if these partitions are temp partitions
@SerializedName(value = "isTemp")
private final boolean isTemp;
private final boolean allPartitions;
private final long count;
// Default partition count to collect statistic for external table.
private static final long DEFAULT_PARTITION_COUNT = 100;

public PartitionNames(boolean isTemp, List<String> partitionNames) {
this.partitionNames = partitionNames;
this.isTemp = isTemp;
this.allPartitions = false;
this.count = 0;
}

public PartitionNames(PartitionNames other) {
this.partitionNames = Lists.newArrayList(other.partitionNames);
this.isTemp = other.isTemp;
this.allPartitions = other.allPartitions;
this.count = 0;
}

public PartitionNames(boolean allPartitions) {
this.partitionNames = null;
this.isTemp = false;
this.allPartitions = allPartitions;
this.count = 0;
}

public PartitionNames(long partitionCount) {
this.partitionNames = null;
this.isTemp = false;
this.allPartitions = false;
this.count = partitionCount;
}

public List<String> getPartitionNames() {
Expand All @@ -67,9 +89,23 @@ public boolean isTemp() {
return isTemp;
}

public boolean isAllPartitions() {
return allPartitions;
}

public long getCount() {
return count;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
if (partitionNames.isEmpty()) {
if (allPartitions && count > 0) {
throw new AnalysisException("All partition and partition count couldn't be set at the same time.");
}
if (allPartitions || count > 0) {
return;
}
if (partitionNames == null || partitionNames.isEmpty()) {
throw new AnalysisException("No partition specified in partition lists");
}
// check if partition name is not empty string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.LocalDate;
Expand Down Expand Up @@ -628,6 +629,12 @@ private void setStatData(Column col, ColumnStatisticsData data, ColumnStatisticB
builder.setMaxValue(Double.MAX_VALUE);
}
}

@Override
public void gsonPostProcess() throws IOException {
super.gsonPostProcess();
estimatedRowCount = -1;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,19 @@ public enum ScheduleType {
// True means this task is a table level task for external table.
// This kind of task is mainly to collect the number of rows of a table.
@SerializedName("externalTableLevelTask")
public boolean externalTableLevelTask;
public final boolean externalTableLevelTask;

@SerializedName("partitionOnly")
public boolean partitionOnly;
public final boolean partitionOnly;

@SerializedName("samplingPartition")
public boolean samplingPartition;
public final boolean samplingPartition;

@SerializedName("isAllPartition")
public final boolean isAllPartition;

@SerializedName("partitionCount")
public final long partitionCount;

// For serialize
@SerializedName("cronExpr")
Expand All @@ -181,7 +187,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, String catalogN
int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
CronExpression cronExpression, boolean forceFull) {
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
Expand All @@ -208,6 +214,8 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, String catalogN
this.externalTableLevelTask = isExternalTableLevelTask;
this.partitionOnly = partitionOnly;
this.samplingPartition = samplingPartition;
this.isAllPartition = isAllPartition;
this.partitionCount = partitionCount;
this.cronExpression = cronExpression;
if (cronExpression != null) {
this.cronExprStr = cronExpression.getCronExpression();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class AnalysisInfoBuilder {
private boolean externalTableLevelTask;
private boolean partitionOnly;
private boolean samplingPartition;
private boolean isAllPartition;
private long partitionCount;

private CronExpression cronExpression;

Expand Down Expand Up @@ -91,6 +93,8 @@ public AnalysisInfoBuilder(AnalysisInfo info) {
externalTableLevelTask = info.externalTableLevelTask;
partitionOnly = info.partitionOnly;
samplingPartition = info.samplingPartition;
isAllPartition = info.isAllPartition;
partitionCount = info.partitionCount;
cronExpression = info.cronExpression;
forceFull = info.forceFull;
}
Expand Down Expand Up @@ -225,6 +229,16 @@ public AnalysisInfoBuilder setSamplingPartition(boolean samplingPartition) {
return this;
}

public AnalysisInfoBuilder setAllPartition(boolean isAllPartition) {
this.isAllPartition = isAllPartition;
return this;
}

public AnalysisInfoBuilder setPartitionCount(long partitionCount) {
this.partitionCount = partitionCount;
return this;
}

public void setCronExpression(CronExpression cronExpression) {
this.cronExpression = cronExpression;
}
Expand All @@ -237,6 +251,38 @@ public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogName, dbName, tblName, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition, cronExpression, forceFull);
externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull);
}

public AnalysisInfoBuilder copy() {
return new AnalysisInfoBuilder()
.setJobId(jobId)
.setTaskId(taskId)
.setTaskIds(taskIds)
.setCatalogName(catalogName)
.setDbName(dbName)
.setTblName(tblName)
.setColToPartitions(colToPartitions)
.setColName(colName)
.setIndexId(indexId)
.setJobType(jobType)
.setAnalysisMode(analysisMode)
.setAnalysisMethod(analysisMethod)
.setAnalysisType(analysisType)
.setSamplePercent(samplePercent)
.setSampleRows(sampleRows)
.setPeriodTimeInMs(periodTimeInMs)
.setMaxBucketNum(maxBucketNum)
.setMessage(message)
.setLastExecTimeInMs(lastExecTimeInMs)
.setTimeCostInMs(timeCostInMs)
.setState(state)
.setScheduleType(scheduleType)
.setExternalTableLevelTask(externalTableLevelTask)
.setSamplingPartition(samplingPartition)
.setPartitionOnly(partitionOnly)
.setAllPartition(isAllPartition)
.setPartitionCount(partitionCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ protected AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlExceptio
boolean isSync = stmt.isSync();
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
if (stmt.isAllColumns()
if (!jobInfo.partitionOnly && stmt.isAllColumns()
&& StatisticsUtil.isExternalTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName)) {
createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
}
Expand Down Expand Up @@ -453,20 +453,20 @@ private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<Str
}

// Get the partition granularity statistics that have been collected
Map<String, Set<Long>> existColAndPartsForStats = StatisticsRepository
Map<String, Set<String>> existColAndPartsForStats = StatisticsRepository
.fetchColAndPartsForStats(tableId);

if (existColAndPartsForStats.isEmpty()) {
// There is no historical statistical information, no need to do validation
return columnToPartitions;
}

Set<Long> existPartIdsForStats = new HashSet<>();
Set<String> existPartIdsForStats = new HashSet<>();
existColAndPartsForStats.values().forEach(existPartIdsForStats::addAll);
Map<Long, String> idToPartition = StatisticsUtil.getPartitionIdToName(table);
Set<String> idToPartition = StatisticsUtil.getPartitionIds(table);
// Get an invalid set of partitions (those partitions were deleted)
Set<Long> invalidPartIds = existPartIdsForStats.stream()
.filter(id -> !idToPartition.containsKey(id)).collect(Collectors.toSet());
Set<String> invalidPartIds = existPartIdsForStats.stream()
.filter(id -> !idToPartition.contains(id)).collect(Collectors.toSet());

if (!invalidPartIds.isEmpty()) {
// Delete invalid partition statistics to avoid affecting table statistics
Expand Down Expand Up @@ -496,6 +496,8 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio
Set<String> partitionNames = stmt.getPartitionNames();
boolean partitionOnly = stmt.isPartitionOnly();
boolean isSamplingPartition = stmt.isSamplingPartition();
boolean isAllPartition = stmt.isAllPartitions();
long partitionCount = stmt.getPartitionCount();
int samplePercent = stmt.getSamplePercent();
int sampleRows = stmt.getSampleRows();
AnalysisType analysisType = stmt.getAnalysisType();
Expand All @@ -516,6 +518,8 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio
infoBuilder.setPartitionNames(partitionNames);
infoBuilder.setPartitionOnly(partitionOnly);
infoBuilder.setSamplingPartition(isSamplingPartition);
infoBuilder.setAllPartition(isAllPartition);
infoBuilder.setPartitionCount(partitionCount);
infoBuilder.setJobType(JobType.MANUAL);
infoBuilder.setState(AnalysisState.PENDING);
infoBuilder.setLastExecTimeInMs(System.currentTimeMillis());
Expand Down
Loading

0 comments on commit b4432ce

Please sign in to comment.