Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-2129. Low replication performance because of lock contention on RaftLog #1127

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -109,6 +109,44 @@ long getOffset() {
}
}

private static class Records {
private final ConcurrentNavigableMap<Long, LogRecord> map = new ConcurrentSkipListMap<>();

int size() {
return map.size();
}

LogRecord getFirst() {
final Map.Entry<Long, LogRecord> first = map.firstEntry();
return first != null? first.getValue() : null;
}

LogRecord getLast() {
final Map.Entry<Long, LogRecord> last = map.lastEntry();
return last != null? last.getValue() : null;
}

LogRecord get(long i) {
return map.get(i);
}

long append(LogRecord record) {
final long index = record.getTermIndex().getIndex();
final LogRecord previous = map.put(index, record);
Preconditions.assertNull(previous, "previous");
return index;
}

LogRecord removeLast() {
final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
return Objects.requireNonNull(last, "last == null").getValue();
}

void clear() {
map.clear();
}
}

static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0);
Expand Down Expand Up @@ -207,10 +245,12 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c
final long expectedLastIndex = expectedStart + expectedEntryCount - 1;
Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end index");

final LogRecord last = getLastRecord();
final LogRecord last = records.getLast();
if (last != null) {
Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record");
Preconditions.assertSame(expectedStart, records.get(0).getTermIndex().getIndex(), "Index at the first record");
final LogRecord first = records.getFirst();
Objects.requireNonNull(first, "first record");
Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), "Index at the first record");
}
if (!corrupted) {
Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index");
Expand Down Expand Up @@ -306,7 +346,7 @@ File getFile() {
/**
* the list of records is more like the index of a segment
*/
private final List<LogRecord> records = new ArrayList<>();
private final Records records = new Records();
/**
* the entryCache caches the content of log entries.
*/
Expand Down Expand Up @@ -366,20 +406,18 @@ private void append(Op op, ReferenceCountedObject<LogEntryProto> entryRef,

private LogRecord appendLogRecord(Op op, LogEntryProto entry) {
Objects.requireNonNull(entry, "entry == null");
if (records.isEmpty()) {
final LogRecord currentLast = records.getLast();
if (currentLast == null) {
Preconditions.assertTrue(entry.getIndex() == startIndex,
"gap between start index %s and first entry to append %s",
startIndex, entry.getIndex());
}

final LogRecord currentLast = getLastRecord();
if (currentLast != null) {
} else {
Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
"gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
}

final LogRecord record = new LogRecord(totalFileSize, entry);
records.add(record);
records.append(record);
totalFileSize += getEntrySize(entry, op);
endIndex = entry.getIndex();
return record;
Expand All @@ -406,17 +444,13 @@ synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException

LogRecord getLogRecord(long index) {
if (index >= startIndex && index <= endIndex) {
return records.get(Math.toIntExact(index - startIndex));
return records.get(index);
}
return null;
}

private LogRecord getLastRecord() {
return records.isEmpty() ? null : records.get(records.size() - 1);
}

TermIndex getLastTermIndex() {
LogRecord last = getLastRecord();
final LogRecord last = records.getLast();
return last == null ? null : last.getTermIndex();
}

Expand All @@ -434,7 +468,8 @@ long getTotalCacheSize() {
synchronized void truncate(long fromIndex) {
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
for (long index = endIndex; index >= fromIndex; index--) {
LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
final LogRecord removed = records.removeLast();
Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex");
removeEntryCache(removed.getTermIndex());
totalFileSize = removed.offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,22 +274,18 @@ private void loadLogSegments(long lastIndexInSnapshot,
@Override
public LogEntryProto get(long index) throws RaftLogIOException {
checkLogState();
final LogSegment segment;
final LogRecord record;
try (AutoCloseableLock readLock = readLock()) {
segment = cache.getSegment(index);
if (segment == null) {
return null;
}
record = segment.getLogRecord(index);
if (record == null) {
return null;
}
final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
getRaftLogMetrics().onRaftLogCacheHit();
return entry;
}
final LogSegment segment = cache.getSegment(index);
if (segment == null) {
return null;
}
final LogRecord record = segment.getLogRecord(index);
if (record == null) {
return null;
}
final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
getRaftLogMetrics().onRaftLogCacheHit();
return entry;
}

// the entry is not in the segment's cache. Load the cache without holding the lock.
Expand Down Expand Up @@ -339,26 +335,19 @@ private void checkAndEvictCache() {
@Override
public TermIndex getTermIndex(long index) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
LogRecord record = cache.getLogRecord(index);
return record != null ? record.getTermIndex() : null;
}
return cache.getTermIndex(index);
}

@Override
public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
return cache.getTermIndices(startIndex, endIndex);
}
return cache.getTermIndices(startIndex, endIndex);
}

@Override
public TermIndex getLastEntryTermIndex() {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
return cache.getLastTermIndex();
}
return cache.getLastTermIndex();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,16 +521,21 @@ void rollOpenSegment(boolean createNewOpen) {
}

LogSegment getSegment(long index) {
if (openSegment != null && index >= openSegment.getStartIndex()) {
return openSegment;
final LogSegment open = this.openSegment;
if (open != null && index >= open.getStartIndex()) {
return open;
} else {
return closedSegments.search(index);
}
}

LogRecord getLogRecord(long index) {
TermIndex getTermIndex(long index) {
LogSegment segment = getSegment(index);
return segment == null ? null : segment.getLogRecord(index);
if (segment == null) {
return null;
}
final LogRecord record = segment.getLogRecord(index);
return record != null ? record.getTermIndex() : null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;

import java.io.IOException;
import java.util.Iterator;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -286,12 +285,12 @@ private void populatedSegment(int start, int end, int segmentSize, boolean isOpe
});
}

private void testIterator(long startIndex) throws IOException {
private void testIterator(long startIndex) {
Iterator<TermIndex> iterator = cache.iterator(startIndex);
TermIndex prev = null;
while (iterator.hasNext()) {
TermIndex termIndex = iterator.next();
Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex);
Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()), termIndex);
if (prev != null) {
Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
}
Expand Down
Loading