Skip to content

Commit

Permalink
Use a future for the applied index.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Aug 31, 2023
1 parent e214215 commit dca2a01
Showing 1 changed file with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,30 +289,29 @@ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
}
}

/** When Leader startup, append a log entry . */
private class StartupLogEntry {
/** The log index at leader startup. */
private final long startIndex = appendConfiguration(RaftConfigurationImpl.newBuilder()
.setConf(server.getRaftConf().getConf())
.setLogEntryIndex(raftLog.getNextIndex())
.build());
/** Is the log entry applied? */
private final AtomicBoolean applied = new AtomicBoolean();
/** This future will be completed after the log entry is applied. */
private final CompletableFuture<Long> appliedIndexFuture = new CompletableFuture<>();

boolean checkStartIndex(long appliedIndex) {
boolean isApplied(LogEntryProto logEntry) {
if (appliedIndexFuture.isDone()) {
return true;
}
final long appliedIndex = logEntry != null? logEntry.getIndex(): server.getState().getLastAppliedIndex();
if (appliedIndex >= startIndex) {
applied.set(true);
appliedIndexFuture.complete(appliedIndex);
LOG.info("leader is ready since appliedIndex == {} >= startIndex == {}",
appliedIndex, startIndex);
return true;
} else {
return false;
}
}

boolean isApplied() {
return applied.get() || checkStartIndex(server.getState().getLastAppliedIndex());
}
}

private final StateUpdateEvent updateCommitEvent =
Expand Down Expand Up @@ -401,19 +400,19 @@ void start() {
// Also this message can help identify the last committed index and the conf.
CodeInjectionForTesting.execute(APPEND_PLACEHOLDER,
server.getId().toString(), null);
// append the startup log entry to RaftLog
// Initialize startup log entry and append it to the RaftLog
startupLogEntry.get();
processor.start();
senders.forEach(LogAppender::start);
}

boolean isReady() {
return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied();
return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied(null);
}

void checkReady(LogEntryProto entry) {
Preconditions.assertTrue(startupLogEntry.isInitialized());
if (entry.getTerm() == getCurrentTerm() && startupLogEntry.get().checkStartIndex(entry.getIndex())) {
if (entry.getTerm() == getCurrentTerm() && startupLogEntry.get().isApplied(entry)) {
server.getStateMachine().leaderEvent().notifyLeaderReady();
}
}
Expand Down

0 comments on commit dca2a01

Please sign in to comment.