From da819bfef20c284f95d439e577457f3075287b3c Mon Sep 17 00:00:00 2001 From: Murphy <96611012+murphyatwork@users.noreply.github.com> Date: Fri, 24 Jan 2025 15:19:08 +0800 Subject: [PATCH] [Enhancement] Use partition changed rows as stats healthy indicator (#55373) Signed-off-by: Murphy --- .../java/com/starrocks/common/Config.java | 6 ++++ .../starrocks/statistic/BasicStatsMeta.java | 6 ++-- .../starrocks/statistic/StatisticUtils.java | 35 +++++++++++++++++++ .../StatisticsCollectJobFactory.java | 28 ++++----------- .../statistic/BasicStatsMetaTest.java | 2 +- 5 files changed, 53 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 0a25be856fa5a..e6c17a5b53514 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -2088,6 +2088,12 @@ public class Config extends ConfigBase { "the job status but improve the robustness") public static double statistic_full_statistics_failure_tolerance_ratio = 0.05; + @ConfField(mutable = true, comment = "Enable V2 health calculation based on changed rows") + public static boolean statistic_partition_healthy_v2 = true; + + @ConfField(mutable = true, comment = "Health threshold for partitions") + public static double statistic_partition_health__v2_threshold = 0.95; + @ConfField(mutable = true) public static long statistic_auto_collect_small_table_size = 5L * 1024 * 1024 * 1024; // 5G diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/BasicStatsMeta.java b/fe/fe-core/src/main/java/com/starrocks/statistic/BasicStatsMeta.java index 330cc59c71aae..e6215c7a12da0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/BasicStatsMeta.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/BasicStatsMeta.java @@ -137,6 +137,9 @@ public Map getProperties() { return properties; } + /** + * Return a number within [0,1] to indicate the health of table stats, 1 means all good. + */ public double getHealthy() { Database database = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId); OlapTable table = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(database.getId(), tableId); @@ -154,9 +157,8 @@ public double getHealthy() { tableRowCount += partition.getRowCount(); Optional statistic = tableStatistics.getOrDefault(partition.getId(), Optional.empty()); cachedTableRowCount += statistic.orElse(0L); - LocalDateTime loadTime = StatisticUtils.getPartitionLastUpdateTime(partition); - if (partition.hasData() && !isUpdatedAfterLoad(loadTime)) { + if (!StatisticUtils.isPartitionStatsHealthy(table, partition, this, statistic.orElse(0L))) { updatePartitionCount++; } } diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java index c939bafdf8255..deb2f563b3e03 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java @@ -222,11 +222,46 @@ public static Set getUpdatedPartitionNames(Table table, LocalDateTime ch return updatedPartitions; } + // Don't use PartitionVisibleTime for data update checks as it's ineffective for ShareData architecture + @Deprecated public static LocalDateTime getPartitionLastUpdateTime(Partition partition) { long time = partition.getDefaultPhysicalPartition().getVisibleVersionTime(); return LocalDateTime.ofInstant(Instant.ofEpochMilli(time), Clock.systemDefaultZone().getZone()); } + /** + * In V2: use relative changed row count to decide if a partition is healthy + * In V1: use VISIBLE_VERSION, which doesn't work for shared-data + */ + public static boolean isPartitionStatsHealthy(Table table, Partition partition, BasicStatsMeta stats) { + long statsRowCount = 0; + if (Config.statistic_partition_healthy_v2) { + Map> tableStatistics = GlobalStateMgr.getCurrentState().getStatisticStorage() + .getTableStatistics(table.getId(), Lists.newArrayList(partition)); + statsRowCount = tableStatistics.getOrDefault(partition.getId(), Optional.empty()).orElse(0L); + } + + return isPartitionStatsHealthy(table, partition, stats, statsRowCount); + } + + public static boolean isPartitionStatsHealthy(Table table, Partition partition, BasicStatsMeta stats, + long statsRowCount) { + if (stats == null || stats.isInitJobMeta()) { + return false; + } + if (!partition.hasData()) { + return true; + } + if (Config.statistic_partition_healthy_v2) { + long currentRowCount = partition.getRowCount(); + double relativeError = 1.0 * Math.abs(statsRowCount - currentRowCount) / + (double) (currentRowCount > 0 ? currentRowCount : 1); + return relativeError <= 1 - Config.statistic_partition_health__v2_threshold; + } else { + return stats.isUpdatedAfterLoad(getPartitionLastUpdateTime(partition)); + } + } + public static boolean isEmptyTable(Table table) { if (!table.isNativeTableOrMaterializedView()) { // for external table, return false directly diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsCollectJobFactory.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsCollectJobFactory.java index 4be5babd47c85..e045e753e4034 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsCollectJobFactory.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsCollectJobFactory.java @@ -34,7 +34,6 @@ import org.apache.logging.log4j.Logger; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -462,8 +461,7 @@ private static void createJob(List allTableJobMap, NativeA // 2. if the stats collection is too frequent long sumDataSize = 0; for (Partition partition : table.getPartitions()) { - LocalDateTime partitionUpdateTime = StatisticUtils.getPartitionLastUpdateTime(partition); - if (!basicStatsMeta.isUpdatedAfterLoad(partitionUpdateTime)) { + if (!StatisticUtils.isPartitionStatsHealthy(table, partition, basicStatsMeta)) { sumDataSize += partition.getDataSize(); } } @@ -523,12 +521,7 @@ private static void createJob(List allTableJobMap, NativeA } else if (job.getAnalyzeType().equals(StatsConstants.AnalyzeType.HISTOGRAM)) { createHistogramJob(allTableJobMap, job, db, table, columnNames, columnTypes); } else if (job.getAnalyzeType().equals(StatsConstants.AnalyzeType.FULL)) { - if (basicStatsMeta == null || basicStatsMeta.isInitJobMeta()) { - createFullStatsJob(allTableJobMap, job, LocalDateTime.MIN, db, table, columnNames, columnTypes); - } else { - createFullStatsJob(allTableJobMap, job, basicStatsMeta.getUpdateTime(), db, table, columnNames, - columnTypes); - } + createFullStatsJob(allTableJobMap, job, basicStatsMeta, db, table, columnNames, columnTypes); } else { throw new StarRocksPlannerException("Unknown analyze type " + job.getAnalyzeType(), ErrorType.INTERNAL_ERROR); @@ -544,10 +537,7 @@ private static void createSampleStatsJob(List allTableJobM List partitionIdList; if (basicStatsMeta != null) { partitionIdList = partitions.stream() - .filter(partition -> { - LocalDateTime partitionUpdateTime = StatisticUtils.getPartitionLastUpdateTime(partition); - return basicStatsMeta.getUpdateTime().isBefore(partitionUpdateTime) && partition.hasData(); - }) + .filter(partition -> !StatisticUtils.isPartitionStatsHealthy(table, partition, basicStatsMeta)) .map(Partition::getId) .collect(Collectors.toList()); } else { @@ -571,16 +561,12 @@ private static void createHistogramJob(List allTableJobMap } private static void createFullStatsJob(List allTableJobMap, - NativeAnalyzeJob job, LocalDateTime statsLastUpdateTime, + NativeAnalyzeJob job, BasicStatsMeta stats, Database db, Table table, List columnNames, List columnTypes) { StatsConstants.AnalyzeType analyzeType; - List partitionList = new ArrayList<>(); - for (Partition partition : table.getPartitions()) { - LocalDateTime partitionUpdateTime = StatisticUtils.getPartitionLastUpdateTime(partition); - if (statsLastUpdateTime.isBefore(partitionUpdateTime) && partition.hasData()) { - partitionList.add(partition); - } - } + List partitionList = table.getPartitions().stream() + .filter(partition -> !StatisticUtils.isPartitionStatsHealthy(table, partition, stats)) + .collect(Collectors.toList()); long totalDataSize = partitionList.stream().mapToLong(Partition::getDataSize).sum(); if (totalDataSize > Config.statistic_max_full_collect_data_size) { diff --git a/fe/fe-core/src/test/java/com/starrocks/statistic/BasicStatsMetaTest.java b/fe/fe-core/src/test/java/com/starrocks/statistic/BasicStatsMetaTest.java index 96c6339da2570..21cafb6cfc6b6 100644 --- a/fe/fe-core/src/test/java/com/starrocks/statistic/BasicStatsMetaTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/statistic/BasicStatsMetaTest.java @@ -83,7 +83,7 @@ public void testHealthy() { LocalDateTime.of(2024, 07, 22, 12, 20), Map.of(), 10000); basicStatsMeta.increaseDeltaRows(5000L); basicStatsMeta.setUpdateRows(10000L); - Assert.assertEquals(0.5, basicStatsMeta.getHealthy(), 0.01); + Assert.assertEquals(1.0, basicStatsMeta.getHealthy(), 0.01); basicStatsMeta.resetDeltaRows(); Assert.assertEquals(1.0, basicStatsMeta.getHealthy(), 0.01); }