Skip to content

Commit

Permalink
RATIS-2148. Snapshot transfer may cause followers to trigger reloadSt…
Browse files Browse the repository at this point in the history
…ateMachine incorrectly (#1145)
  • Loading branch information
133tosakarin authored Sep 7, 2024
1 parent 8f5159d commit d1e76cf
Showing 1 changed file with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -68,6 +69,7 @@ class SnapshotInstallationHandler {
new AtomicReference<>(INVALID_TERM_INDEX);
private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX);
private final AtomicInteger nextChunkIndex = new AtomicInteger(-1);

SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) {
this.server = server;
Expand Down Expand Up @@ -172,6 +174,12 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest
state.setLeader(leaderId, "installSnapshot");

server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
if (snapshotChunkRequest.getRequestIndex() == 0) {
nextChunkIndex.set(0);
} else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) {
throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get()
+ "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex());
}
try {
// Check and append the snapshot chunk. We simply put this in lock
// considering a follower peer requiring a snapshot installation does not
Expand All @@ -184,6 +192,8 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest
//TODO: We should only update State with installed snapshot once the request is done.
state.installSnapshot(request);

int idx = nextChunkIndex.getAndIncrement();
Preconditions.assertEquals(snapshotChunkRequest.getRequestIndex(), idx, "nextChunkIndex");
// update the committed index
// re-load the state machine if this is the last chunk
if (snapshotChunkRequest.getDone()) {
Expand Down

0 comments on commit d1e76cf

Please sign in to comment.