Skip to content

Commit

Permalink
Verify if the fs object has any remaining references before closing it.
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Sep 3, 2024
1 parent d9e3536 commit 4c9bc60
Showing 1 changed file with 43 additions and 8 deletions.
51 changes: 43 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,26 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FileSystemCache {

private static final Logger LOG = LoggerFactory.getLogger(FileSystemCache.class);
private final LoadingCache<FileSystemCacheKey, RemoteFileSystem> fileSystemCache;
private final ReferenceQueue<RemoteFileSystem> referenceQueue = new ReferenceQueue<>();
private final Set<WeakReference<RemoteFileSystem>> weakReferences = ConcurrentHashMap.newKeySet();
private ScheduledExecutorService cleanupExecutor;

public FileSystemCache() {
// no need to set refreshAfterWrite, because the FileSystem is created once and never changed
Expand All @@ -51,22 +60,48 @@ public FileSystemCache() {
null);
CustomThreadFactory threadFactory = new CustomThreadFactory("fs-cache-thread");
ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
CustomThreadFactory cleanupThreadFactory = new CustomThreadFactory("fs-cache-cleanup-thread");
cleanupExecutor = Executors.newScheduledThreadPool(1, cleanupThreadFactory);
fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem, (key, fs, removalCause) -> {
if (key != null) {
LOG.info("Close file system: {}", key.fsIdent);
}
try {
if (fs != null) {
fs.close();
}
} catch (IOException e) {
LOG.warn("Failed to close file system", e);
}
// Handle reference removal
removeWeakReference(fs);
}, executor);
cleanupExecutor.scheduleAtFixedRate(this::processQueue, 0, 5, TimeUnit.MINUTES);
}

private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
return FileSystemFactory.getRemoteFileSystem(key.type, key.getFsProperties(), key.bindBrokerName);
RemoteFileSystem fs = FileSystemFactory.getRemoteFileSystem(key.type, key.getFsProperties(),
key.bindBrokerName);
WeakReference<RemoteFileSystem> weakRef = new WeakReference<>(fs, referenceQueue);
weakReferences.add(weakRef);
return fs;
}

private void removeWeakReference(RemoteFileSystem fs) {
weakReferences.removeIf(ref -> {
RemoteFileSystem referenced = ref.get();
return referenced == null || referenced.equals(fs);
});
// Explicitly try to clean up if the object is no longer referenced
processQueue();
}

private void processQueue() {
WeakReference<? extends RemoteFileSystem> ref;
while ((ref = (WeakReference<? extends RemoteFileSystem>) referenceQueue.poll()) != null) {
RemoteFileSystem fs = ref.get();
if (fs != null) {
try {
fs.close();
LOG.info("Closed file system: {}", fs);
} catch (IOException e) {
LOG.warn("Failed to close file system: " + e.getMessage());
}
}
}
}

public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
Expand Down

0 comments on commit 4c9bc60

Please sign in to comment.