Skip to content

Commit

Permalink
New entries can to added after EntryCache is closed.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Oct 6, 2024
1 parent c4ac263 commit b831226
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,17 @@ public synchronized T get() {

@Override
public synchronized T retain() {
if (getCount() == 0) {
this.removeMethod = leakDetector.track(this, this::logLeakMessage);
}
final T value;
try {
final T value = super.retain();
if (getCount() == 0) {
this.valueString = value.toString();
}
return value;
value = super.retain();
} catch (Exception e) {
throw new IllegalStateException("Failed to retain: " + getTraceString(getCount()), e);
}
if (getCount() == 1) { // this is the first retain
this.removeMethod = leakDetector.track(this, this::logLeakMessage);
this.valueString = value.toString();
}
return value;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
Expand All @@ -38,10 +39,10 @@
import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -293,47 +294,92 @@ public ReferenceCountedObject<LogEntryProto> load(LogRecord key) throws IOExcept
}
}

static class EntryCache {
private final Map<TermIndex, AtomicReference<ReferenceCountedObject<LogEntryProto>>> map
= new ConcurrentHashMap<>();
private static class Item {
private final AtomicReference<ReferenceCountedObject<LogEntryProto>> ref;
private final long serializedSize;

Item(ReferenceCountedObject<LogEntryProto> obj, long serializedSize) {
this.ref = new AtomicReference<>(obj);
this.serializedSize = serializedSize;
}

ReferenceCountedObject<LogEntryProto> get() {
return ref.get();
}

long release() {
final ReferenceCountedObject<LogEntryProto> entry = ref.getAndSet(null);
if (entry == null) {
return 0;
}
entry.release();
return serializedSize;
}
}

class EntryCache {
private Map<TermIndex, Item> map = new HashMap<>();
private final AtomicLong size = new AtomicLong();

@Override
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + "-" + LogSegment.this;
}

long size() {
return size.get();
}

ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
final AtomicReference<ReferenceCountedObject<LogEntryProto>> ref = map.get(ti);
synchronized ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
if (map == null) {
return null;
}
final Item ref = map.get(ti);
return ref == null? null: ref.get();
}

void clear() {
for (Iterator<Map.Entry<TermIndex, AtomicReference<ReferenceCountedObject<LogEntryProto>>>>
i = map.entrySet().iterator(); i.hasNext(); ) {
/** After close(), the cache CANNOT be used again. */
synchronized void close() {
if (map == null) {
return;
}
evict();
map = null;
LOG.info("Successfully closed {}", this);
}

/** After evict(), the cache can be used again. */
synchronized void evict() {
if (map == null) {
return;
}
for (Iterator<Map.Entry<TermIndex, Item>> i = map.entrySet().iterator(); i.hasNext(); i.remove()) {
release(i.next().getValue());
i.remove();
}
}

void put(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op op) {
synchronized void put(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op op) {
if (map == null) {
return;
}
valueRef.retain();
release(map.put(key, new AtomicReference<>(valueRef)));
size.getAndAdd(getEntrySize(valueRef.get(), op));
final long serializedSize = getEntrySize(valueRef.get(), op);
release(map.put(key, new Item(valueRef, serializedSize)));
size.getAndAdd(serializedSize);
}

private void release(AtomicReference<ReferenceCountedObject<LogEntryProto>> ref) {
private void release(Item ref) {
if (ref == null) {
return;
}
final ReferenceCountedObject<LogEntryProto> entry = ref.getAndSet(null);
if (entry == null) {
return;
}
size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE));
entry.release();
final long serializedSize = ref.release();
size.getAndAdd(-serializedSize);
}

void remove(TermIndex key) {
synchronized void remove(TermIndex key) {
if (map == null) {
return;
}
release(map.remove(key));
}
}
Expand Down Expand Up @@ -522,7 +568,7 @@ private int compareTo(Long l) {

synchronized void clear() {
records.clear();
evictCache();
entryCache.close();
endIndex = startIndex - 1;
}

Expand All @@ -531,7 +577,7 @@ int getLoadingTimes() {
}

void evictCache() {
entryCache.clear();
entryCache.evict();
}

void putEntryCache(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op op) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,13 +564,15 @@ public CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex) {
@Override
public void close() throws IOException {
try(AutoCloseableLock writeLock = writeLock()) {
LOG.info("Start closing {}", this);
super.close();
cacheEviction.close();
cache.close();
}
fileLogWorker.close();
storage.close();
getRaftLogMetrics().unregister();
LOG.info("Successfully closed {}", this);
}

SegmentedRaftLogCache getRaftLogCache() {
Expand Down

0 comments on commit b831226

Please sign in to comment.