Skip to content

Commit

Permalink
RATIS-1881. In LeaderStateImpl, set placeHolderIndex from the log. (#912
Browse files Browse the repository at this point in the history
)
  • Loading branch information
szetszwo authored Sep 1, 2023
1 parent b06f82a commit 1b54bfa
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
Expand Down Expand Up @@ -288,6 +289,31 @@ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
}
}

private class StartupLogEntry {
/** The log index at leader startup. */
private final long startIndex = appendConfiguration(RaftConfigurationImpl.newBuilder()
.setConf(server.getRaftConf().getConf())
.setLogEntryIndex(raftLog.getNextIndex())
.build());
/** This future will be completed after the log entry is applied. */
private final CompletableFuture<Long> appliedIndexFuture = new CompletableFuture<>();

boolean isApplied(LogEntryProto logEntry) {
if (appliedIndexFuture.isDone()) {
return true;
}
final long appliedIndex = logEntry != null? logEntry.getIndex(): server.getState().getLastAppliedIndex();
if (appliedIndex >= startIndex) {
appliedIndexFuture.complete(appliedIndex);
LOG.info("leader is ready since appliedIndex == {} >= startIndex == {}",
appliedIndex, startIndex);
return true;
} else {
return false;
}
}
}

private final StateUpdateEvent updateCommitEvent =
new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit);
private final StateUpdateEvent checkStagingEvent =
Expand All @@ -311,11 +337,11 @@ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
private final PendingRequests pendingRequests;
private final WatchRequests watchRequests;
private final MessageStreamRequests messageStreamRequests;

private final MemoizedSupplier<StartupLogEntry> startupLogEntry = MemoizedSupplier.valueOf(StartupLogEntry::new);
private final AtomicBoolean isStopped = new AtomicBoolean();

private final int stagingCatchupGap;
private final long placeHolderIndex;
private final AtomicBoolean isReady = new AtomicBoolean();
private final RaftServerMetricsImpl raftServerMetrics;
private final LogAppenderMetrics logAppenderMetrics;
private final long followerMaxGapThreshold;
Expand Down Expand Up @@ -357,38 +383,36 @@ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {

final RaftConfigurationImpl conf = state.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
placeHolderIndex = raftLog.getNextIndex();

final long nextIndex = raftLog.getNextIndex();
senders = new SenderList();
addSenders(others, placeHolderIndex, true);
addSenders(others, nextIndex, true);

final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER);
if (!listeners.isEmpty()) {
addSenders(listeners, placeHolderIndex, true);
addSenders(listeners, nextIndex, true);
}
}

LogEntryProto start() {
void start() {
// In the beginning of the new term, replicate a conf entry in order
// to finally commit entries in the previous term.
// Also this message can help identify the last committed index and the conf.
final LogEntryProto placeHolder = LogProtoUtils.toLogEntryProto(
server.getRaftConf(), server.getState().getCurrentTerm(), raftLog.getNextIndex());
CodeInjectionForTesting.execute(APPEND_PLACEHOLDER,
server.getId().toString(), null);
raftLog.append(Collections.singletonList(placeHolder));
// Initialize startup log entry and append it to the RaftLog
startupLogEntry.get();
processor.start();
senders.forEach(LogAppender::start);
return placeHolder;
}

boolean isReady() {
return isReady.get();
return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied(null);
}

void checkReady(LogEntryProto entry) {
if (entry.getTerm() == currentTerm && entry.getIndex() == placeHolderIndex) {
isReady.set(true);
Preconditions.assertTrue(startupLogEntry.isInitialized());
if (entry.getTerm() == getCurrentTerm() && startupLogEntry.get().isApplied(entry)) {
server.getStateMachine().leaderEvent().notifyLeaderReady();
}
}
Expand Down Expand Up @@ -427,13 +451,10 @@ boolean inStagingState() {
}

long getCurrentTerm() {
Preconditions.assertSame(currentTerm, server.getState().getCurrentTerm(), "currentTerm");
return currentTerm;
}

TermIndex getLastEntry() {
return server.getState().getLastEntry();
}

@Override
public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) {
if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) {
Expand Down Expand Up @@ -546,16 +567,17 @@ private void applyOldNewConf() {
final RaftConfigurationImpl current = state.getRaftConf();
final RaftConfigurationImpl oldNewConf= stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
// apply the (old, new) configuration to log, and use it as the current conf
long index = state.getLog().append(state.getCurrentTerm(), oldNewConf);
updateConfiguration(index, oldNewConf);
appendConfiguration(oldNewConf);

this.stagingState = null;
notifySenders();
}

private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) {
Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex());
server.getState().setRaftConf(newConf);
private long appendConfiguration(RaftConfigurationImpl conf) {
final long logIndex = raftLog.append(getCurrentTerm(), conf);
Preconditions.assertSame(conf.getLogEntryIndex(), logIndex, "confLogIndex");
server.getState().setRaftConf(conf);
return logIndex;
}

void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> protos) {
Expand Down Expand Up @@ -958,8 +980,7 @@ private void replicateNewConf() {
.build();
// stop the LogAppender if the corresponding follower and listener is no longer in the conf
updateSenders(newConf);
long index = raftLog.append(server.getState().getCurrentTerm(), newConf);
updateConfiguration(index, newConf);
appendConfiguration(newConf);
notifySenders();
}

Expand Down Expand Up @@ -1009,7 +1030,7 @@ private void checkPeersForYieldingLeader() {
highestPriorityInfos.add(logAppender);
}
}
final TermIndex leaderLastEntry = getLastEntry();
final TermIndex leaderLastEntry = server.getState().getLastEntry();
final LogAppender appender = chooseUpToDateFollower(highestPriorityInfos, leaderLastEntry);
if (appender != null) {
server.getTransferLeadership().start(appender);
Expand Down Expand Up @@ -1077,7 +1098,7 @@ CompletableFuture<Long> getReadIndex() {
}

// leader has not committed any entries in this term, reject
if (server.getRaftLog().getTermIndex(readIndex).getTerm() != server.getState().getCurrentTerm()) {
if (server.getRaftLog().getTermIndex(readIndex).getTerm() != getCurrentTerm()) {
return JavaUtils.completeExceptionally(new ReadIndexException(
"Failed to getReadIndex " + readIndex + " since the term is not yet committed.",
new LeaderNotReadyException(server.getMemberId())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,7 @@ synchronized void changeToLeader() {
state.becomeLeader();

// start sending AppendEntries RPC to followers
final LogEntryProto e = leader.start();
getState().setRaftConf(e);
leader.start();
}

@Override
Expand Down

0 comments on commit 1b54bfa

Please sign in to comment.