diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index bcd921d7a8..1c794a516b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -182,18 +182,17 @@ public synchronized T get() { @Override public synchronized T retain() { - if (getCount() == 0) { - this.removeMethod = leakDetector.track(this, this::logLeakMessage); - } + final T value; try { - final T value = super.retain(); - if (getCount() == 0) { - this.valueString = value.toString(); - } - return value; + value = super.retain(); } catch (Exception e) { throw new IllegalStateException("Failed to retain: " + getTraceString(getCount()), e); } + if (getCount() == 1) { // this is the first retain + this.removeMethod = leakDetector.track(this, this::logLeakMessage); + this.valueString = value.toString(); + } + return value; } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index cc8e911298..a88ade5872 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -29,6 +29,7 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.SizeInBytes; @@ -38,10 +39,10 @@ import java.io.File; import java.io.IOException; import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -293,47 +294,92 @@ public ReferenceCountedObject load(LogRecord key) throws IOExcept } } - static class EntryCache { - private final Map>> map - = new ConcurrentHashMap<>(); + private static class Item { + private final AtomicReference> ref; + private final long serializedSize; + + Item(ReferenceCountedObject obj, long serializedSize) { + this.ref = new AtomicReference<>(obj); + this.serializedSize = serializedSize; + } + + ReferenceCountedObject get() { + return ref.get(); + } + + long release() { + final ReferenceCountedObject entry = ref.getAndSet(null); + if (entry == null) { + return 0; + } + entry.release(); + return serializedSize; + } + } + + class EntryCache { + private Map map = new HashMap<>(); private final AtomicLong size = new AtomicLong(); + @Override + public String toString() { + return JavaUtils.getClassSimpleName(getClass()) + "-" + LogSegment.this; + } + long size() { return size.get(); } - ReferenceCountedObject get(TermIndex ti) { - final AtomicReference> ref = map.get(ti); + synchronized ReferenceCountedObject get(TermIndex ti) { + if (map == null) { + return null; + } + final Item ref = map.get(ti); return ref == null? null: ref.get(); } - void clear() { - for (Iterator>>> - i = map.entrySet().iterator(); i.hasNext(); ) { + /** After close(), the cache CANNOT be used again. */ + synchronized void close() { + if (map == null) { + return; + } + evict(); + map = null; + LOG.info("Successfully closed {}", this); + } + + /** After evict(), the cache can be used again. */ + synchronized void evict() { + if (map == null) { + return; + } + for (Iterator> i = map.entrySet().iterator(); i.hasNext(); i.remove()) { release(i.next().getValue()); - i.remove(); } } - void put(TermIndex key, ReferenceCountedObject valueRef, Op op) { + synchronized void put(TermIndex key, ReferenceCountedObject valueRef, Op op) { + if (map == null) { + return; + } valueRef.retain(); - release(map.put(key, new AtomicReference<>(valueRef))); - size.getAndAdd(getEntrySize(valueRef.get(), op)); + final long serializedSize = getEntrySize(valueRef.get(), op); + release(map.put(key, new Item(valueRef, serializedSize))); + size.getAndAdd(serializedSize); } - private void release(AtomicReference> ref) { + private void release(Item ref) { if (ref == null) { return; } - final ReferenceCountedObject entry = ref.getAndSet(null); - if (entry == null) { - return; - } - size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE)); - entry.release(); + final long serializedSize = ref.release(); + size.getAndAdd(-serializedSize); } - void remove(TermIndex key) { + synchronized void remove(TermIndex key) { + if (map == null) { + return; + } release(map.remove(key)); } } @@ -522,7 +568,7 @@ private int compareTo(Long l) { synchronized void clear() { records.clear(); - evictCache(); + entryCache.close(); endIndex = startIndex - 1; } @@ -531,7 +577,7 @@ int getLoadingTimes() { } void evictCache() { - entryCache.clear(); + entryCache.evict(); } void putEntryCache(TermIndex key, ReferenceCountedObject valueRef, Op op) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 557a2b37ea..485eb53d10 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -564,6 +564,7 @@ public CompletableFuture onSnapshotInstalled(long lastSnapshotIndex) { @Override public void close() throws IOException { try(AutoCloseableLock writeLock = writeLock()) { + LOG.info("Start closing {}", this); super.close(); cacheEviction.close(); cache.close(); @@ -571,6 +572,7 @@ public void close() throws IOException { fileLogWorker.close(); storage.close(); getRaftLogMetrics().unregister(); + LOG.info("Successfully closed {}", this); } SegmentedRaftLogCache getRaftLogCache() {