[fix](show-table-status) fix hive view NPE and external meta cache refresh issue (apache#22377)
morningman authored Aug 4, 2023
1 parent dc06c48 commit 672acb8
Showing 6 changed files with 70 additions and 16 deletions.
@@ -2042,7 +2042,7 @@ public class Config extends ConfigBase {
public static long statistics_sql_mem_limit_in_bytes = 2L * 1024 * 1024 * 1024;

@ConfField(mutable = true, masterOnly = true, description = {
"用于强制设定内表的副本数,如果改参数大于零,则用户在建表时指定的副本数将被忽略,而使用本参数设置的值。"
"用于强制设定内表的副本数,如果该参数大于零,则用户在建表时指定的副本数将被忽略,而使用本参数设置的值。"
+ "同时,建表语句中指定的副本标签等参数会被忽略。该参数不影响包括创建分区、修改表属性的操作。该参数建议仅用于测试环境",
"Used to force the number of replicas of the internal table. If the config is greater than zero, "
+ "the number of replicas specified by the user when creating the table will be ignored, "
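Aside: a minimal sketch of the behavior this description documents. The class and method names below are invented for illustration; the diff above shows only the description strings, not the config field itself.

// Hypothetical illustration, not Doris code: a positive forced value overrides
// whatever replication number the CREATE TABLE statement requested.
public class ReplicaPolicy {
    public static int effectiveReplicaNum(int forcedReplicaNum, int requestedReplicaNum) {
        // forcedReplicaNum <= 0 means "not forced": honor the user's request.
        return forcedReplicaNum > 0 ? forcedReplicaNum : requestedReplicaNum;
    }

    public static void main(String[] args) {
        System.out.println(effectiveReplicaNum(3, 1)); // forced: prints 3
        System.out.println(effectiveReplicaNum(0, 1)); // not forced: prints 1
    }
}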
@@ -31,6 +31,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.CacheBulkLoader;
import org.apache.doris.common.util.S3Util;
@@ -98,6 +99,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -499,8 +501,18 @@ public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) {
}
}

public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
boolean useSelfSplitter) {
public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
boolean useSelfSplitter) {
return getFilesByPartitions(partitions, useSelfSplitter, true);
}

public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
boolean useSelfSplitter) {
return getFilesByPartitions(partitions, useSelfSplitter, false);
}

private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
boolean useSelfSplitter, boolean withCache) {
long start = System.currentTimeMillis();
List<FileCacheKey> keys = partitions.stream().map(p -> {
FileCacheKey fileCacheKey = p.isDummyPartition()
@@ -513,28 +525,58 @@ public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,

List<FileCacheValue> fileLists;
try {
fileLists = fileCacheRef.get().getAll(keys).values().asList();
if (withCache) {
fileLists = fileCacheRef.get().getAll(keys).values().asList();
} else {
List<Pair<FileCacheKey, Future<FileCacheValue>>> pList = keys.stream()
.map(key -> Pair.of(key, executor.submit(() -> loadFiles(key))))
.collect(Collectors.toList());

fileLists = Lists.newArrayListWithExpectedSize(keys.size());
for (Pair<FileCacheKey, Future<FileCacheValue>> p : pList) {
fileLists.add(p.second.get());
}
}
} catch (ExecutionException e) {
throw new CacheException("failed to get files from partitions in catalog %s",
e, catalog.getName());
e, catalog.getName());
} catch (InterruptedException e) {
throw new CacheException("failed to get files from partitions in catalog %s with interrupted exception",
e, catalog.getName());
}

LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
fileLists.stream().mapToInt(l -> l.getFiles() == null
? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
return fileLists;
}

public List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>> partitionValuesList) {
public List<HivePartition> getAllPartitionsWithCache(String dbName, String name,
List<List<String>> partitionValuesList) {
return getAllPartitions(dbName, name, partitionValuesList, true);
}

public List<HivePartition> getAllPartitionsWithoutCache(String dbName, String name,
List<List<String>> partitionValuesList) {
return getAllPartitions(dbName, name, partitionValuesList, false);
}

private List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>> partitionValuesList,
boolean withCache) {
long start = System.currentTimeMillis();
List<PartitionCacheKey> keys = partitionValuesList.stream()
.map(p -> new PartitionCacheKey(dbName, name, p))
.collect(Collectors.toList());
.map(p -> new PartitionCacheKey(dbName, name, p))
.collect(Collectors.toList());

List<HivePartition> partitions;
try {
partitions = partitionCache.getAll(keys).values().asList();
if (withCache) {
partitions = partitionCache.getAll(keys).values().asList();
} else {
Map<PartitionCacheKey, HivePartition> map = loadPartitions(keys);
partitions = map.values().stream().collect(Collectors.toList());
}
} catch (ExecutionException e) {
throw new CacheException("failed to get partition in catalog %s", e, catalog.getName());
}
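For context, a minimal, self-contained sketch of the pattern the two method pairs above now follow. Everything here (BypassableCache, loadValue) is an invented stand-in, not the Doris code: the query path reads through a Guava LoadingCache so hot keys stay warm, while best-effort callers load the same keys directly on an executor, so they neither populate nor evict cache entries.

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BypassableCache {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    private final LoadingCache<String, Long> cache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .build(new CacheLoader<String, Long>() {
                @Override
                public Long load(String key) {
                    return loadValue(key);
                }
            });

    // Query path: read through the cache so repeated lookups are cheap.
    public List<Long> getWithCache(List<String> keys) throws ExecutionException {
        return new ArrayList<>(cache.getAll(keys).values());
    }

    // Metadata path: load directly and in parallel, without touching cache state,
    // so a non-query request cannot evict or refresh entries the query path needs.
    public List<Long> getWithoutCache(List<String> keys)
            throws ExecutionException, InterruptedException {
        List<Future<Long>> futures = new ArrayList<>();
        for (String key : keys) {
            futures.add(executor.submit(() -> loadValue(key)));
        }
        List<Long> values = new ArrayList<>(keys.size());
        for (Future<Long> future : futures) {
            values.add(future.get());
        }
        return values;
    }

    private long loadValue(String key) {
        // Placeholder for the expensive metadata fetch (e.g., listing partition files).
        return key.hashCode();
    }
}

Loading the bypass path in parallel keeps it roughly as fast as a cold cache read, which matters because a single `show table status` may touch many partitions.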
@@ -151,7 +151,7 @@ protected List<HivePartition> getPartitions() throws AnalysisException {
partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
}
List<HivePartition> allPartitions =
cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), partitionValuesList);
cache.getAllPartitionsWithCache(hmsTable.getDbName(), hmsTable.getName(), partitionValuesList);
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionsFinishTime();
}
@@ -197,7 +197,7 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartiti
if (hiveTransaction != null) {
fileCaches = getFileSplitByTransaction(cache, partitions);
} else {
fileCaches = cache.getFilesByPartitions(partitions, useSelfSplitter);
fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter);
}
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
@@ -882,7 +882,11 @@ private void handleShowTableStatus() throws AnalysisException {
// Row_format
row.add(null);
// Rows
row.add(String.valueOf(table.getRowCount()));
// Use estimatedRowCount(), not getRowCount(),
// because estimatedRowCount() is an async call: it does not block, and it will
// eventually call getRowCount() in the background. So for some tables
// (especially external tables), we can get the row count without blocking.
row.add(String.valueOf(table.estimatedRowCount()));
// Avg_row_length
row.add(String.valueOf(table.getAvgRowLength()));
// Data_length
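The new comment describes a non-blocking estimate. One plausible shape of that pattern, sketched with invented names (this is not the Doris implementation): return the last known value immediately and refresh it asynchronously, at most once at a time.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncRowCount {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicLong cachedRowCount = new AtomicLong(0);
    private final AtomicBoolean refreshing = new AtomicBoolean(false);

    // Never blocks: serves the cached value and schedules at most one refresh.
    public long estimatedRowCount() {
        if (refreshing.compareAndSet(false, true)) {
            executor.submit(() -> {
                try {
                    cachedRowCount.set(fetchRowCountSlowly());
                } finally {
                    refreshing.set(false);
                }
            });
        }
        return cachedRowCount.get();
    }

    private long fetchRowCountSlowly() {
        // Stand-in for the blocking call (e.g., asking an external metastore).
        return 42L;
    }
}

The first call may return a stale (or zero) count, which is acceptable for a status command; correctness-sensitive paths would still use a blocking variant.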
@@ -1408,7 +1408,6 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable
}
}


Span fetchResultSpan = context.getTracer().spanBuilder("fetch result").setParent(Context.current()).startSpan();
try (Scope scope = fetchResultSpan.makeCurrent()) {
while (true) {
@@ -553,6 +553,9 @@ public static long getIcebergRowCount(HMSExternalTable table) {
* @return estimated row count
*/
public static long getRowCountFromFileList(HMSExternalTable table) {
if (table.isView()) {
return 0;
}
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) table.getCatalog());
List<Type> partitionColumnTypes = table.getPartitionColumnTypes();
@@ -562,6 +565,9 @@ public static long getRowCountFromFileList(HMSExternalTable table) {
int totalPartitionSize = 1;
// Get table partitions from cache.
if (!partitionColumnTypes.isEmpty()) {
// It is ok to get partition values from the cache,
// and no need to worry that this call will invalidate or refresh the cache,
// because the cache has enough space to keep the partition info of all tables.
partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes);
}
if (partitionValues != null) {
@@ -582,14 +588,17 @@ public static long getRowCountFromFileList(HMSExternalTable table) {
for (PartitionItem item : partitionItems) {
partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList());
}
hivePartitions = cache.getAllPartitions(table.getDbName(), table.getName(), partitionValuesList);
// Get partitions without the cache, so that this will not invalidate the cache
// when executing non-query requests such as `show table status`.
hivePartitions = cache.getAllPartitionsWithoutCache(table.getDbName(), table.getName(),
partitionValuesList);
} else {
hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true,
table.getRemoteTable().getSd().getInputFormat(),
table.getRemoteTable().getSd().getLocation(), null));
}
// Get files for all partitions.
List<HiveMetaStoreCache.FileCacheValue> filesByPartitions = cache.getFilesByPartitions(
List<HiveMetaStoreCache.FileCacheValue> filesByPartitions = cache.getFilesByPartitionsWithoutCache(
hivePartitions, true);
long totalSize = 0;
// Calculate the total file size.
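Putting this method together, a hedged sketch of the estimation flow (invented names; the real code walks HiveMetaStoreCache values rather than a plain list): guard views first, since a Hive view has no backing files and the missing guard is the NPE this commit fixes; then sum file sizes and approximate rows as total bytes over an estimated row width.

import java.util.List;

public final class RowCountSketch {
    public static long estimateRowCount(boolean isView, List<Long> fileSizes, long avgRowBytes) {
        if (isView) {
            // A view has no files; without this guard the file-listing path NPEs.
            return 0;
        }
        long totalSize = 0;
        for (long size : fileSizes) {
            totalSize += size;
        }
        // Approximate: total bytes divided by the estimated average row width.
        return avgRowBytes > 0 ? totalSize / avgRowBytes : 0;
    }
}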
