Skip to content

Commit

Permalink
RATIS-2028. Refactor RaftLog to supply log as ReferenceCountedObject (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
duongkame authored Mar 12, 2024
1 parent 99a833c commit e199daa
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -57,10 +58,24 @@ default boolean contains(TermIndex ti) {

/**
* @return null if the log entry is not found in this log;
* otherwise, return the log entry corresponding to the given index.
* otherwise, return a copy of the log entry corresponding to the given index.
* @deprecated use {@link RaftLog#retainLog(long)} instead in order to avoid copying.
*/
@Deprecated
LogEntryProto get(long index) throws RaftLogIOException;

/**
 * Look up the log entry at the given index and hand it out as a retained reference.
 *
 * @param index the index of the requested log entry.
 * @return a retained {@link ReferenceCountedObject} to the log entry corresponding to the given index if it exists;
 *         otherwise, return null.
 *         Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}
 *         after use.
 * @throws RaftLogIOException if {@link #get(long)} fails to read the entry.
 */
default ReferenceCountedObject<LogEntryProto> retainLog(long index) throws RaftLogIOException {
  // This default implementation falls back to the copying get(long);
  // implementations that keep reference-counted entries are expected to override it.
  final LogEntryProto entry = get(index);
  if (entry == null) {
    // get(long) returns null when the entry is not found.  Honor the documented contract and
    // return null instead of wrapping a null value, so that callers' null checks keep working.
    return null;
  }
  final ReferenceCountedObject<LogEntryProto> wrap = ReferenceCountedObject.wrap(entry);
  wrap.retain();
  return wrap;
}

/**
* @return null if the log entry is not found in this log;
* otherwise, return the {@link EntryWithData} corresponding to the given index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1800,7 +1800,9 @@ TransactionContext getTransactionContext(LogEntryProto entry, Boolean createNew)
MemoizedSupplier.valueOf(() -> stateMachine.startTransaction(entry, getInfo().getCurrentRole())));
}

CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws RaftLogIOException {
CompletableFuture<Message> applyLogToStateMachine(ReferenceCountedObject<LogEntryProto> nextRef)
throws RaftLogIOException {
LogEntryProto next = nextRef.get();
if (!next.hasStateMachineLogEntry()) {
stateMachine.event().notifyTermIndexUpdated(next.getTerm(), next.getIndex());
}
Expand All @@ -1815,11 +1817,7 @@ CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws Raf
TransactionContext trx = getTransactionContext(next, true);
final ClientInvocationId invocationId = ClientInvocationId.valueOf(next.getStateMachineLogEntry());
writeIndexCache.add(invocationId.getClientId(), ((TransactionContextImpl) trx).getLogIndexFuture());

// TODO: RaftLog to provide the log entry as a ReferenceCountedObject as per RATIS-2028.
ReferenceCountedObject<?> ref = ReferenceCountedObject.wrap(next);
((TransactionContextImpl) trx).setDelegatedRef(ref);
ref.retain();
((TransactionContextImpl) trx).setDelegatedRef(nextRef);
try {
// Let the StateMachine inject logic for committed transactions in sequential order.
trx = stateMachine.applyTransactionSerial(trx);
Expand All @@ -1828,8 +1826,6 @@ CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws Raf
return replyPendingRequest(invocationId, TermIndex.valueOf(next), stateMachineFuture);
} catch (Exception e) {
throw new RaftLogIOException(e);
} finally {
ref.release();
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,17 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf
final long committed = raftLog.getLastCommittedIndex();
for(long applied; (applied = getLastAppliedIndex()) < committed && state == State.RUNNING && !shouldStop(); ) {
final long nextIndex = applied + 1;
final LogEntryProto next = raftLog.get(nextIndex);
if (next != null) {
final ReferenceCountedObject<LogEntryProto> next = raftLog.retainLog(nextIndex);
if (next == null) {
LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
this, nextIndex, state);
break;
}

try {
final LogEntryProto entry = next.get();
if (LOG.isTraceEnabled()) {
LOG.trace("{}: applying nextIndex={}, nextLog={}", this, nextIndex, LogProtoUtils.toLogEntryString(next));
LOG.trace("{}: applying nextIndex={}, nextLog={}", this, nextIndex, LogProtoUtils.toLogEntryString(entry));
} else {
LOG.debug("{}: applying nextIndex={}", this, nextIndex);
}
Expand All @@ -252,10 +259,8 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf
} else {
notifyAppliedIndex(incremented);
}
} else {
LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
this, nextIndex, state);
break;
} finally {
next.release();
}
}
return futures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,19 @@ private boolean shouldAppendMetadata(long newCommitIndex) {
//log neither lastMetadataEntry, nor entries with a smaller commit index.
return false;
}
ReferenceCountedObject<LogEntryProto> ref = null;
try {
if (get(newCommitIndex).hasMetadataEntry()) {
ref = retainLog(newCommitIndex);
if (ref.get().hasMetadataEntry()) {
// do not log the metadata entry
return false;
}
} catch(RaftLogIOException e) {
LOG.error("Failed to get log entry for index " + newCommitIndex, e);
} finally {
if (ref != null) {
ref.release();
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.ratis.server.metrics.RaftLogMetricsBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.AutoCloseableLock;
Expand All @@ -45,8 +47,13 @@ public class MemoryRaftLog extends RaftLogBase {
static class EntryList {
private final List<ReferenceCountedObject<LogEntryProto>> entries = new ArrayList<>();

/**
 * @return the reference-counted entry stored at position i,
 *         or null when i is outside the bounds of the entry list.
 *         The reference is returned as stored; this method does not retain it.
 */
ReferenceCountedObject<LogEntryProto> getRef(int i) {
  if (i < 0 || i >= entries.size()) {
    return null;
  }
  return entries.get(i);
}

LogEntryProto get(int i) {
return i >= 0 && i < entries.size() ? entries.get(i).get() : null;
final ReferenceCountedObject<LogEntryProto> ref = getRef(i);
return ref != null ? ref.get() : null;
}

TermIndex getTermIndex(int i) {
Expand Down Expand Up @@ -108,16 +115,34 @@ public RaftLogMetricsBase getRaftLogMetrics() {
}

@Override
public LogEntryProto get(long index) {
public LogEntryProto get(long index) throws RaftLogIOException {
final ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
try {
return LogProtoUtils.copy(ref.get());
} finally {
ref.release();
}
}

@Override
public ReferenceCountedObject<LogEntryProto> retainLog(long index) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
return entries.get(Math.toIntExact(index));
try (AutoCloseableLock readLock = readLock()) {
ReferenceCountedObject<LogEntryProto> ref = entries.getRef(Math.toIntExact(index));
ref.retain();
return ref;
}
}

@Override
public EntryWithData getEntryWithData(long index) {
return newEntryWithData(get(index), null);
// TODO. The reference counted object should be passed to LogAppender RATIS-2026.
ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
try {
return newEntryWithData(ref.get(), null);
} finally {
ref.release();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,26 +224,27 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c
*
* In the future we can make the cache loader configurable if necessary.
*/
class LogEntryLoader extends CacheLoader<LogRecord, LogEntryProto> {
class LogEntryLoader extends CacheLoader<LogRecord, ReferenceCountedObject<LogEntryProto>> {
private final SegmentedRaftLogMetrics raftLogMetrics;

LogEntryLoader(SegmentedRaftLogMetrics raftLogMetrics) {
this.raftLogMetrics = raftLogMetrics;
}

@Override
public LogEntryProto load(LogRecord key) throws IOException {
public ReferenceCountedObject<LogEntryProto> load(LogRecord key) throws IOException {
final File file = getFile();
// note the loading should not exceed the endIndex: it is possible that
// the on-disk log file should be truncated but has not been done yet.
final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>();
final AtomicReference<ReferenceCountedObject<LogEntryProto>> toReturn = new AtomicReference<>();
final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
readSegmentFile(file, startEnd, maxOpSize, getLogCorruptionPolicy(), raftLogMetrics, entryRef -> {
final LogEntryProto entry = entryRef.retain();
final TermIndex ti = TermIndex.valueOf(entry);
putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
if (ti.equals(key.getTermIndex())) {
toReturn.set(entry);
entryRef.retain();
toReturn.set(entryRef);
}
entryRef.release();
});
Expand All @@ -260,10 +261,8 @@ long size() {
return size.get();
}

LogEntryProto get(TermIndex ti) {
return Optional.ofNullable(map.get(ti))
.map(ReferenceCountedObject::get)
.orElse(null);
ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
return map.get(ti);
}

void clear() {
Expand Down Expand Up @@ -386,15 +385,15 @@ private LogRecord appendLogRecord(Op op, LogEntryProto entry) {
return record;
}

LogEntryProto getEntryFromCache(TermIndex ti) {
ReferenceCountedObject<LogEntryProto> getEntryFromCache(TermIndex ti) {
return entryCache.get(ti);
}

/**
* Acquire LogSegment's monitor so that there is no concurrent loading.
*/
synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException {
LogEntryProto entry = entryCache.get(record.getTermIndex());
synchronized ReferenceCountedObject<LogEntryProto> loadCache(LogRecord record) throws RaftLogIOException {
ReferenceCountedObject<LogEntryProto> entry = entryCache.get(record.getTermIndex());
if (entry != null) {
return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,17 @@ public long getLastAppliedIndex() {

@Override
public void notifyTruncatedLogEntry(TermIndex ti) {
ReferenceCountedObject<LogEntryProto> ref = null;
try {
final LogEntryProto entry = get(ti.getIndex());
ref = retainLog(ti.getIndex());
final LogEntryProto entry = ref != null ? ref.get() : null;
notifyTruncatedLogEntry.accept(entry);
} catch (RaftLogIOException e) {
LOG.error("{}: Failed to read log {}", getName(), ti, e);
} finally {
if (ref != null) {
ref.release();
}
}
}

Expand Down Expand Up @@ -272,6 +278,19 @@ private void loadLogSegments(long lastIndexInSnapshot,

/**
 * @return null if {@link #retainLog(long)} finds no entry for the given index;
 *         otherwise, a copy of that entry.  The retained reference is released before returning.
 */
@Override
public LogEntryProto get(long index) throws RaftLogIOException {
  final ReferenceCountedObject<LogEntryProto> retained = retainLog(index);
  if (retained != null) {
    try {
      // Copy while the reference is still held; the copy is what the caller receives.
      return LogProtoUtils.copy(retained.get());
    } finally {
      retained.release();
    }
  }
  return null;
}

@Override
public ReferenceCountedObject<LogEntryProto> retainLog(long index) throws RaftLogIOException {
checkLogState();
final LogSegment segment;
final LogRecord record;
Expand All @@ -284,9 +303,10 @@ record = segment.getLogRecord(index);
if (record == null) {
return null;
}
final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
final ReferenceCountedObject<LogEntryProto> entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
getRaftLogMetrics().onRaftLogCacheHit();
entry.retain();
return entry;
}
}
Expand All @@ -299,10 +319,19 @@ record = segment.getLogRecord(index);

@Override
public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
final LogEntryProto entry = get(index);
if (entry == null) {
final ReferenceCountedObject<LogEntryProto> entryRef = retainLog(index);
if (entryRef == null) {
throw new RaftLogIOException("Log entry not found: index = " + index);
}
try {
// TODO. The reference counted object should be passed to LogAppender RATIS-2026.
return getEntryWithData(entryRef.get());
} finally {
entryRef.release();
}
}

private EntryWithData getEntryWithData(LogEntryProto entry) throws RaftLogIOException {
if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
return newEntryWithData(entry, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.RATIS_LOG_WORKER_METRICS;

import org.apache.ratis.metrics.RatisMetrics;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.ReferenceCountedObject;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -72,11 +75,22 @@ static void printLog(RaftLogBase log, Consumer<String> println) {
b.append(i == committed? 'c': ' ');
b.append(String.format("%3d: ", i));
try {
b.append(LogProtoUtils.toLogEntryString(log.get(i)));
b.append(LogProtoUtils.toLogEntryString(getLogUnsafe(log, i)));
} catch (RaftLogIOException e) {
b.append(e);
}
println.accept(b.toString());
}
}

/**
 * Fetch the log entry at the given index via {@link RaftLog#retainLog(long)} without copying it.
 * The reference is released before this method returns, so the returned entry
 * is no longer covered by the reference that was retained here.
 *
 * @return the entry at the given index, or null if the log has no such entry.
 */
static LogEntryProto getLogUnsafe(RaftLog log, long index) throws RaftLogIOException {
  final ReferenceCountedObject<LogEntryProto> retained = log.retainLog(index);
  if (retained == null) {
    return null;
  }
  try {
    return retained.get();
  } finally {
    retained.release();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.ratis.server.impl.StateMachineMetrics.RATIS_STATEMACHINE_METRICS_DESC;
import static org.apache.ratis.server.impl.StateMachineMetrics.STATEMACHINE_TAKE_SNAPSHOT_TIMER;
import static org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
import static org.apache.ratis.server.storage.RaftStorageTestUtils.getLogUnsafe;

import org.apache.ratis.BaseTest;
import org.apache.ratis.metrics.LongCounter;
Expand All @@ -43,6 +44,7 @@
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.util.FileUtils;
Expand Down Expand Up @@ -95,7 +97,7 @@ public static void assertLeaderContent(MiniRaftCluster cluster) throws Exception
public static void assertLogContent(RaftServer.Division server, boolean isLeader) throws Exception {
final RaftLog log = server.getRaftLog();
final long lastIndex = log.getLastEntryTermIndex().getIndex();
final LogEntryProto e = log.get(lastIndex);
final LogEntryProto e = getLogUnsafe(log, lastIndex);
Assert.assertTrue(e.hasMetadataEntry());

JavaUtils.attemptRepeatedly(() -> {
Expand Down
Loading

0 comments on commit e199daa

Please sign in to comment.