Skip to content

Commit

Permalink
RATIS-2129. Low replication performance because LogAppender is often …
Browse files Browse the repository at this point in the history
…blocked by RaftLog's readLock. (apache#1141)
  • Loading branch information
szetszwo committed Sep 10, 2024
1 parent 342688d commit 76003e4
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -104,6 +104,44 @@ long getOffset() {
}
}

/**
 * The records of a segment, keyed by log index in a {@link ConcurrentSkipListMap}.
 * Records are looked up by their log index (not by an offset from the segment start).
 */
private static class Records {
  private final ConcurrentNavigableMap<Long, LogRecord> map = new ConcurrentSkipListMap<>();

  /** @return the number of records currently stored. */
  int size() {
    return map.size();
  }

  /** @return the record with the lowest index, or null if there are no records. */
  LogRecord getFirst() {
    final Map.Entry<Long, LogRecord> first = map.firstEntry();
    return first != null ? first.getValue() : null;
  }

  /** @return the record with the highest index, or null if there are no records. */
  LogRecord getLast() {
    final Map.Entry<Long, LogRecord> last = map.lastEntry();
    return last != null ? last.getValue() : null;
  }

  /**
   * @param i the log index of the record.
   * @return the record stored at the given log index, or null if there is none.
   */
  LogRecord get(long i) {
    return map.get(i);
  }

  /**
   * Store the given record under its own log index.
   * The assertion fails if a record is already stored at that index;
   * in that case the existing record is left untouched.
   *
   * @param record the record to store.
   * @return the log index of the stored record.
   */
  long append(LogRecord record) {
    final long index = record.getTermIndex().getIndex();
    // putIfAbsent rather than put: a duplicate index must fail the assertion
    // without first replacing the record that is already in the map.
    final LogRecord previous = map.putIfAbsent(index, record);
    Preconditions.assertNull(previous, "previous");
    return index;
  }

  /**
   * Remove the record with the highest index.
   *
   * @return the removed record.
   * @throws NullPointerException if there are no records.
   */
  LogRecord removeLast() {
    final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
    return Objects.requireNonNull(last, "last == null").getValue();
  }

  /** Remove all the records. */
  void clear() {
    map.clear();
  }
}

static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0);
Expand Down Expand Up @@ -203,10 +241,12 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c
final long expectedLastIndex = expectedStart + expectedEntryCount - 1;
Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end index");

final LogRecord last = getLastRecord();
final LogRecord last = records.getLast();
if (last != null) {
Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record");
Preconditions.assertSame(expectedStart, records.get(0).getTermIndex().getIndex(), "Index at the first record");
final LogRecord first = records.getFirst();
Objects.requireNonNull(first, "first record");
Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), "Index at the first record");
}
if (!corrupted) {
Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index");
Expand Down Expand Up @@ -267,7 +307,7 @@ File getFile() {
/**
* the list of records is more like the index of a segment
*/
private final List<LogRecord> records = new ArrayList<>();
private final Records records = new Records();
/**
* the entryCache caches the content of log entries.
*/
Expand Down Expand Up @@ -310,20 +350,18 @@ void appendToOpenSegment(LogEntryProto entry, Op op) {

private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
Objects.requireNonNull(entry, "entry == null");
if (records.isEmpty()) {
final LogRecord currentLast = records.getLast();
if (currentLast == null) {
Preconditions.assertTrue(entry.getIndex() == startIndex,
"gap between start index %s and first entry to append %s",
startIndex, entry.getIndex());
}

final LogRecord currentLast = getLastRecord();
if (currentLast != null) {
} else {
Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
"gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
}

final LogRecord record = new LogRecord(totalFileSize, entry);
records.add(record);
records.append(record);
if (keepEntryInCache) {
putEntryCache(record.getTermIndex(), entry, op);
}
Expand Down Expand Up @@ -352,17 +390,13 @@ synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException

/**
 * Look up the record at the given log index in this segment.
 *
 * @param index the log index to look up.
 * @return the record from {@code records},
 *         or null if the index is outside [startIndex, endIndex].
 */
LogRecord getLogRecord(long index) {
if (index >= startIndex && index <= endIndex) {
// NOTE(review): this view shows two lookups back to back; they look like the two sides of a diff.
// The first addresses records by the offset from startIndex, the second by the log index itself.
return records.get(Math.toIntExact(index - startIndex));
return records.get(index);
}
return null;
}

/**
 * @return the element at position {@code records.size() - 1},
 *         or null if {@code records} is empty.
 */
// NOTE(review): isEmpty() and positional get(size - 1) are list-style calls;
// presumably this helper belongs to the List-based side of the diff -- confirm.
private LogRecord getLastRecord() {
return records.isEmpty() ? null : records.get(records.size() - 1);
}

/**
 * @return the {@link TermIndex} of the last record,
 *         or null if there is no last record.
 */
TermIndex getLastTermIndex() {
// NOTE(review): two declarations of "last" appear here; they look like the two sides of a diff
// (via the getLastRecord() helper vs. directly via records.getLast()).
LogRecord last = getLastRecord();
final LogRecord last = records.getLast();
return last == null ? null : last.getTermIndex();
}

Expand All @@ -380,7 +414,8 @@ long getTotalCacheSize() {
synchronized void truncate(long fromIndex) {
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
for (long index = endIndex; index >= fromIndex; index--) {
LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
final LogRecord removed = records.removeLast();
Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex");
removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE);
totalFileSize = removed.offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,26 +337,19 @@ private void checkAndEvictCache() {
/**
 * Get the {@link TermIndex} of the entry at the given log index.
 * Calls checkLogState() first and then looks up the cache.
 *
 * @param index the log index to look up.
 * @return the value found in the cache, which may be null.
 */
@Override
public TermIndex getTermIndex(long index) {
checkLogState();
// NOTE(review): two variants appear here; they look like the two sides of a diff.
// The first reads the record from the cache while holding readLock();
// the second delegates to cache.getTermIndex(index) without taking the lock.
try(AutoCloseableLock readLock = readLock()) {
LogRecord record = cache.getLogRecord(index);
return record != null ? record.getTermIndex() : null;
}
return cache.getTermIndex(index);
}

/**
 * Get the entry headers for the given index range.
 * Calls checkLogState() first and then delegates to cache.getTermIndices(startIndex, endIndex).
 *
 * @param startIndex passed through to the cache as the start of the range.
 * @param endIndex passed through to the cache as the end of the range.
 * @return the array returned by the cache.
 */
@Override
public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
checkLogState();
// NOTE(review): the same cache call appears with and without readLock();
// they look like the two sides of a diff.
try(AutoCloseableLock readLock = readLock()) {
return cache.getTermIndices(startIndex, endIndex);
}
return cache.getTermIndices(startIndex, endIndex);
}

/**
 * Get the {@link TermIndex} of the last entry.
 * Calls checkLogState() first and then delegates to cache.getLastTermIndex().
 *
 * @return the value returned by the cache.
 */
@Override
public TermIndex getLastEntryTermIndex() {
checkLogState();
// NOTE(review): the same cache call appears with and without readLock();
// they look like the two sides of a diff.
try(AutoCloseableLock readLock = readLock()) {
return cache.getLastTermIndex();
}
return cache.getLastTermIndex();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,16 +520,21 @@ void rollOpenSegment(boolean createNewOpen) {
}

/**
 * Find the segment for the given log index.
 * The open segment is returned when it is non-null and the index is at or after its start index;
 * otherwise the closed segments are searched.
 *
 * @param index the log index to look up.
 * @return the open segment, or the result of closedSegments.search(index).
 */
LogSegment getSegment(long index) {
// NOTE(review): two variants of the open-segment check appear here; they look like the two sides of a diff.
// The first reads the openSegment field up to three times;
// the second copies the field into a local once, so the null check and the return use the same reference.
if (openSegment != null && index >= openSegment.getStartIndex()) {
return openSegment;
final LogSegment open = this.openSegment;
if (open != null && index >= open.getStartIndex()) {
return open;
} else {
return closedSegments.search(index);
}
}

/**
 * Look up the given log index via getSegment(index).
 * NOTE(review): two signatures and two bodies appear here; they look like the two sides of a diff.
 * One returns the {@link LogRecord} itself, the other returns the record's {@link TermIndex}.
 * In both, null is returned when no segment is found.
 */
LogRecord getLogRecord(long index) {
TermIndex getTermIndex(long index) {
LogSegment segment = getSegment(index);
return segment == null ? null : segment.getLogRecord(index);
if (segment == null) {
return null;
}
// the segment may return null for the index, so the record is checked before it is dereferenced
final LogRecord record = segment.getLogRecord(index);
return record != null ? record.getTermIndex() : null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;

import java.io.IOException;
import java.util.Iterator;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -282,12 +281,12 @@ private void populatedSegment(int start, int end, int segmentSize, boolean isOpe
});
}

private void testIterator(long startIndex) throws IOException {
private void testIterator(long startIndex) {
Iterator<TermIndex> iterator = cache.iterator(startIndex);
TermIndex prev = null;
while (iterator.hasNext()) {
TermIndex termIndex = iterator.next();
Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex);
Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()), termIndex);
if (prev != null) {
Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
}
Expand Down

0 comments on commit 76003e4

Please sign in to comment.