Skip to content

Commit

Permalink
Fix bugs in LogSegment.EntryCache.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Oct 5, 2024
1 parent 38f5c69 commit 23af8ed
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ protected List<CompletableFuture<Long>> appendImpl(ReferenceCountedObject<List<L
@Override
public String toString() {
return getName() + ":" + state + ":c" + getLastCommittedIndex()
+ (isOpened()? ":last" + getLastEntryTermIndex(): "");
+ (isOpened()? ":open:last" + getLastEntryTermIndex(): ":closed");
}

public AutoCloseableLock readLock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -294,36 +294,46 @@ public ReferenceCountedObject<LogEntryProto> load(LogRecord key) throws IOExcept
}

static class EntryCache {
private final Map<TermIndex, ReferenceCountedObject<LogEntryProto>> map = new ConcurrentHashMap<>();
private final Map<TermIndex, AtomicReference<ReferenceCountedObject<LogEntryProto>>> map = new ConcurrentHashMap<>();
private final AtomicLong size = new AtomicLong();

long size() {
return size.get();
}

ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
return map.get(ti);
final AtomicReference<ReferenceCountedObject<LogEntryProto>> ref = map.get(ti);
return ref == null? null: ref.get();
}

void clear() {
map.values().forEach(ReferenceCountedObject::release);
map.clear();
size.set(0);
for (Iterator<Map.Entry<TermIndex, AtomicReference<ReferenceCountedObject<LogEntryProto>>>>
i = map.entrySet().iterator(); i.hasNext(); ) {
release(i.next().getValue());
i.remove();
}
}

void put(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op op) {
valueRef.retain();
Optional.ofNullable(map.put(key, valueRef)).ifPresent(this::release);
release(map.put(key, new AtomicReference<>(valueRef)));
size.getAndAdd(getEntrySize(valueRef.get(), op));
}

private void release(ReferenceCountedObject<LogEntryProto> entry) {
private void release(AtomicReference<ReferenceCountedObject<LogEntryProto>> ref) {
if (ref == null) {
return;
}
final ReferenceCountedObject<LogEntryProto> entry = ref.getAndSet(null);
if (entry == null) {
return;
}
size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE));
entry.release();
}

void remove(TermIndex key) {
Optional.ofNullable(map.remove(key)).ifPresent(this::release);
release(map.remove(key));
}
}

Expand Down
7 changes: 5 additions & 2 deletions ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,11 @@ static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, S
log.get(termIndices[idxEntries].getIndex()).getStateMachineLogEntry().getLogData().toByteArray())) {
++idxExpected;
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Exception e) {
throw new IllegalStateException("Failed startIndex=" + startIndex
+ ", endIndex=" + endIndex
+ ", #expectedMessages=" + expectedMessages.length
+ ", log=" + log, e);
}
++idxEntries;
}
Expand Down

0 comments on commit 23af8ed

Please sign in to comment.