Skip to content

Commit

Permalink
RATIS-2129. Low replication performance because LogAppender is often …
Browse files Browse the repository at this point in the history
…blocked by RaftLog's readLock. (apache#1141)
  • Loading branch information
szetszwo authored Aug 30, 2024
1 parent dfed101 commit 781d61d
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -109,6 +109,44 @@ long getOffset() {
}
}

private static class Records {
private final ConcurrentNavigableMap<Long, LogRecord> map = new ConcurrentSkipListMap<>();

int size() {
return map.size();
}

LogRecord getFirst() {
final Map.Entry<Long, LogRecord> first = map.firstEntry();
return first != null? first.getValue() : null;
}

LogRecord getLast() {
final Map.Entry<Long, LogRecord> last = map.lastEntry();
return last != null? last.getValue() : null;
}

LogRecord get(long i) {
return map.get(i);
}

long append(LogRecord record) {
final long index = record.getTermIndex().getIndex();
final LogRecord previous = map.put(index, record);
Preconditions.assertNull(previous, "previous");
return index;
}

LogRecord removeLast() {
final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
return Objects.requireNonNull(last, "last == null").getValue();
}

void clear() {
map.clear();
}
}

static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0);
Expand Down Expand Up @@ -207,10 +245,12 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c
final long expectedLastIndex = expectedStart + expectedEntryCount - 1;
Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end index");

final LogRecord last = getLastRecord();
final LogRecord last = records.getLast();
if (last != null) {
Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record");
Preconditions.assertSame(expectedStart, records.get(0).getTermIndex().getIndex(), "Index at the first record");
final LogRecord first = records.getFirst();
Objects.requireNonNull(first, "first record");
Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), "Index at the first record");
}
if (!corrupted) {
Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index");
Expand Down Expand Up @@ -306,7 +346,7 @@ File getFile() {
/**
* the list of records is more like the index of a segment
*/
private final List<LogRecord> records = new ArrayList<>();
private final Records records = new Records();
/**
* the entryCache caches the content of log entries.
*/
Expand Down Expand Up @@ -366,20 +406,18 @@ private void append(Op op, ReferenceCountedObject<LogEntryProto> entryRef,

private LogRecord appendLogRecord(Op op, LogEntryProto entry) {
Objects.requireNonNull(entry, "entry == null");
if (records.isEmpty()) {
final LogRecord currentLast = records.getLast();
if (currentLast == null) {
Preconditions.assertTrue(entry.getIndex() == startIndex,
"gap between start index %s and first entry to append %s",
startIndex, entry.getIndex());
}

final LogRecord currentLast = getLastRecord();
if (currentLast != null) {
} else {
Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
"gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
}

final LogRecord record = new LogRecord(totalFileSize, entry);
records.add(record);
records.append(record);
totalFileSize += getEntrySize(entry, op);
endIndex = entry.getIndex();
return record;
Expand All @@ -406,17 +444,13 @@ synchronized ReferenceCountedObject<LogEntryProto> loadCache(LogRecord record) t

LogRecord getLogRecord(long index) {
if (index >= startIndex && index <= endIndex) {
return records.get(Math.toIntExact(index - startIndex));
return records.get(index);
}
return null;
}

private LogRecord getLastRecord() {
return records.isEmpty() ? null : records.get(records.size() - 1);
}

TermIndex getLastTermIndex() {
LogRecord last = getLastRecord();
final LogRecord last = records.getLast();
return last == null ? null : last.getTermIndex();
}

Expand All @@ -434,7 +468,8 @@ long getTotalCacheSize() {
synchronized void truncate(long fromIndex) {
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
for (long index = endIndex; index >= fromIndex; index--) {
LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
final LogRecord removed = records.removeLast();
Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex");
removeEntryCache(removed.getTermIndex());
totalFileSize = removed.offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,23 +296,19 @@ public LogEntryProto get(long index) throws RaftLogIOException {
@Override
public ReferenceCountedObject<LogEntryProto> retainLog(long index) throws RaftLogIOException {
checkLogState();
final LogSegment segment;
final LogRecord record;
try (AutoCloseableLock readLock = readLock()) {
segment = cache.getSegment(index);
if (segment == null) {
return null;
}
record = segment.getLogRecord(index);
if (record == null) {
return null;
}
final ReferenceCountedObject<LogEntryProto> entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
getRaftLogMetrics().onRaftLogCacheHit();
entry.retain();
return entry;
}
final LogSegment segment = cache.getSegment(index);
if (segment == null) {
return null;
}
final LogRecord record = segment.getLogRecord(index);
if (record == null) {
return null;
}
final ReferenceCountedObject<LogEntryProto> entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
getRaftLogMetrics().onRaftLogCacheHit();
entry.retain();
return entry;
}

// the entry is not in the segment's cache. Load the cache without holding the lock.
Expand Down Expand Up @@ -369,26 +365,19 @@ private void checkAndEvictCache() {
@Override
public TermIndex getTermIndex(long index) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
LogRecord record = cache.getLogRecord(index);
return record != null ? record.getTermIndex() : null;
}
return cache.getTermIndex(index);
}

@Override
public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
return cache.getTermIndices(startIndex, endIndex);
}
return cache.getTermIndices(startIndex, endIndex);
}

@Override
public TermIndex getLastEntryTermIndex() {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
return cache.getLastTermIndex();
}
return cache.getLastTermIndex();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,16 +523,21 @@ void rollOpenSegment(boolean createNewOpen) {
}

LogSegment getSegment(long index) {
if (openSegment != null && index >= openSegment.getStartIndex()) {
return openSegment;
final LogSegment open = this.openSegment;
if (open != null && index >= open.getStartIndex()) {
return open;
} else {
return closedSegments.search(index);
}
}

LogRecord getLogRecord(long index) {
TermIndex getTermIndex(long index) {
LogSegment segment = getSegment(index);
return segment == null ? null : segment.getLogRecord(index);
if (segment == null) {
return null;
}
final LogRecord record = segment.getLogRecord(index);
return record != null ? record.getTermIndex() : null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;

import java.io.IOException;
import java.util.Iterator;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -286,12 +285,12 @@ private void populatedSegment(int start, int end, int segmentSize, boolean isOpe
});
}

private void testIterator(long startIndex) throws IOException {
private void testIterator(long startIndex) {
Iterator<TermIndex> iterator = cache.iterator(startIndex);
TermIndex prev = null;
while (iterator.hasNext()) {
TermIndex termIndex = iterator.next();
Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex);
Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()), termIndex);
if (prev != null) {
Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
}
Expand Down

0 comments on commit 781d61d

Please sign in to comment.