diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index d1e846b662b0315..84e3ac19941c86f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -29,17 +29,26 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; public class FileSystemCache { private static final Logger LOG = LoggerFactory.getLogger(FileSystemCache.class); private final LoadingCache fileSystemCache; + private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); + private final Set> weakReferences = ConcurrentHashMap.newKeySet(); + private ScheduledExecutorService cleanupExecutor; public FileSystemCache() { // no need to set refreshAfterWrite, because the FileSystem is created once and never changed @@ -51,22 +60,48 @@ public FileSystemCache() { null); CustomThreadFactory threadFactory = new CustomThreadFactory("fs-cache-thread"); ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory); + CustomThreadFactory cleanupThreadFactory = new CustomThreadFactory("fs-cache-cleanup-thread"); + cleanupExecutor = Executors.newScheduledThreadPool(1, cleanupThreadFactory); fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem, (key, fs, removalCause) -> { if (key != null) { LOG.info("Close file system: {}", key.fsIdent); } - try { - if (fs != null) { - fs.close(); - } - } catch (IOException e) { - LOG.warn("Failed to close file system", e); - } + // Handle reference removal + removeWeakReference(fs); }, executor); + cleanupExecutor.scheduleAtFixedRate(this::processQueue, 0, 5, TimeUnit.MINUTES); } private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) { - return FileSystemFactory.getRemoteFileSystem(key.type, key.getFsProperties(), key.bindBrokerName); + RemoteFileSystem fs = FileSystemFactory.getRemoteFileSystem(key.type, key.getFsProperties(), + key.bindBrokerName); + WeakReference weakRef = new WeakReference<>(fs, referenceQueue); + weakReferences.add(weakRef); + return fs; + } + + private void removeWeakReference(RemoteFileSystem fs) { + weakReferences.removeIf(ref -> { + RemoteFileSystem referenced = ref.get(); + return referenced == null || referenced.equals(fs); + }); + // Explicitly try to clean up if the object is no longer referenced + processQueue(); + } + + private void processQueue() { + WeakReference ref; + while ((ref = (WeakReference) referenceQueue.poll()) != null) { + RemoteFileSystem fs = ref.get(); + if (fs != null) { + try { + fs.close(); + LOG.info("Closed file system: {}", fs); + } catch (IOException e) { + LOG.warn("Failed to close file system: " + e.getMessage()); + } + } + } } public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {