diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index 89a6e20501..a52b57511f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -36,12 +36,12 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -104,6 +104,44 @@ long getOffset() { } } + private static class Records { + private final ConcurrentNavigableMap map = new ConcurrentSkipListMap<>(); + + int size() { + return map.size(); + } + + LogRecord getFirst() { + final Map.Entry first = map.firstEntry(); + return first != null? first.getValue() : null; + } + + LogRecord getLast() { + final Map.Entry last = map.lastEntry(); + return last != null? last.getValue() : null; + } + + LogRecord get(long i) { + return map.get(i); + } + + long append(LogRecord record) { + final long index = record.getTermIndex().getIndex(); + final LogRecord previous = map.put(index, record); + Preconditions.assertNull(previous, "previous"); + return index; + } + + LogRecord removeLast() { + final Map.Entry last = map.pollLastEntry(); + return Objects.requireNonNull(last, "last == null").getValue(); + } + + void clear() { + map.clear(); + } + } + static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) { Preconditions.assertTrue(start >= 0); @@ -203,10 +241,12 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c final long expectedLastIndex = expectedStart + expectedEntryCount - 1; Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end index"); - final LogRecord last = getLastRecord(); + final LogRecord last = records.getLast(); if (last != null) { Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record"); - Preconditions.assertSame(expectedStart, records.get(0).getTermIndex().getIndex(), "Index at the first record"); + final LogRecord first = records.getFirst(); + Objects.requireNonNull(first, "first record"); + Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), "Index at the first record"); } if (!corrupted) { Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index"); @@ -267,7 +307,7 @@ File getFile() { /** * the list of records is more like the index of a segment */ - private final List records = new ArrayList<>(); + private final Records records = new Records(); /** * the entryCache caches the content of log entries. */ @@ -310,20 +350,18 @@ void appendToOpenSegment(LogEntryProto entry, Op op) { private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) { Objects.requireNonNull(entry, "entry == null"); - if (records.isEmpty()) { + final LogRecord currentLast = records.getLast(); + if (currentLast == null) { Preconditions.assertTrue(entry.getIndex() == startIndex, "gap between start index %s and first entry to append %s", startIndex, entry.getIndex()); - } - - final LogRecord currentLast = getLastRecord(); - if (currentLast != null) { + } else { Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1, "gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex()); } final LogRecord record = new LogRecord(totalFileSize, entry); - records.add(record); + records.append(record); if (keepEntryInCache) { putEntryCache(record.getTermIndex(), entry, op); } @@ -352,17 +390,13 @@ synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException LogRecord getLogRecord(long index) { if (index >= startIndex && index <= endIndex) { - return records.get(Math.toIntExact(index - startIndex)); + return records.get(index); } return null; } - private LogRecord getLastRecord() { - return records.isEmpty() ? null : records.get(records.size() - 1); - } - TermIndex getLastTermIndex() { - LogRecord last = getLastRecord(); + final LogRecord last = records.getLast(); return last == null ? null : last.getTermIndex(); } @@ -380,7 +414,8 @@ long getTotalCacheSize() { synchronized void truncate(long fromIndex) { Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex); for (long index = endIndex; index >= fromIndex; index--) { - LogRecord removed = records.remove(Math.toIntExact(index - startIndex)); + final LogRecord removed = records.removeLast(); + Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex"); removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE); totalFileSize = removed.offset; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index f49900f16a..59b97c80b4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -337,26 +337,19 @@ private void checkAndEvictCache() { @Override public TermIndex getTermIndex(long index) { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - LogRecord record = cache.getLogRecord(index); - return record != null ? record.getTermIndex() : null; - } + return cache.getTermIndex(index); } @Override public LogEntryHeader[] getEntries(long startIndex, long endIndex) { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - return cache.getTermIndices(startIndex, endIndex); - } + return cache.getTermIndices(startIndex, endIndex); } @Override public TermIndex getLastEntryTermIndex() { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - return cache.getLastTermIndex(); - } + return cache.getLastTermIndex(); } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index 58c70c4af9..5d66660834 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -520,16 +520,21 @@ void rollOpenSegment(boolean createNewOpen) { } LogSegment getSegment(long index) { - if (openSegment != null && index >= openSegment.getStartIndex()) { - return openSegment; + final LogSegment open = this.openSegment; + if (open != null && index >= open.getStartIndex()) { + return open; } else { return closedSegments.search(index); } } - LogRecord getLogRecord(long index) { + TermIndex getTermIndex(long index) { LogSegment segment = getSegment(index); - return segment == null ? null : segment.getLogRecord(index); + if (segment == null) { + return null; + } + final LogRecord record = segment.getLogRecord(index); + return record != null ? record.getTermIndex() : null; } /** diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java index 8015f18274..5937430f7d 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java @@ -20,7 +20,6 @@ import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*; import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE; -import java.io.IOException; import java.util.Iterator; import java.util.stream.IntStream; @@ -282,12 +281,12 @@ private void populatedSegment(int start, int end, int segmentSize, boolean isOpe }); } - private void testIterator(long startIndex) throws IOException { + private void testIterator(long startIndex) { Iterator iterator = cache.iterator(startIndex); TermIndex prev = null; while (iterator.hasNext()) { TermIndex termIndex = iterator.next(); - Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex); + Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()), termIndex); if (prev != null) { Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex()); }