RATIS-2084. Follower reply ALREADY_INSTALLED when install old snapshots from leader #1091

Merged 7 commits on May 30, 2024
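In short, a follower that already covers a snapshot's last included index (for example, after catching up under a new leader) now replies ALREADY_INSTALLED instead of failing a precondition, and it drops stale configurations (state.truncate) before adopting the configuration entry carried in the snapshot. A minimal, self-contained sketch of that decision, using illustrative names rather than Ratis APIs:

```java
// Hedged sketch of the check added below in checkAndInstallSnapshot; the enum and
// method names are illustrative, not part of Ratis. A snapshot whose lastIncludedIndex
// is not ahead of the follower's commit index is acknowledged as ALREADY_INSTALLED
// instead of tripping an assertion.
enum InstallDecision { ALREADY_INSTALLED, INSTALL }

static InstallDecision decideInstall(long followerCommitIndex, long snapshotLastIncludedIndex) {
  return followerCommitIndex >= snapshotLastIncludedIndex
      ? InstallDecision.ALREADY_INSTALLED   // stale snapshot, e.g. sent by an old leader
      : InstallDecision.INSTALL;            // snapshot is ahead of this follower: install it
}
```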
@@ -1851,6 +1851,7 @@ CompletableFuture<Message> applyLogToStateMachine(ReferenceCountedObject<LogEntr
* @param logEntry the log entry being truncated
*/
void notifyTruncatedLogEntry(LogEntryProto logEntry) {
Optional.ofNullable(getState()).ifPresent(s -> s.truncate(logEntry.getIndex()));
if (logEntry.hasStateMachineLogEntry()) {
getTransactionManager().remove(TermIndex.valueOf(logEntry));

@@ -387,6 +387,10 @@ void setRaftConf(RaftConfiguration conf) {
LOG.trace("{}: {}", getMemberId(), configurationManager);
}

void truncate(long logIndex) {
configurationManager.removeConfigurations(logIndex);
}

void updateConfiguration(List<LogEntryProto> entries) {
if (entries != null && !entries.isEmpty()) {
configurationManager.removeConfigurations(entries.get(0).getIndex());
@@ -133,6 +133,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
if (request.hasLastRaftConfigurationLogEntryProto()) {
// Set the configuration included in the snapshot
final LogEntryProto proto = request.getLastRaftConfigurationLogEntryProto();
state.truncate(proto.getIndex());
if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
LOG.info("{}: set new configuration {} from snapshot", getMemberId(), proto);
state.setRaftConf(proto);
@@ -175,9 +176,10 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest
// Check and append the snapshot chunk. We simply put this in lock
// considering a follower peer requiring a snapshot installation does not
// have a lot of requests
Preconditions.assertTrue(state.getLog().getLastCommittedIndex() < lastIncludedIndex,
"%s log's commit index is %s, last included index in snapshot is %s",
getMemberId(), state.getLog().getLastCommittedIndex(), lastIncludedIndex);
if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED);
}

//TODO: We should only update State with installed snapshot once the request is done.
state.installSnapshot(request);
@@ -21,7 +21,10 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -85,6 +88,12 @@ public void testSeparateSnapshotInstallPath() throws Exception {
runWithNewCluster(1, this::testMultiFileInstallSnapshot);
}

public void testInstallSnapshotLeaderSwitch() throws Exception {
getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
StateMachineWithSeparatedSnapshotPath.class, StateMachine.class);
runWithNewCluster(3, this::testInstallSnapshotDuringLeaderSwitch);
}

private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
try {
int i = 0;
@@ -127,6 +136,67 @@ private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
}
}

private void testInstallSnapshotDuringLeaderSwitch(CLUSTER cluster) throws Exception {
try {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();

// perform operations and force all peers to take snapshot
try (final RaftClient client = cluster.createClient(leaderId)) {
for (int i = 0; i < SNAPSHOT_TRIGGER_THRESHOLD * 2; i++) {
final RaftClientReply
reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
Assertions.assertTrue(reply.isSuccess());
}

for (final RaftPeer peer: cluster.getPeers()) {
final RaftClientReply snapshotReply = client.getSnapshotManagementApi(peer.getId()).create(3000);
Assertions.assertTrue(snapshotReply.isSuccess());
}
}
final SnapshotInfo snapshot = cluster.getLeader().getStateMachine().getLatestSnapshot();
Assertions.assertNotNull(snapshot);

// isolate two followers (majority) in old configuration
final List<RaftServer.Division> oldFollowers = cluster.getFollowers();
for (RaftServer.Division f: oldFollowers) {
RaftTestUtil.isolate(cluster, f.getId());
}

// add two more peers and install the snapshot from the leader
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
true);
try (final RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
Assertions.assertThrows(RaftRetryFailureException.class,
() -> client.admin().setConfiguration(change.allPeersInNewConf));
}

final SnapshotInfo snapshotInfo = cluster.getDivision(change.newPeers[0].getId())
.getStateMachine().getLatestSnapshot();
Assertions.assertNotNull(snapshotInfo);

// recover the old followers and isolate the leader to force leader switch
RaftTestUtil.isolate(cluster, leaderId);
for (RaftServer.Division f: oldFollowers) {
RaftTestUtil.deIsolate(cluster, f.getId());
}
RaftTestUtil.waitForLeader(cluster);

try (final RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
// setConfiguration should succeed during the leader switch
final RaftClientReply setConf = client.admin().setConfiguration(change.allPeersInNewConf);
Assertions.assertTrue(setConf.isSuccess());

RaftTestUtil.deIsolate(cluster, leaderId);
final RaftClientReply
reply = client.io().send(new RaftTestUtil.SimpleMessage("final"));
Assertions.assertTrue(reply.isSuccess());
}
} finally {
cluster.shutdown();
}
}

private static class StateMachineWithMultiNestedSnapshotFile extends SimpleStateMachine4Testing {

File snapshotRoot;
14 changes: 14 additions & 0 deletions ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -513,6 +513,20 @@ static void blockQueueAndSetDelay(Iterable<RaftServer> servers,
Thread.sleep(3 * maxTimeout.toLong(TimeUnit.MILLISECONDS));
}

static void isolate(MiniRaftCluster cluster, RaftPeerId id) {
try {
BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
cluster.setBlockRequestsFrom(id.toString(), true);
} catch (Exception e) {
e.printStackTrace();
}
}

static void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
cluster.setBlockRequestsFrom(id.toString(), false);
}

static Thread sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) {
Thread t = new Thread(() -> {
try (final RaftClient client = cluster.createClient(leaderId)) {
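For reference, isolate/deIsolate above are promoted from private copies removed later in this diff, so the new snapshot test can reuse them; the refactored call sites follow below. A minimal usage sketch of the try/finally idiom they pair with (the wrapper method is illustrative, not part of the patch):

```java
// Hedged usage sketch; runWhileIsolated is an illustrative helper, not part of this change.
static void runWhileIsolated(MiniRaftCluster cluster, RaftPeerId id, Runnable scenario) {
  RaftTestUtil.isolate(cluster, id);       // partition the peer from the rest of the cluster
  try {
    scenario.run();                        // drive the test while the peer is isolated
  } finally {
    RaftTestUtil.deIsolate(cluster, id);   // always restore connectivity, even on failure
  }
}
```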
@@ -122,12 +122,12 @@ void runTestLostMajorityHeartbeats(CLUSTER cluster) throws Exception {
final TimeDuration maxTimeout = RaftServerConfigKeys.Rpc.timeoutMax(getProperties());
final RaftServer.Division leader = waitForLeader(cluster);
try {
isolate(cluster, leader.getId());
RaftTestUtil.isolate(cluster, leader.getId());
maxTimeout.sleep();
maxTimeout.sleep();
RaftServerTestUtil.assertLostMajorityHeartbeatsRecently(leader);
} finally {
deIsolate(cluster, leader.getId());
RaftTestUtil.deIsolate(cluster, leader.getId());
}
}

@@ -164,12 +164,12 @@ void runTestListenerNotStartLeaderElection(CLUSTER cluster) throws Exception {
final RaftServer.Division listener = cluster.getListeners().get(0);
final RaftPeerId listenerId = listener.getId();
try {
isolate(cluster, listenerId);
RaftTestUtil.isolate(cluster, listenerId);
maxTimeout.sleep();
maxTimeout.sleep();
Assertions.assertEquals(RaftProtos.RaftPeerRole.LISTENER, listener.getInfo().getCurrentRole());
} finally {
deIsolate(cluster, listener.getId());
RaftTestUtil.deIsolate(cluster, listener.getId());
}
}

@@ -247,7 +247,7 @@ public void testTransferLeaderTimeout() throws Exception {
RaftServer.Division newLeader = followers.get(0);

// isolate new leader, so that transfer leadership will timeout
isolate(cluster, newLeader.getId());
RaftTestUtil.isolate(cluster, newLeader.getId());

List<RaftPeer> peers = cluster.getPeers();

@@ -287,7 +287,7 @@ public void testTransferLeaderTimeout() throws Exception {
Assertions.assertEquals(leader.getId().toString(), reply.getReplierId());
Assertions.assertTrue(reply.isSuccess());

deIsolate(cluster, newLeader.getId());
RaftTestUtil.deIsolate(cluster, newLeader.getId());
}

cluster.shutdown();
@@ -364,32 +364,18 @@ protected void testDisconnectLeader() throws Exception {
try (RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("message"));
Thread.sleep(1000);
isolate(cluster, leader.getId());
RaftTestUtil.isolate(cluster, leader.getId());
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assertions.assertNotEquals(reply.getReplierId(), leader.getId().toString());
Assertions.assertTrue(reply.isSuccess());
} finally {
deIsolate(cluster, leader.getId());
RaftTestUtil.deIsolate(cluster, leader.getId());
}

cluster.shutdown();
}
}

private void isolate(MiniRaftCluster cluster, RaftPeerId id) {
try {
BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
cluster.setBlockRequestsFrom(id.toString(), true);
} catch (Exception e) {
e.printStackTrace();
}
}

private void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
cluster.setBlockRequestsFrom(id.toString(), false);
}

@Test
public void testAddListener() throws Exception {
try (final MiniRaftCluster cluster = newCluster(3)) {
@@ -570,15 +556,15 @@ public void testPreVote() {
assertEquals(followers.size(), 2);

RaftServer.Division follower = followers.get(0);
isolate(cluster, follower.getId());
RaftTestUtil.isolate(cluster, follower.getId());
// send message so that the isolated follower's log lag the others
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assertions.assertTrue(reply.isSuccess());

final long savedTerm = leader.getInfo().getCurrentTerm();
LOG.info("Wait follower {} timeout and trigger pre-vote", follower.getId());
Thread.sleep(2000);
deIsolate(cluster, follower.getId());
RaftTestUtil.deIsolate(cluster, follower.getId());
Thread.sleep(2000);
// with pre-vote leader will not step down
RaftServer.Division newleader = waitForLeader(cluster);
@@ -668,14 +654,14 @@ void runTestLeaderLease(CLUSTER cluster, long leaseTimeoutMs) throws Exception {
Assertions.assertTrue(leader.getInfo().isLeaderReady());
RaftServerTestUtil.assertLeaderLease(leader, true);

isolate(cluster, leader.getId());
RaftTestUtil.isolate(cluster, leader.getId());
Thread.sleep(leaseTimeoutMs);

Assertions.assertTrue(leader.getInfo().isLeader());
Assertions.assertTrue(leader.getInfo().isLeaderReady());
RaftServerTestUtil.assertLeaderLease(leader, false);
} finally {
deIsolate(cluster, leader.getId());
RaftTestUtil.deIsolate(cluster, leader.getId());
}
}

@@ -46,4 +46,10 @@ public void testSeparateSnapshotInstallPath(Boolean separateHeartbeat) throws Ex
super.testSeparateSnapshotInstallPath();
}

@ParameterizedTest
@MethodSource("data")
public void testInstallSnapshotLeaderSwitch(Boolean separateHeartbeat) throws Exception {
GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat);
super.testInstallSnapshotLeaderSwitch();
}
}