From 19be6777a651ac104d058d61117317f1b60f3b60 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Wed, 31 Jul 2024 11:10:41 -0700 Subject: [PATCH 1/3] RATIS-2129. Low replication performance because GrpcLogAppender is often blocked by RaftLog's readLock. --- .../server/raftlog/segmented/LogSegment.java | 63 +++++++++++++++---- .../raftlog/segmented/SegmentedRaftLog.java | 8 +-- 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index 12e7c4f1d1..d2497f73f4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -37,13 +37,13 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -109,6 +109,44 @@ long getOffset() { } } + private static class Records { + private final ConcurrentNavigableMap map = new ConcurrentSkipListMap<>(); + + int size() { + return map.size(); + } + + LogRecord getFirst() { + final Map.Entry first = map.firstEntry(); + return first != null? first.getValue() : null; + } + + LogRecord getLast() { + final Map.Entry last = map.lastEntry(); + return last != null? last.getValue() : null; + } + + LogRecord get(long i) { + return map.get(i); + } + + long append(LogRecord record) { + final long index = record.getTermIndex().getIndex(); + final LogRecord previous = map.put(index, record); + Preconditions.assertNull(previous, "previous"); + return index; + } + + LogRecord removeLast() { + final Map.Entry last = map.pollLastEntry(); + return Objects.requireNonNull(last, "last == null").getValue(); + } + + void clear() { + map.clear(); + } + } + static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) { Preconditions.assertTrue(start >= 0); @@ -210,7 +248,7 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c final LogRecord last = getLastRecord(); if (last != null) { Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record"); - Preconditions.assertSame(expectedStart, records.get(0).getTermIndex().getIndex(), "Index at the first record"); + Preconditions.assertSame(expectedStart, records.getFirst().getTermIndex().getIndex(), "Index at the first record"); } if (!corrupted) { Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index"); @@ -306,7 +344,7 @@ File getFile() { /** * the list of records is more like the index of a segment */ - private final List records = new ArrayList<>(); + private final Records records = new Records(); /** * the entryCache caches the content of log entries. */ @@ -366,20 +404,18 @@ private void append(Op op, ReferenceCountedObject entryRef, private LogRecord appendLogRecord(Op op, LogEntryProto entry) { Objects.requireNonNull(entry, "entry == null"); - if (records.isEmpty()) { + final LogRecord currentLast = records.getLast(); + if (currentLast == null) { Preconditions.assertTrue(entry.getIndex() == startIndex, "gap between start index %s and first entry to append %s", startIndex, entry.getIndex()); - } - - final LogRecord currentLast = getLastRecord(); - if (currentLast != null) { + } else { Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1, "gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex()); } final LogRecord record = new LogRecord(totalFileSize, entry); - records.add(record); + records.append(record); totalFileSize += getEntrySize(entry, op); endIndex = entry.getIndex(); return record; @@ -406,13 +442,13 @@ synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException LogRecord getLogRecord(long index) { if (index >= startIndex && index <= endIndex) { - return records.get(Math.toIntExact(index - startIndex)); + return records.get(index); } return null; } private LogRecord getLastRecord() { - return records.isEmpty() ? null : records.get(records.size() - 1); + return records.getLast(); } TermIndex getLastTermIndex() { @@ -434,7 +470,8 @@ long getTotalCacheSize() { synchronized void truncate(long fromIndex) { Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex); for (long index = endIndex; index >= fromIndex; index--) { - LogRecord removed = records.remove(Math.toIntExact(index - startIndex)); + final LogRecord removed = records.removeLast(); + Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex"); removeEntryCache(removed.getTermIndex()); totalFileSize = removed.offset; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index def472a607..aad23d7159 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -276,7 +276,7 @@ public LogEntryProto get(long index) throws RaftLogIOException { checkLogState(); final LogSegment segment; final LogRecord record; - try (AutoCloseableLock readLock = readLock()) { + { segment = cache.getSegment(index); if (segment == null) { return null; @@ -339,7 +339,7 @@ private void checkAndEvictCache() { @Override public TermIndex getTermIndex(long index) { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { + { LogRecord record = cache.getLogRecord(index); return record != null ? record.getTermIndex() : null; } @@ -348,7 +348,7 @@ public TermIndex getTermIndex(long index) { @Override public LogEntryHeader[] getEntries(long startIndex, long endIndex) { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { + { return cache.getTermIndices(startIndex, endIndex); } } @@ -356,7 +356,7 @@ public LogEntryHeader[] getEntries(long startIndex, long endIndex) { @Override public TermIndex getLastEntryTermIndex() { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { + { return cache.getLastTermIndex(); } } From e4af20163144d0c9aa3d92721fa70470cbb99dee Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 1 Aug 2024 08:14:28 -0700 Subject: [PATCH 2/3] Fix checkstyle. --- .../server/raftlog/segmented/LogSegment.java | 4 +- .../raftlog/segmented/SegmentedRaftLog.java | 42 +++++++------------ .../segmented/SegmentedRaftLogCache.java | 1 + 3 files changed, 20 insertions(+), 27 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index d2497f73f4..a8505ad185 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -248,7 +248,9 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c final LogRecord last = getLastRecord(); if (last != null) { Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record"); - Preconditions.assertSame(expectedStart, records.getFirst().getTermIndex().getIndex(), "Index at the first record"); + final LogRecord first = records.getFirst(); + Objects.requireNonNull(first, "first record"); + Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), "Index at the first record"); } if (!corrupted) { Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index"); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index aad23d7159..1036952a4a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -274,22 +274,18 @@ private void loadLogSegments(long lastIndexInSnapshot, @Override public LogEntryProto get(long index) throws RaftLogIOException { checkLogState(); - final LogSegment segment; - final LogRecord record; - { - segment = cache.getSegment(index); - if (segment == null) { - return null; - } - record = segment.getLogRecord(index); - if (record == null) { - return null; - } - final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex()); - if (entry != null) { - getRaftLogMetrics().onRaftLogCacheHit(); - return entry; - } + final LogSegment segment = cache.getSegment(index); + if (segment == null) { + return null; + } + final LogRecord record = segment.getLogRecord(index); + if (record == null) { + return null; + } + final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex()); + if (entry != null) { + getRaftLogMetrics().onRaftLogCacheHit(); + return entry; } // the entry is not in the segment's cache. Load the cache without holding the lock. @@ -339,26 +335,20 @@ private void checkAndEvictCache() { @Override public TermIndex getTermIndex(long index) { checkLogState(); - { - LogRecord record = cache.getLogRecord(index); - return record != null ? record.getTermIndex() : null; - } + final LogRecord record = cache.getLogRecord(index); + return record != null ? record.getTermIndex() : null; } @Override public LogEntryHeader[] getEntries(long startIndex, long endIndex) { checkLogState(); - { - return cache.getTermIndices(startIndex, endIndex); - } + return cache.getTermIndices(startIndex, endIndex); } @Override public TermIndex getLastEntryTermIndex() { checkLogState(); - { - return cache.getLastTermIndex(); - } + return cache.getLastTermIndex(); } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index 0b05b14e5c..9370a19cdf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -521,6 +521,7 @@ void rollOpenSegment(boolean createNewOpen) { } LogSegment getSegment(long index) { + final LogSegment openSegment = this.openSegment; if (openSegment != null && index >= openSegment.getStartIndex()) { return openSegment; } else { From 49a6ac6192a34bb85698fb53039abc145d007200 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 1 Aug 2024 08:54:46 -0700 Subject: [PATCH 3/3] Fix checkstyle and some minor changes. --- .../ratis/server/raftlog/segmented/LogSegment.java | 8 ++------ .../server/raftlog/segmented/SegmentedRaftLog.java | 3 +-- .../raftlog/segmented/SegmentedRaftLogCache.java | 14 +++++++++----- .../segmented/TestSegmentedRaftLogCache.java | 5 ++--- 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index a8505ad185..79972cdfa1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -245,7 +245,7 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c final long expectedLastIndex = expectedStart + expectedEntryCount - 1; Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end index"); - final LogRecord last = getLastRecord(); + final LogRecord last = records.getLast(); if (last != null) { Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record"); final LogRecord first = records.getFirst(); @@ -449,12 +449,8 @@ LogRecord getLogRecord(long index) { return null; } - private LogRecord getLastRecord() { - return records.getLast(); - } - TermIndex getLastTermIndex() { - LogRecord last = getLastRecord(); + final LogRecord last = records.getLast(); return last == null ? null : last.getTermIndex(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 1036952a4a..6660e0a1a2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -335,8 +335,7 @@ private void checkAndEvictCache() { @Override public TermIndex getTermIndex(long index) { checkLogState(); - final LogRecord record = cache.getLogRecord(index); - return record != null ? record.getTermIndex() : null; + return cache.getTermIndex(index); } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index 9370a19cdf..89fea4a85c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -521,17 +521,21 @@ void rollOpenSegment(boolean createNewOpen) { } LogSegment getSegment(long index) { - final LogSegment openSegment = this.openSegment; - if (openSegment != null && index >= openSegment.getStartIndex()) { - return openSegment; + final LogSegment open = this.openSegment; + if (open != null && index >= open.getStartIndex()) { + return open; } else { return closedSegments.search(index); } } - LogRecord getLogRecord(long index) { + TermIndex getTermIndex(long index) { LogSegment segment = getSegment(index); - return segment == null ? null : segment.getLogRecord(index); + if (segment == null) { + return null; + } + final LogRecord record = segment.getLogRecord(index); + return record != null ? record.getTermIndex() : null; } /** diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java index efcb90580f..0e3844ef07 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java @@ -20,7 +20,6 @@ import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*; import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE; -import java.io.IOException; import java.util.Iterator; import java.util.stream.IntStream; @@ -286,12 +285,12 @@ private void populatedSegment(int start, int end, int segmentSize, boolean isOpe }); } - private void testIterator(long startIndex) throws IOException { + private void testIterator(long startIndex) { Iterator iterator = cache.iterator(startIndex); TermIndex prev = null; while (iterator.hasNext()) { TermIndex termIndex = iterator.next(); - Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex); + Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()), termIndex); if (prev != null) { Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex()); }