Skip to content

Commit

Permalink
RATIS-2129. Low replication performance because GrpcLogAppender is often blocked by RaftLog's readLock.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Jul 30, 2024
1 parent 8906297 commit 11bf37f
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -109,17 +109,55 @@ long getOffset() {
}
}

/**
 * The records of a segment, keyed by the log index of each record.
 * The backing {@link ConcurrentSkipListMap} keeps the records sorted by index,
 * so the first/last entries of the map are the records with the lowest/highest index.
 */
private static class LogRecordList {
  private final ConcurrentNavigableMap<Long, LogRecord> map = new ConcurrentSkipListMap<>();

  /**
   * @return the number of records.
   *         Note that {@link ConcurrentSkipListMap#size()} is not a constant-time operation.
   */
  int size() {
    return map.size();
  }

  /** @return the record with the lowest index, or null if there are no records. */
  LogRecord getFirst() {
    final Map.Entry<Long, LogRecord> first = map.firstEntry();
    return first != null ? first.getValue() : null;
  }

  /** @return the record with the highest index, or null if there are no records. */
  LogRecord getLast() {
    final Map.Entry<Long, LogRecord> last = map.lastEntry();
    return last != null ? last.getValue() : null;
  }

  /**
   * @param i the log index to look up (not a position in the list).
   * @return the record stored under the given log index, or null if there is none.
   */
  LogRecord get(long i) {
    return map.get(i);
  }

  /**
   * Add the given record under the index of its term-index.
   *
   * @param record the record to add; its index must not be present yet.
   * @return the index under which the record was stored.
   */
  long append(LogRecord record) {
    final long index = record.getTermIndex().getIndex();
    // putIfAbsent (rather than put) leaves an already-present record untouched,
    // so a failed assertion below does not also overwrite the existing record.
    final LogRecord previous = map.putIfAbsent(index, record);
    Preconditions.assertNull(previous, "previous");
    return index;
  }

  /**
   * Remove the record with the highest index.
   *
   * @return the removed record.
   * @throws NullPointerException if there are no records.
   */
  LogRecord removeLast() {
    final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
    return Objects.requireNonNull(last, "last == null").getValue();
  }

  /** Remove all records. */
  void clear() {
    map.clear();
  }
}

static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0);
return new LogSegment(storage, true, start, start - 1, maxOpSize, raftLogMetrics);
return new LogSegment(storage, start, null, maxOpSize, raftLogMetrics);
}

@VisibleForTesting
static LogSegment newCloseSegment(RaftStorage storage,
long start, long end, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0 && end >= start);
return new LogSegment(storage, false, start, end, maxOpSize, raftLogMetrics);
return new LogSegment(storage, start, end, maxOpSize, raftLogMetrics);
}

static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
Expand Down Expand Up @@ -202,15 +240,19 @@ static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd

private void assertSegment(long expectedStart, int expectedEntryCount, boolean corrupted, long expectedEnd) {
Preconditions.assertSame(expectedStart, getStartIndex(), "Segment start index");
Preconditions.assertSame(expectedEntryCount, records.size(), "Number of records");
final int n = numOfEntries();
Preconditions.assertSame(expectedEntryCount, n, "Number of records");

final long expectedLastIndex = expectedStart + expectedEntryCount - 1;
Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end index");

final LogRecord last = getLastRecord();
if (last != null) {
if (n > 0) {
final LogRecord last = records.getLast();
Objects.requireNonNull(last, "last == null");
Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record");
Preconditions.assertSame(expectedStart, records.get(0).getTermIndex().getIndex(), "Index at the first record");
final LogRecord first = records.getFirst();
Objects.requireNonNull(first, "first == null");
Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), "Index at the first record");
}
if (!corrupted) {
Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index");
Expand All @@ -237,7 +279,7 @@ public LogEntryProto load(LogRecord key) throws IOException {
// note the loading should not exceed the endIndex: it is possible that
// the on-disk log file should be truncated but has not been done yet.
final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>();
final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, getEndIndex(), isOpen);
readSegmentFile(file, startEnd, maxOpSize, getLogCorruptionPolicy(), raftLogMetrics, entryRef -> {
final LogEntryProto entry = entryRef.retain();
final TermIndex ti = TermIndex.valueOf(entry);
Expand Down Expand Up @@ -288,15 +330,15 @@ void remove(TermIndex key) {
}

File getFile() {
return LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen).getFile(storage);
return LogSegmentStartEnd.valueOf(startIndex, getEndIndex(), isOpen).getFile(storage);
}

private volatile boolean isOpen;
private long totalFileSize = SegmentedRaftLogFormat.getHeaderLength();
/** Segment start index, inclusive. */
private final long startIndex;
/** Segment end index, inclusive. */
private volatile long endIndex;
private final Long endIndex;
private final RaftStorage storage;
private final SizeInBytes maxOpSize;
private final LogEntryLoader cacheLoader;
Expand All @@ -306,18 +348,18 @@ File getFile() {
/**
* the list of records is more like the index of a segment
*/
private final List<LogRecord> records = new ArrayList<>();
private final LogRecordList records = new LogRecordList();
/**
* the entryCache caches the content of log entries.
*/
private final EntryCache entryCache = new EntryCache();

private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, SizeInBytes maxOpSize,
private LogSegment(RaftStorage storage, long start, Long end, SizeInBytes maxOpSize,
SegmentedRaftLogMetrics raftLogMetrics) {
this.storage = storage;
this.isOpen = isOpen;
this.startIndex = start;
this.endIndex = end;
this.isOpen = end == null;
this.maxOpSize = maxOpSize;
this.cacheLoader = new LogEntryLoader(raftLogMetrics);
}
Expand All @@ -327,15 +369,19 @@ long getStartIndex() {
}

long getEndIndex() {
return endIndex;
if (endIndex != null) {
return endIndex;
}
final LogRecord last = records.getLast();
return last != null? last.getTermIndex().getIndex() : startIndex - 1;
}

boolean isOpen() {
return isOpen;
}

int numOfEntries() {
return Math.toIntExact(endIndex - startIndex + 1);
return records.size();
}

CorruptionPolicy getLogCorruptionPolicy() {
Expand Down Expand Up @@ -366,22 +412,19 @@ private void append(Op op, ReferenceCountedObject<LogEntryProto> entryRef,

private LogRecord appendLogRecord(Op op, LogEntryProto entry) {
Objects.requireNonNull(entry, "entry == null");
if (records.isEmpty()) {
final LogRecord currentLast = records.getLast();
if (currentLast == null) {
Preconditions.assertTrue(entry.getIndex() == startIndex,
"gap between start index %s and first entry to append %s",
startIndex, entry.getIndex());
}

final LogRecord currentLast = getLastRecord();
if (currentLast != null) {
} else {
Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
"gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
}

final LogRecord record = new LogRecord(totalFileSize, entry);
records.add(record);
records.append(record);
totalFileSize += getEntrySize(entry, op);
endIndex = entry.getIndex();
return record;
}

Expand All @@ -405,18 +448,11 @@ synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException
}

LogRecord getLogRecord(long index) {
if (index >= startIndex && index <= endIndex) {
return records.get(Math.toIntExact(index - startIndex));
}
return null;
}

private LogRecord getLastRecord() {
return records.isEmpty() ? null : records.get(records.size() - 1);
return records.get(index);
}

TermIndex getLastTermIndex() {
LogRecord last = getLastRecord();
final LogRecord last = records.getLast();
return last == null ? null : last.getTermIndex();
}

Expand All @@ -432,14 +468,15 @@ long getTotalCacheSize() {
* Remove records from the given index (inclusive)
*/
synchronized void truncate(long fromIndex) {
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
for (long index = endIndex; index >= fromIndex; index--) {
LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
long index = getEndIndex();
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= index);
for (; index >= fromIndex; index--) {
final LogRecord removed = records.removeLast();
Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex");
removeEntryCache(removed.getTermIndex());
totalFileSize = removed.offset;
}
isOpen = false;
this.endIndex = fromIndex - 1;
}

void close() {
Expand All @@ -450,7 +487,7 @@ void close() {
@Override
public String toString() {
return isOpen() ? "log_" + "inprogress_" + startIndex :
"log-" + startIndex + "_" + endIndex;
"log-" + startIndex + "_" + getEndIndex();
}

/** Comparator to find <code>index</code> in list of <code>LogSegment</code>s. */
Expand All @@ -471,7 +508,6 @@ private int compareTo(Long l) {
synchronized void clear() {
records.clear();
evictCache();
endIndex = startIndex - 1;
}

int getLoadingTimes() {
Expand All @@ -495,7 +531,7 @@ boolean hasCache() {
}

boolean containsIndex(long index) {
return startIndex <= index && endIndex >= index;
return startIndex <= index && getEndIndex() >= index;
}

boolean hasEntries() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ private void loadLogSegments(long lastIndexInSnapshot,
// not load the log to avoid holes between log segments. This may happen
// when the local I/O worker is too slow to persist log (slower than
// committing the log and taking snapshot)
if (!cache.isEmpty() && cache.getEndIndex() < lastIndexInSnapshot) {
if (cache.getNumOfSegments() > 0 && cache.getEndIndex() < lastIndexInSnapshot) {
LOG.warn("End log index {} is smaller than last index in snapshot {}",
cache.getEndIndex(), lastIndexInSnapshot);
purgeImpl(lastIndexInSnapshot).whenComplete((purged, e) -> updatePurgeIndex(purged));
Expand All @@ -274,22 +274,18 @@ private void loadLogSegments(long lastIndexInSnapshot,
@Override
public LogEntryProto get(long index) throws RaftLogIOException {
checkLogState();
final LogSegment segment;
final LogRecord record;
try (AutoCloseableLock readLock = readLock()) {
segment = cache.getSegment(index);
if (segment == null) {
return null;
}
record = segment.getLogRecord(index);
if (record == null) {
return null;
}
final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
getRaftLogMetrics().onRaftLogCacheHit();
return entry;
}
final LogSegment segment = cache.getSegment(index);
if (segment == null) {
return null;
}
final LogRecord record = segment.getLogRecord(index);
if (record == null) {
return null;
}
final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
getRaftLogMetrics().onRaftLogCacheHit();
return entry;
}

// the entry is not in the segment's cache. Load the cache without holding the lock.
Expand Down Expand Up @@ -339,26 +335,19 @@ private void checkAndEvictCache() {
@Override
public TermIndex getTermIndex(long index) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
LogRecord record = cache.getLogRecord(index);
return record != null ? record.getTermIndex() : null;
}
return cache.getTermIndex(index);
}

@Override
public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
return cache.getTermIndices(startIndex, endIndex);
}
return cache.getTermIndices(startIndex, endIndex);
}

@Override
public TermIndex getLastEntryTermIndex() {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
return cache.getLastTermIndex();
}
return cache.getLastTermIndex();
}

@Override
Expand Down
Loading

0 comments on commit 11bf37f

Please sign in to comment.