Skip to content

Commit

Permalink
[enhancement](nereids) Sync stats across FE cluster after analyze apa…
Browse files Browse the repository at this point in the history
…che#21482

Before this PR, if a user connected to a follower and analyzed a table, the stats would not get cached in the follower FE: the ANALYZE statement was forwarded to the master, and the follower still lazily loaded stats into its cache. After this PR, once analysis finishes on the master, the master syncs the stats to all followers and updates each follower's stats cache.
Load partition stats to col stats
  • Loading branch information
Kikyou1997 authored and morningman committed Jul 20, 2023
1 parent 6cb489f commit eeb09f8
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,16 @@
import org.apache.doris.master.MasterImpl;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.StatisticsCacheKey;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
Expand Down Expand Up @@ -164,6 +167,7 @@
import org.apache.doris.thrift.TTableQueryStats;
import org.apache.doris.thrift.TTableStatus;
import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.DatabaseTransactionMgr;
Expand Down Expand Up @@ -2670,4 +2674,12 @@ private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String c

return result;
}

/**
 * RPC handler invoked by the master FE to push freshly analyzed column statistics
 * into this follower's statistics cache (see StatisticsCache#syncLoadColStats).
 *
 * @param request carries the Gson-serialized {@link StatisticsCacheKey} and
 *                {@link ColumnStatistic} as JSON strings
 * @return OK on success, INTERNAL_ERROR if the payload cannot be deserialized or cached
 */
@Override
public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException {
    try {
        StatisticsCacheKey key = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class);
        ColumnStatistic columnStatistic = GsonUtils.GSON.fromJson(request.colStats, ColumnStatistic.class);
        Env.getCurrentEnv().getStatisticsCache().putCache(key, columnStatistic);
        return new TStatus(TStatusCode.OK);
    } catch (Throwable t) {
        // A malformed payload from a peer FE must not escape the RPC handler as an
        // unchecked exception (Gson throws JsonSyntaxException); report a status instead.
        LOG.warn("Failed to update follower stats cache", t);
        return new TStatus(TStatusCode.INTERNAL_ERROR);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ColumnStatistic {
Expand Down Expand Up @@ -69,12 +75,19 @@ public class ColumnStatistic {
UNSUPPORTED_TYPE.add(Type.LAMBDA_FUNCTION);
}

@SerializedName("count")
public final double count;
@SerializedName("ndv")
public final double ndv;
@SerializedName("numNulls")
public final double numNulls;
@SerializedName("dataSize")
public final double dataSize;
@SerializedName("avgSizeByte")
public final double avgSizeByte;
@SerializedName("minValue")
public final double minValue;
@SerializedName("maxValue")
public final double maxValue;
public final boolean isUnKnown;
/*
Expand Down Expand Up @@ -102,9 +115,12 @@ and after join(l_orderkey = o_orderkey), lineitem is reduced by 1/3.
public final LiteralExpr minExpr;
public final LiteralExpr maxExpr;

@SerializedName("histogram")
// assign value when do stats estimation.
public final Histogram histogram;

public final Map<Long, ColumnStatistic> partitionIdToColStats = new HashMap<>();

public ColumnStatistic(double count, double ndv, ColumnStatistic original, double avgSizeByte,
double numNulls, double dataSize, double minValue, double maxValue,
double selectivity, LiteralExpr minExpr, LiteralExpr maxExpr, boolean isUnKnown, Histogram histogram) {
Expand All @@ -123,6 +139,27 @@ public ColumnStatistic(double count, double ndv, ColumnStatistic original, doubl
this.histogram = histogram;
}

/**
 * Builds a ColumnStatistic from the result rows of a stats query.
 * Rows without a {@code part_id} carry the table/index-level stats; rows with a
 * {@code part_id} carry per-partition stats, attached to the table-level object
 * via {@code partitionIdToColStats}.
 *
 * @param resultRows rows returned by the internal statistics query
 * @return the table-level statistic with partition stats attached, or
 *         {@link #UNKNOWN} if the rows cannot be deserialized or contain no
 *         table-level row
 */
public static ColumnStatistic fromResultRow(List<ResultRow> resultRows) {
    Map<Long, ColumnStatistic> partitionIdToColStats = new HashMap<>();
    ColumnStatistic columnStatistic = null;
    try {
        for (ResultRow resultRow : resultRows) {
            String partId = resultRow.getColumnValue("part_id");
            if (partId == null) {
                columnStatistic = fromResultRow(resultRow);
            } else {
                partitionIdToColStats.put(Long.parseLong(partId), fromResultRow(resultRow));
            }
        }
    } catch (Throwable t) {
        LOG.warn("Failed to deserialize column stats", t);
        return ColumnStatistic.UNKNOWN;
    }
    if (columnStatistic == null) {
        // An empty or partition-only result set previously escaped the try block as an
        // IllegalStateException (Preconditions). Degrade to UNKNOWN instead, matching
        // the deserialization-failure path above.
        LOG.warn("Failed to deserialize column stats: no table-level row found");
        return ColumnStatistic.UNKNOWN;
    }
    columnStatistic.partitionIdToColStats.putAll(partitionIdToColStats);
    return columnStatistic;
}

// TODO: use thrift
public static ColumnStatistic fromResultRow(ResultRow resultRow) {
try {
Expand All @@ -138,7 +175,8 @@ public static ColumnStatistic fromResultRow(ResultRow resultRow) {
columnStatisticBuilder.setNumNulls(Double.parseDouble(nullCount));
columnStatisticBuilder.setDataSize(Double
.parseDouble(resultRow.getColumnValueWithDefault("data_size_in_bytes", "0")));
columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize()
columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getCount() == 0
? 0 : columnStatisticBuilder.getDataSize()
/ columnStatisticBuilder.getCount());
long catalogId = Long.parseLong(resultRow.getColumnValue("catalog_id"));
long idxId = Long.parseLong(resultRow.getColumnValue("idx_id"));
Expand Down Expand Up @@ -385,4 +423,13 @@ public boolean rangeChanged() {
public boolean isUnKnown() {
return isUnKnown;
}

/**
 * Loads per-partition statistics for the given column from the internal statistics
 * tables and records them in {@code partitionIdToColStats}, keyed by partition id.
 *
 * @param tableId id of the table the column belongs to
 * @param idxId   index id (-1 presumably means the base index — TODO confirm)
 * @param colName name of the column whose partition stats are loaded
 * @throws DdlException if the underlying stats query fails
 */
public void loadPartitionStats(long tableId, long idxId, String colName) throws DdlException {
    List<ResultRow> rows = StatisticsRepository.loadPartStats(tableId, idxId, colName);
    for (ResultRow row : rows) {
        String partitionId = row.getColumnValue("part_id");
        ColumnStatistic stats = ColumnStatistic.fromResultRow(row);
        partitionIdToColStats.put(Long.parseLong(partitionId), stats);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,24 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.FeConstants;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader<Optional<ColumnStatistic>> {

private static final Logger LOG = LogManager.getLogger(ColumnStatisticsCacheLoader.class);

private static final String QUERY_COLUMN_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME
+ "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
+ "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')";

@Override
protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) {
// Load from statistics table.
Optional<ColumnStatistic> columnStatistic = loadFromStatsTable(String.valueOf(key.tableId),
String.valueOf(key.idxId), key.colName);
Optional<ColumnStatistic> columnStatistic = loadFromStatsTable(key.tableId,
key.idxId, key.colName);
if (columnStatistic.isPresent()) {
return columnStatistic;
}
Expand All @@ -61,26 +52,19 @@ protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) {
return columnStatistic;
}

private Optional<ColumnStatistic> loadFromStatsTable(String tableId, String idxId, String colName) {
Map<String, String> params = new HashMap<>();
params.put("tblId", tableId);
params.put("idxId", idxId);
params.put("colId", colName);

List<ColumnStatistic> columnStatistics;
List<ResultRow> columnResult =
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
.replace(QUERY_COLUMN_STATISTICS));
private Optional<ColumnStatistic> loadFromStatsTable(long tableId, long idxId, String colName) {
List<ResultRow> columnResults = StatisticsRepository.loadColStats(tableId, idxId, colName);
ColumnStatistic columnStatistics;
try {
columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult);
columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResults);
} catch (Exception e) {
LOG.warn("Exception to deserialize column statistics", e);
return Optional.empty();
}
if (CollectionUtils.isEmpty(columnStatistics)) {
if (columnStatistics == null) {
return Optional.empty();
} else {
return Optional.of(columnStatistics.get(0));
return Optional.of(columnStatistics);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,9 @@ public double size() {
Bucket lastBucket = buckets.get(buckets.size() - 1);
return lastBucket.preSum + lastBucket.count;
}

@Override
public String toString() {
    // Render the histogram as its JSON serialization so logs and debug output
    // show the actual bucket contents instead of the default Object identity string.
    return serializeToJson(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void execute() throws Exception {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
execSQL(sql);
Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Frontend;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
Expand Down Expand Up @@ -97,7 +105,7 @@ public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId
}

public Optional<ColumnStatistic> getColumnStatistics(long catalogId, long dbId,
long tblId, long idxId, String colName) {
long tblId, long idxId, String colName) {
ConnectContext ctx = ConnectContext.get();
if (ctx != null && ctx.getSessionVariable().internalSession) {
return Optional.empty();
Expand Down Expand Up @@ -203,42 +211,78 @@ private void doPreHeat() {
}
for (ResultRow r : recentStatsUpdatedCols) {
try {
String tblId = r.getColumnValue("tbl_id");
String idxId = r.getColumnValue("idx_id");
long tblId = Long.parseLong(r.getColumnValue("tbl_id"));
long idxId = Long.parseLong(r.getColumnValue("idx_id"));
String colId = r.getColumnValue("col_id");
final StatisticsCacheKey k =
new StatisticsCacheKey(Long.parseLong(tblId), Long.parseLong(idxId), colId);
new StatisticsCacheKey(tblId, idxId, colId);
final ColumnStatistic c = ColumnStatistic.fromResultRow(r);
CompletableFuture<Optional<ColumnStatistic>> f = new CompletableFuture<Optional<ColumnStatistic>>() {
c.loadPartitionStats(tblId, idxId, colId);
putCache(k, c);
} catch (Throwable t) {
LOG.warn("Error when preheating stats cache", t);
}
}
}

@Override
public Optional<ColumnStatistic> get() throws InterruptedException, ExecutionException {
return Optional.of(c);
/**
 * Synchronously loads the latest statistics for one column from the statistics
 * tables, installs them in the local cache, and pushes them to every FOLLOWER
 * frontend so their caches are updated without waiting for a lazy load.
 * NOTE(review): observer FEs are not notified here — confirm that is intentional.
 *
 * @param tableId id of the analyzed table
 * @param idxId   index id associated with the stats row
 * @param colName analyzed column name
 */
public void syncLoadColStats(long tableId, long idxId, String colName) {
    List<ResultRow> columnResults = StatisticsRepository.loadColStats(tableId, idxId, colName);
    for (ResultRow r : columnResults) {
        final StatisticsCacheKey k =
                new StatisticsCacheKey(tableId, idxId, colName);
        final ColumnStatistic c = ColumnStatistic.fromResultRow(r);
        if (c == ColumnStatistic.UNKNOWN) {
            // fromResultRow returns the UNKNOWN sentinel on deserialization failure;
            // skip it rather than cache and broadcast a placeholder.
            continue;
        }
        putCache(k, c);
        // Ship the key/stats pair to followers as Gson JSON over the frontend RPC.
        TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest = new TUpdateFollowerStatsCacheRequest();
        updateFollowerStatsCacheRequest.key = GsonUtils.GSON.toJson(k);
        updateFollowerStatsCacheRequest.colStats = GsonUtils.GSON.toJson(c);
        for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) {
            TNetworkAddress address = new TNetworkAddress(frontend.getHost(),
                    frontend.getRpcPort());
            FrontendService.Client client = null;
            try {
                client = ClientPool.frontendPool.borrowObject(address);
                client.updateStatsCache(updateFollowerStatsCacheRequest);
            } catch (Throwable t) {
                // Best effort: an unreachable follower will fall back to lazy loading.
                LOG.warn("Failed to sync stats to follower: {}", address, t);
            } finally {
                if (client != null) {
                    ClientPool.frontendPool.returnObject(address, client);
                }
            }
        }
    }
}

@Override
public boolean isDone() {
return true;
}
public void putCache(StatisticsCacheKey k, ColumnStatistic c) {
CompletableFuture<Optional<ColumnStatistic>> f = new CompletableFuture<Optional<ColumnStatistic>>() {

@Override
public boolean complete(Optional<ColumnStatistic> value) {
return true;
}
@Override
public Optional<ColumnStatistic> get() throws InterruptedException, ExecutionException {
return Optional.of(c);
}

@Override
public Optional<ColumnStatistic> join() {
return Optional.of(c);
}
};
if (c.isUnKnown) {
continue;
}
columnStatisticsCache.put(k, f);
} catch (Throwable t) {
LOG.warn("Error when preheating stats cache", t);
@Override
public boolean isDone() {
return true;
}

@Override
public boolean complete(Optional<ColumnStatistic> value) {
return true;
}

@Override
public Optional<ColumnStatistic> join() {
return Optional.of(c);
}
};
if (c.isUnKnown) {
return;
}
columnStatisticsCache.put(k, f);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.statistics;

import com.google.gson.annotations.SerializedName;

import java.util.Objects;
import java.util.StringJoiner;

Expand All @@ -26,10 +28,15 @@ public class StatisticsCacheKey {
* May be index id either, since they are natively same in the code.
* catalogId and dbId are not included in the hashCode. Because tableId is globally unique.
*/
@SerializedName("catalogId")
public final long catalogId;
@SerializedName("dbId")
public final long dbId;
@SerializedName("tableId")
public final long tableId;
@SerializedName("idxId")
public final long idxId;
@SerializedName("colName")
public final String colName;

private static final String DELIMITER = "-";
Expand Down
Loading

0 comments on commit eeb09f8

Please sign in to comment.