Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-1881. In LeaderStateImpl, set placeHolderIndex from the log. #912

Merged
merged 3 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
Expand Down Expand Up @@ -288,6 +289,31 @@ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
}
}

private class StartupLogEntry {
/** The log index at leader startup. */
private final long startIndex = appendConfiguration(RaftConfigurationImpl.newBuilder()
.setConf(server.getRaftConf().getConf())
.setLogEntryIndex(raftLog.getNextIndex())
.build());
/** This future will be completed after the log entry is applied. */
private final CompletableFuture<Long> appliedIndexFuture = new CompletableFuture<>();

boolean isApplied(LogEntryProto logEntry) {
if (appliedIndexFuture.isDone()) {
return true;
}
final long appliedIndex = logEntry != null? logEntry.getIndex(): server.getState().getLastAppliedIndex();
if (appliedIndex >= startIndex) {
appliedIndexFuture.complete(appliedIndex);
LOG.info("leader is ready since appliedIndex == {} >= startIndex == {}",
appliedIndex, startIndex);
return true;
} else {
return false;
}
}
}

private final StateUpdateEvent updateCommitEvent =
new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit);
private final StateUpdateEvent checkStagingEvent =
Expand All @@ -311,11 +337,11 @@ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
private final PendingRequests pendingRequests;
private final WatchRequests watchRequests;
private final MessageStreamRequests messageStreamRequests;

private final MemoizedSupplier<StartupLogEntry> startupLogEntry = MemoizedSupplier.valueOf(StartupLogEntry::new);
private final AtomicBoolean isStopped = new AtomicBoolean();

private final int stagingCatchupGap;
private final long placeHolderIndex;
private final AtomicBoolean isReady = new AtomicBoolean();
private final RaftServerMetricsImpl raftServerMetrics;
private final LogAppenderMetrics logAppenderMetrics;
private final long followerMaxGapThreshold;
Expand Down Expand Up @@ -357,38 +383,36 @@ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {

final RaftConfigurationImpl conf = state.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
placeHolderIndex = raftLog.getNextIndex();

final long nextIndex = raftLog.getNextIndex();
senders = new SenderList();
addSenders(others, placeHolderIndex, true);
addSenders(others, nextIndex, true);

final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER);
if (!listeners.isEmpty()) {
addSenders(listeners, placeHolderIndex, true);
addSenders(listeners, nextIndex, true);
}
}

LogEntryProto start() {
void start() {
// In the beginning of the new term, replicate a conf entry in order
// to finally commit entries in the previous term.
// Also this message can help identify the last committed index and the conf.
final LogEntryProto placeHolder = LogProtoUtils.toLogEntryProto(
server.getRaftConf(), server.getState().getCurrentTerm(), raftLog.getNextIndex());
CodeInjectionForTesting.execute(APPEND_PLACEHOLDER,
server.getId().toString(), null);
raftLog.append(Collections.singletonList(placeHolder));
// Initialize startup log entry and append it to the RaftLog
startupLogEntry.get();
processor.start();
senders.forEach(LogAppender::start);
return placeHolder;
}

boolean isReady() {
return isReady.get();
return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied(null);
}

void checkReady(LogEntryProto entry) {
if (entry.getTerm() == currentTerm && entry.getIndex() == placeHolderIndex) {
isReady.set(true);
Preconditions.assertTrue(startupLogEntry.isInitialized());
if (entry.getTerm() == getCurrentTerm() && startupLogEntry.get().isApplied(entry)) {
server.getStateMachine().leaderEvent().notifyLeaderReady();
}
}
Expand Down Expand Up @@ -427,13 +451,10 @@ boolean inStagingState() {
}

long getCurrentTerm() {
Preconditions.assertSame(currentTerm, server.getState().getCurrentTerm(), "currentTerm");
return currentTerm;
}

TermIndex getLastEntry() {
return server.getState().getLastEntry();
}

@Override
public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) {
if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) {
Expand Down Expand Up @@ -546,16 +567,17 @@ private void applyOldNewConf() {
final RaftConfigurationImpl current = state.getRaftConf();
final RaftConfigurationImpl oldNewConf= stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
// apply the (old, new) configuration to log, and use it as the current conf
long index = state.getLog().append(state.getCurrentTerm(), oldNewConf);
updateConfiguration(index, oldNewConf);
appendConfiguration(oldNewConf);

this.stagingState = null;
notifySenders();
}

private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) {
Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex());
server.getState().setRaftConf(newConf);
private long appendConfiguration(RaftConfigurationImpl conf) {
final long logIndex = raftLog.append(getCurrentTerm(), conf);
Preconditions.assertSame(conf.getLogEntryIndex(), logIndex, "confLogIndex");
server.getState().setRaftConf(conf);
return logIndex;
}

void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> protos) {
Expand Down Expand Up @@ -958,8 +980,7 @@ private void replicateNewConf() {
.build();
// stop the LogAppender if the corresponding follower and listener is no longer in the conf
updateSenders(newConf);
long index = raftLog.append(server.getState().getCurrentTerm(), newConf);
updateConfiguration(index, newConf);
appendConfiguration(newConf);
notifySenders();
}

Expand Down Expand Up @@ -1009,7 +1030,7 @@ private void checkPeersForYieldingLeader() {
highestPriorityInfos.add(logAppender);
}
}
final TermIndex leaderLastEntry = getLastEntry();
final TermIndex leaderLastEntry = server.getState().getLastEntry();
final LogAppender appender = chooseUpToDateFollower(highestPriorityInfos, leaderLastEntry);
if (appender != null) {
server.getTransferLeadership().start(appender);
Expand Down Expand Up @@ -1077,7 +1098,7 @@ CompletableFuture<Long> getReadIndex() {
}

// leader has not committed any entries in this term, reject
if (server.getRaftLog().getTermIndex(readIndex).getTerm() != server.getState().getCurrentTerm()) {
if (server.getRaftLog().getTermIndex(readIndex).getTerm() != getCurrentTerm()) {
return JavaUtils.completeExceptionally(new ReadIndexException(
"Failed to getReadIndex " + readIndex + " since the term is not yet committed.",
new LeaderNotReadyException(server.getMemberId())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,7 @@ synchronized void changeToLeader() {
state.becomeLeader();

// start sending AppendEntries RPC to followers
final LogEntryProto e = leader.start();
getState().setRaftConf(e);
leader.start();
}

@Override
Expand Down
Loading