From 32186d0f67a758db5d20f22fe9b7af652e700520 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 26 Jul 2023 18:48:29 +0800 Subject: [PATCH] [fix](fs-cache) add 'scheme://authority' to fs cache key --- .../datasource/hive/HiveMetaStoreCache.java | 10 ++++----- .../org/apache/doris/fs/FileSystemCache.java | 14 +++++++++---- .../apache/doris/fs/FileSystemFactory.java | 21 +++++++++++++------ .../org/apache/doris/fs/FileSystemType.java | 3 ++- 4 files changed, 32 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 1db50c93fbee10..0bd190d9459799 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -366,7 +366,7 @@ private FileCacheValue getFileCache(String location, InputFormat inputForm FileCacheValue result = new FileCacheValue(); result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf)); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getLocationType(location), jobConf)); + new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf)); try { // For Tez engine, it may generate subdirectoies for "union" query. // So there may be files and directories in the table directory at the same time. eg: @@ -762,8 +762,8 @@ public List getFilesByTransaction(List partitions .getPath() : null; String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - FileSystemFactory.getLocationType(baseOrDeltaPath.toUri().toString()), jobConf)); + new FileSystemCache.FileSystemCacheKey( + FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString()), jobConf)); Status status = fs.exists(acidVersionPath); if (status != Status.OK) { if (status.getErrCode() == ErrCode.NOT_FOUND) { @@ -784,7 +784,7 @@ public List getFilesByTransaction(List partitions for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) { String location = delta.getPath().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getLocationType(location), jobConf)); + new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf)); RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); if (delta.isDeleteDelta()) { List deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter( @@ -802,7 +802,7 @@ public List getFilesByTransaction(List partitions if (directory.getBaseDirectory() != null) { String location = directory.getBaseDirectory().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getLocationType(location), jobConf)); + new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf)); RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); locatedFiles.files().stream().filter( f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index aa6123d8072178..edc746ebe24e53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -18,6 +18,7 @@ package org.apache.doris.fs; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.util.CacheBulkLoader; import org.apache.doris.datasource.CacheException; import org.apache.doris.fs.remote.RemoteFileSystem; @@ -65,10 +66,13 @@ public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) { public static class FileSystemCacheKey { private final FileSystemType type; + // eg: hdfs://nameservices1 + private final String fsIdent; private final JobConf conf; - public FileSystemCacheKey(FileSystemType type, JobConf conf) { - this.type = type; + public FileSystemCacheKey(Pair fs, JobConf conf) { + this.type = fs.first; + this.fsIdent = fs.second; this.conf = conf; } @@ -80,12 +84,14 @@ public boolean equals(Object obj) { if (!(obj instanceof FileSystemCacheKey)) { return false; } - return type.equals(((FileSystemCacheKey) obj).type) && conf == ((FileSystemCacheKey) obj).conf; + return type.equals(((FileSystemCacheKey) obj).type) + && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent) + && conf == ((FileSystemCacheKey) obj).conf; } @Override public int hashCode() { - return Objects.hash(conf, type); + return Objects.hash(conf, fsIdent, type); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java index 73d0c19472e5fd..1c6217ff4a8577 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.common.util.S3Util; import org.apache.doris.fs.remote.BrokerFileSystem; import org.apache.doris.fs.remote.RemoteFileSystem; @@ -27,10 +28,12 @@ import org.apache.doris.fs.remote.dfs.JFSFileSystem; import org.apache.doris.fs.remote.dfs.OFSFileSystem; +import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import java.io.IOException; +import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -53,22 +56,28 @@ public static RemoteFileSystem get(String name, StorageBackend.StorageType type, } } - public static FileSystemType getLocationType(String location) { + public static Pair getFSIdentity(String location) { + FileSystemType fsType; if (S3Util.isObjStorage(location)) { if (S3Util.isHdfsOnOssEndpoint(location)) { // if hdfs service is enabled on oss, use hdfs lib to access oss. - return FileSystemType.DFS; + fsType = FileSystemType.DFS; } - return FileSystemType.S3; + fsType = FileSystemType.S3; } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS)) { - return FileSystemType.DFS; + fsType = FileSystemType.DFS; } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) { - return FileSystemType.OFS; + fsType = FileSystemType.OFS; } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) { - return FileSystemType.JFS; + fsType = FileSystemType.JFS; } else { throw new UnsupportedOperationException("Unknown file system for location: " + location); } + + Path path = new Path(location); + URI uri = path.toUri(); + String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority()); + return Pair.of(fsType, fsIdent); } public static RemoteFileSystem getByType(FileSystemType type, Configuration conf) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java index e3147943c21287..5ddea01174441b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java @@ -21,5 +21,6 @@ public enum FileSystemType { S3, DFS, OFS, - JFS + JFS, + FILE }