Skip to content

Commit

Permalink
[Enhancement] Use partition changed rows as stats healthy indicator (S…
Browse files Browse the repository at this point in the history
…tarRocks#55373)

Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork authored Jan 24, 2025
1 parent da2ef02 commit da819bf
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 24 deletions.
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,12 @@ public class Config extends ConfigBase {
"the job status but improve the robustness")
public static double statistic_full_statistics_failure_tolerance_ratio = 0.05;

@ConfField(mutable = true, comment = "Enable V2 health calculation based on changed rows")
public static boolean statistic_partition_healthy_v2 = true;

@ConfField(mutable = true, comment = "Health threshold for partitions")
public static double statistic_partition_health__v2_threshold = 0.95;

@ConfField(mutable = true)
public static long statistic_auto_collect_small_table_size = 5L * 1024 * 1024 * 1024; // 5G

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ public Map<String, String> getProperties() {
return properties;
}

/**
* Return a number within [0,1] to indicate the health of table stats, 1 means all good.
*/
public double getHealthy() {
Database database = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
OlapTable table = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(database.getId(), tableId);
Expand All @@ -154,9 +157,8 @@ public double getHealthy() {
tableRowCount += partition.getRowCount();
Optional<Long> statistic = tableStatistics.getOrDefault(partition.getId(), Optional.empty());
cachedTableRowCount += statistic.orElse(0L);
LocalDateTime loadTime = StatisticUtils.getPartitionLastUpdateTime(partition);

if (partition.hasData() && !isUpdatedAfterLoad(loadTime)) {
if (!StatisticUtils.isPartitionStatsHealthy(table, partition, this, statistic.orElse(0L))) {
updatePartitionCount++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,46 @@ public static Set<String> getUpdatedPartitionNames(Table table, LocalDateTime ch
return updatedPartitions;
}

// Don't use PartitionVisibleTime for data update checks as it's ineffective for ShareData architecture
@Deprecated
public static LocalDateTime getPartitionLastUpdateTime(Partition partition) {
long time = partition.getDefaultPhysicalPartition().getVisibleVersionTime();
return LocalDateTime.ofInstant(Instant.ofEpochMilli(time), Clock.systemDefaultZone().getZone());
}

/**
* In V2: use relative changed row count to decide if a partition is healthy
* In V1: use VISIBLE_VERSION, which doesn't work for shared-data
*/
public static boolean isPartitionStatsHealthy(Table table, Partition partition, BasicStatsMeta stats) {
long statsRowCount = 0;
if (Config.statistic_partition_healthy_v2) {
Map<Long, Optional<Long>> tableStatistics = GlobalStateMgr.getCurrentState().getStatisticStorage()
.getTableStatistics(table.getId(), Lists.newArrayList(partition));
statsRowCount = tableStatistics.getOrDefault(partition.getId(), Optional.empty()).orElse(0L);
}

return isPartitionStatsHealthy(table, partition, stats, statsRowCount);
}

public static boolean isPartitionStatsHealthy(Table table, Partition partition, BasicStatsMeta stats,
long statsRowCount) {
if (stats == null || stats.isInitJobMeta()) {
return false;
}
if (!partition.hasData()) {
return true;
}
if (Config.statistic_partition_healthy_v2) {
long currentRowCount = partition.getRowCount();
double relativeError = 1.0 * Math.abs(statsRowCount - currentRowCount) /
(double) (currentRowCount > 0 ? currentRowCount : 1);
return relativeError <= 1 - Config.statistic_partition_health__v2_threshold;
} else {
return stats.isUpdatedAfterLoad(getPartitionLastUpdateTime(partition));
}
}

public static boolean isEmptyTable(Table table) {
if (!table.isNativeTableOrMaterializedView()) {
// for external table, return false directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.logging.log4j.Logger;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -462,8 +461,7 @@ private static void createJob(List<StatisticsCollectJob> allTableJobMap, NativeA
// 2. if the stats collection is too frequent
long sumDataSize = 0;
for (Partition partition : table.getPartitions()) {
LocalDateTime partitionUpdateTime = StatisticUtils.getPartitionLastUpdateTime(partition);
if (!basicStatsMeta.isUpdatedAfterLoad(partitionUpdateTime)) {
if (!StatisticUtils.isPartitionStatsHealthy(table, partition, basicStatsMeta)) {
sumDataSize += partition.getDataSize();
}
}
Expand Down Expand Up @@ -523,12 +521,7 @@ private static void createJob(List<StatisticsCollectJob> allTableJobMap, NativeA
} else if (job.getAnalyzeType().equals(StatsConstants.AnalyzeType.HISTOGRAM)) {
createHistogramJob(allTableJobMap, job, db, table, columnNames, columnTypes);
} else if (job.getAnalyzeType().equals(StatsConstants.AnalyzeType.FULL)) {
if (basicStatsMeta == null || basicStatsMeta.isInitJobMeta()) {
createFullStatsJob(allTableJobMap, job, LocalDateTime.MIN, db, table, columnNames, columnTypes);
} else {
createFullStatsJob(allTableJobMap, job, basicStatsMeta.getUpdateTime(), db, table, columnNames,
columnTypes);
}
createFullStatsJob(allTableJobMap, job, basicStatsMeta, db, table, columnNames, columnTypes);
} else {
throw new StarRocksPlannerException("Unknown analyze type " + job.getAnalyzeType(),
ErrorType.INTERNAL_ERROR);
Expand All @@ -544,10 +537,7 @@ private static void createSampleStatsJob(List<StatisticsCollectJob> allTableJobM
List<Long> partitionIdList;
if (basicStatsMeta != null) {
partitionIdList = partitions.stream()
.filter(partition -> {
LocalDateTime partitionUpdateTime = StatisticUtils.getPartitionLastUpdateTime(partition);
return basicStatsMeta.getUpdateTime().isBefore(partitionUpdateTime) && partition.hasData();
})
.filter(partition -> !StatisticUtils.isPartitionStatsHealthy(table, partition, basicStatsMeta))
.map(Partition::getId)
.collect(Collectors.toList());
} else {
Expand All @@ -571,16 +561,12 @@ private static void createHistogramJob(List<StatisticsCollectJob> allTableJobMap
}

private static void createFullStatsJob(List<StatisticsCollectJob> allTableJobMap,
NativeAnalyzeJob job, LocalDateTime statsLastUpdateTime,
NativeAnalyzeJob job, BasicStatsMeta stats,
Database db, Table table, List<String> columnNames, List<Type> columnTypes) {
StatsConstants.AnalyzeType analyzeType;
List<Partition> partitionList = new ArrayList<>();
for (Partition partition : table.getPartitions()) {
LocalDateTime partitionUpdateTime = StatisticUtils.getPartitionLastUpdateTime(partition);
if (statsLastUpdateTime.isBefore(partitionUpdateTime) && partition.hasData()) {
partitionList.add(partition);
}
}
List<Partition> partitionList = table.getPartitions().stream()
.filter(partition -> !StatisticUtils.isPartitionStatsHealthy(table, partition, stats))
.collect(Collectors.toList());

long totalDataSize = partitionList.stream().mapToLong(Partition::getDataSize).sum();
if (totalDataSize > Config.statistic_max_full_collect_data_size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testHealthy() {
LocalDateTime.of(2024, 07, 22, 12, 20), Map.of(), 10000);
basicStatsMeta.increaseDeltaRows(5000L);
basicStatsMeta.setUpdateRows(10000L);
Assert.assertEquals(0.5, basicStatsMeta.getHealthy(), 0.01);
Assert.assertEquals(1.0, basicStatsMeta.getHealthy(), 0.01);
basicStatsMeta.resetDeltaRows();
Assert.assertEquals(1.0, basicStatsMeta.getHealthy(), 0.01);
}
Expand Down

0 comments on commit da819bf

Please sign in to comment.