Skip to content

Commit

Permalink
[fix](fs-cache) add 'scheme://authority' to fs cache key
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jul 26, 2023
1 parent 2bb882b commit 32186d0
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputForm
FileCacheValue result = new FileCacheValue();
result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf));
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getLocationType(location), jobConf));
new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
try {
// For Tez engine, it may generate subdirectories for "union" query.
// So there may be files and directories in the table directory at the same time. eg:
Expand Down Expand Up @@ -762,8 +762,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
.getPath() : null;
String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
FileSystemFactory.getLocationType(baseOrDeltaPath.toUri().toString()), jobConf));
new FileSystemCache.FileSystemCacheKey(
FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString()), jobConf));
Status status = fs.exists(acidVersionPath);
if (status != Status.OK) {
if (status.getErrCode() == ErrCode.NOT_FOUND) {
Expand All @@ -784,7 +784,7 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
String location = delta.getPath().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getLocationType(location), jobConf));
new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
Expand All @@ -802,7 +802,7 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
if (directory.getBaseDirectory() != null) {
String location = directory.getBaseDirectory().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getLocationType(location), jobConf));
new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
locatedFiles.files().stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
Expand Down
14 changes: 10 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.fs;

import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.CacheBulkLoader;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.fs.remote.RemoteFileSystem;
Expand Down Expand Up @@ -65,10 +66,13 @@ public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {

public static class FileSystemCacheKey {
private final FileSystemType type;
// eg: hdfs://nameservices1
private final String fsIdent;
private final JobConf conf;

public FileSystemCacheKey(FileSystemType type, JobConf conf) {
this.type = type;
public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf) {
this.type = fs.first;
this.fsIdent = fs.second;
this.conf = conf;
}

Expand All @@ -80,12 +84,14 @@ public boolean equals(Object obj) {
if (!(obj instanceof FileSystemCacheKey)) {
return false;
}
return type.equals(((FileSystemCacheKey) obj).type) && conf == ((FileSystemCacheKey) obj).conf;
return type.equals(((FileSystemCacheKey) obj).type)
&& fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
&& conf == ((FileSystemCacheKey) obj).conf;
}

@Override
public int hashCode() {
return Objects.hash(conf, type);
return Objects.hash(conf, fsIdent, type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
Expand All @@ -27,10 +28,12 @@
import org.apache.doris.fs.remote.dfs.JFSFileSystem;
import org.apache.doris.fs.remote.dfs.OFSFileSystem;

import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -53,22 +56,28 @@ public static RemoteFileSystem get(String name, StorageBackend.StorageType type,
}
}

public static FileSystemType getLocationType(String location) {
public static Pair<FileSystemType, String> getFSIdentity(String location) {
FileSystemType fsType;
if (S3Util.isObjStorage(location)) {
if (S3Util.isHdfsOnOssEndpoint(location)) {
// if hdfs service is enabled on oss, use hdfs lib to access oss.
return FileSystemType.DFS;
fsType = FileSystemType.DFS;
}
return FileSystemType.S3;
fsType = FileSystemType.S3;
} else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS)) {
return FileSystemType.DFS;
fsType = FileSystemType.DFS;
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
return FileSystemType.OFS;
fsType = FileSystemType.OFS;
} else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
return FileSystemType.JFS;
fsType = FileSystemType.JFS;
} else {
throw new UnsupportedOperationException("Unknown file system for location: " + location);
}

Path path = new Path(location);
URI uri = path.toUri();
String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
return Pair.of(fsType, fsIdent);
}

public static RemoteFileSystem getByType(FileSystemType type, Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public enum FileSystemType {
S3,
DFS,
OFS,
JFS
JFS,
FILE
}

0 comments on commit 32186d0

Please sign in to comment.