diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index f96e34e4cb..c51464f9e9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -37,13 +37,13 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -109,6 +109,44 @@ long getOffset() { } } + private static class Records { + private final ConcurrentNavigableMap map = new ConcurrentSkipListMap<>(); + + int size() { + return map.size(); + } + + LogRecord getFirst() { + final Map.Entry first = map.firstEntry(); + return first != null? first.getValue() : null; + } + + LogRecord getLast() { + final Map.Entry last = map.lastEntry(); + return last != null? last.getValue() : null; + } + + LogRecord get(long i) { + return map.get(i); + } + + long append(LogRecord record) { + final long index = record.getTermIndex().getIndex(); + final LogRecord previous = map.put(index, record); + Preconditions.assertNull(previous, "previous"); + return index; + } + + LogRecord removeLast() { + final Map.Entry last = map.pollLastEntry(); + return Objects.requireNonNull(last, "last == null").getValue(); + } + + void clear() { + map.clear(); + } + } + static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) { Preconditions.assertTrue(start >= 0); @@ -207,10 +245,12 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c final long expectedLastIndex = expectedStart + expectedEntryCount - 1; Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end index"); - final LogRecord last = getLastRecord(); + final LogRecord last = records.getLast(); if (last != null) { Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record"); - Preconditions.assertSame(expectedStart, records.get(0).getTermIndex().getIndex(), "Index at the first record"); + final LogRecord first = records.getFirst(); + Objects.requireNonNull(first, "first record"); + Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), "Index at the first record"); } if (!corrupted) { Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index"); @@ -306,7 +346,7 @@ File getFile() { /** * the list of records is more like the index of a segment */ - private final List records = new ArrayList<>(); + private final Records records = new Records(); /** * the entryCache caches the content of log entries. */ @@ -366,20 +406,18 @@ private void append(Op op, ReferenceCountedObject entryRef, private LogRecord appendLogRecord(Op op, LogEntryProto entry) { Objects.requireNonNull(entry, "entry == null"); - if (records.isEmpty()) { + final LogRecord currentLast = records.getLast(); + if (currentLast == null) { Preconditions.assertTrue(entry.getIndex() == startIndex, "gap between start index %s and first entry to append %s", startIndex, entry.getIndex()); - } - - final LogRecord currentLast = getLastRecord(); - if (currentLast != null) { + } else { Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1, "gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex()); } final LogRecord record = new LogRecord(totalFileSize, entry); - records.add(record); + records.append(record); totalFileSize += getEntrySize(entry, op); endIndex = entry.getIndex(); return record; @@ -406,17 +444,13 @@ synchronized ReferenceCountedObject loadCache(LogRecord record) t LogRecord getLogRecord(long index) { if (index >= startIndex && index <= endIndex) { - return records.get(Math.toIntExact(index - startIndex)); + return records.get(index); } return null; } - private LogRecord getLastRecord() { - return records.isEmpty() ? null : records.get(records.size() - 1); - } - TermIndex getLastTermIndex() { - LogRecord last = getLastRecord(); + final LogRecord last = records.getLast(); return last == null ? null : last.getTermIndex(); } @@ -434,7 +468,8 @@ long getTotalCacheSize() { synchronized void truncate(long fromIndex) { Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex); for (long index = endIndex; index >= fromIndex; index--) { - LogRecord removed = records.remove(Math.toIntExact(index - startIndex)); + final LogRecord removed = records.removeLast(); + Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex"); removeEntryCache(removed.getTermIndex()); totalFileSize = removed.offset; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 79f0380ee2..44b9c7599b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -296,23 +296,19 @@ public LogEntryProto get(long index) throws RaftLogIOException { @Override public ReferenceCountedObject retainLog(long index) throws RaftLogIOException { checkLogState(); - final LogSegment segment; - final LogRecord record; - try (AutoCloseableLock readLock = readLock()) { - segment = cache.getSegment(index); - if (segment == null) { - return null; - } - record = segment.getLogRecord(index); - if (record == null) { - return null; - } - final ReferenceCountedObject entry = segment.getEntryFromCache(record.getTermIndex()); - if (entry != null) { - getRaftLogMetrics().onRaftLogCacheHit(); - entry.retain(); - return entry; - } + final LogSegment segment = cache.getSegment(index); + if (segment == null) { + return null; + } + final LogRecord record = segment.getLogRecord(index); + if (record == null) { + return null; + } + final ReferenceCountedObject entry = segment.getEntryFromCache(record.getTermIndex()); + if (entry != null) { + getRaftLogMetrics().onRaftLogCacheHit(); + entry.retain(); + return entry; } // the entry is not in the segment's cache. Load the cache without holding the lock. @@ -369,26 +365,19 @@ private void checkAndEvictCache() { @Override public TermIndex getTermIndex(long index) { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - LogRecord record = cache.getLogRecord(index); - return record != null ? record.getTermIndex() : null; - } + return cache.getTermIndex(index); } @Override public LogEntryHeader[] getEntries(long startIndex, long endIndex) { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - return cache.getTermIndices(startIndex, endIndex); - } + return cache.getTermIndices(startIndex, endIndex); } @Override public TermIndex getLastEntryTermIndex() { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - return cache.getLastTermIndex(); - } + return cache.getLastTermIndex(); } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index ad16332326..8b194bbc92 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -523,16 +523,21 @@ void rollOpenSegment(boolean createNewOpen) { } LogSegment getSegment(long index) { - if (openSegment != null && index >= openSegment.getStartIndex()) { - return openSegment; + final LogSegment open = this.openSegment; + if (open != null && index >= open.getStartIndex()) { + return open; } else { return closedSegments.search(index); } } - LogRecord getLogRecord(long index) { + TermIndex getTermIndex(long index) { LogSegment segment = getSegment(index); - return segment == null ? null : segment.getLogRecord(index); + if (segment == null) { + return null; + } + final LogRecord record = segment.getLogRecord(index); + return record != null ? record.getTermIndex() : null; } /** diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java index 9f0df2fabc..7bc4954388 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java @@ -20,7 +20,6 @@ import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*; import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE; -import java.io.IOException; import java.util.Iterator; import java.util.stream.IntStream; @@ -286,12 +285,12 @@ private void populatedSegment(int start, int end, int segmentSize, boolean isOpe }); } - private void testIterator(long startIndex) throws IOException { + private void testIterator(long startIndex) { Iterator iterator = cache.iterator(startIndex); TermIndex prev = null; while (iterator.hasNext()) { TermIndex termIndex = iterator.next(); - Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex); + Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()), termIndex); if (prev != null) { Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex()); }