Skip to content

Commit

Permalink
RATIS-2129. Low replication performance because GrpcLogAppender is of…
Browse files Browse the repository at this point in the history
…ten blocked by RaftLog's readLock.
  • Loading branch information
szetszwo committed Jul 31, 2024
1 parent 8906297 commit 19be677
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -109,6 +109,44 @@ long getOffset() {
}
}

private static class Records {
private final ConcurrentNavigableMap<Long, LogRecord> map = new ConcurrentSkipListMap<>();

int size() {
return map.size();
}

LogRecord getFirst() {
final Map.Entry<Long, LogRecord> first = map.firstEntry();
return first != null? first.getValue() : null;
}

LogRecord getLast() {
final Map.Entry<Long, LogRecord> last = map.lastEntry();
return last != null? last.getValue() : null;
}

LogRecord get(long i) {
return map.get(i);
}

long append(LogRecord record) {
final long index = record.getTermIndex().getIndex();
final LogRecord previous = map.put(index, record);
Preconditions.assertNull(previous, "previous");
return index;
}

LogRecord removeLast() {
final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
return Objects.requireNonNull(last, "last == null").getValue();
}

void clear() {
map.clear();
}
}

static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0);
Expand Down Expand Up @@ -210,7 +248,7 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c
final LogRecord last = getLastRecord();
if (last != null) {
Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record");
Preconditions.assertSame(expectedStart, records.get(0).getTermIndex().getIndex(), "Index at the first record");
Preconditions.assertSame(expectedStart, records.getFirst().getTermIndex().getIndex(), "Index at the first record");
}
if (!corrupted) {
Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index");
Expand Down Expand Up @@ -306,7 +344,7 @@ File getFile() {
/**
* the list of records is more like the index of a segment
*/
private final List<LogRecord> records = new ArrayList<>();
private final Records records = new Records();
/**
* the entryCache caches the content of log entries.
*/
Expand Down Expand Up @@ -366,20 +404,18 @@ private void append(Op op, ReferenceCountedObject<LogEntryProto> entryRef,

private LogRecord appendLogRecord(Op op, LogEntryProto entry) {
Objects.requireNonNull(entry, "entry == null");
if (records.isEmpty()) {
final LogRecord currentLast = records.getLast();
if (currentLast == null) {
Preconditions.assertTrue(entry.getIndex() == startIndex,
"gap between start index %s and first entry to append %s",
startIndex, entry.getIndex());
}

final LogRecord currentLast = getLastRecord();
if (currentLast != null) {
} else {
Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
"gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
}

final LogRecord record = new LogRecord(totalFileSize, entry);
records.add(record);
records.append(record);
totalFileSize += getEntrySize(entry, op);
endIndex = entry.getIndex();
return record;
Expand All @@ -406,13 +442,13 @@ synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException

LogRecord getLogRecord(long index) {
if (index >= startIndex && index <= endIndex) {
return records.get(Math.toIntExact(index - startIndex));
return records.get(index);
}
return null;
}

private LogRecord getLastRecord() {
return records.isEmpty() ? null : records.get(records.size() - 1);
return records.getLast();
}

TermIndex getLastTermIndex() {
Expand All @@ -434,7 +470,8 @@ long getTotalCacheSize() {
synchronized void truncate(long fromIndex) {
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
for (long index = endIndex; index >= fromIndex; index--) {
LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
final LogRecord removed = records.removeLast();
Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex");
removeEntryCache(removed.getTermIndex());
totalFileSize = removed.offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public LogEntryProto get(long index) throws RaftLogIOException {
checkLogState();
final LogSegment segment;
final LogRecord record;
try (AutoCloseableLock readLock = readLock()) {
{
segment = cache.getSegment(index);
if (segment == null) {
return null;
Expand Down Expand Up @@ -339,7 +339,7 @@ private void checkAndEvictCache() {
@Override
public TermIndex getTermIndex(long index) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
{
LogRecord record = cache.getLogRecord(index);
return record != null ? record.getTermIndex() : null;
}
Expand All @@ -348,15 +348,15 @@ public TermIndex getTermIndex(long index) {
@Override
public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
{
return cache.getTermIndices(startIndex, endIndex);
}
}

@Override
public TermIndex getLastEntryTermIndex() {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
{
return cache.getLastTermIndex();
}
}
Expand Down

0 comments on commit 19be677

Please sign in to comment.