Skip to content

Commit

Permalink
RATIS-2172. RaftServer may lose FollowerState (#1166)
Browse files Browse the repository at this point in the history
  • Loading branch information
133tosakarin authored and OneSizeFitsQuorum committed Oct 14, 2024
1 parent 9f77cdd commit b09bfe7
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public static boolean execute(String injectionPoint, Object localId,
}
return code.execute(localId, remoteId, args);
}

/**
 * Remove an injection point.
 *
 * Deletes whatever is registered in {@code INJECTION_POINTS} under the given name.
 * Tests that register code for an injection point can call this afterwards
 * so that the registration does not remain in the static registry.
 *
 * @param injectionPoint the name of the injection point to remove
 */
public static void remove(String injectionPoint) {
INJECTION_POINTS.remove(injectionPoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ public void run() {
}
}

/**
 * Decide whether every precondition for leaving the follower role holds.
 *
 * The conditions are evaluated in order and evaluation stops at the first one that fails:
 * no outstanding operation, this state still running, the server still reporting
 * the follower role, at least {@code electionTimeout} elapsed since {@code lastRpcTime},
 * no recent loss of majority heartbeats, and the server itself running.
 *
 * @param electionTimeout the timeout that the elapsed time since the last RPC is compared against
 * @return true only if all of the above conditions hold
 */
private boolean roleChangeChecking(TimeDuration electionTimeout) {
  if (outstandingOp.get() != 0) {
    return false;
  }
  if (!isRunning) {
    return false;
  }
  if (!server.getInfo().isFollower()) {
    return false;
  }
  final boolean timedOut = lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0;
  if (!timedOut) {
    return false;
  }
  if (lostMajorityHeartbeatsRecently()) {
    return false;
  }
  return server.isRunning();
}

private void runImpl() {
final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
while (shouldRun()) {
Expand All @@ -149,10 +157,7 @@ private void runImpl() {
break;
}
synchronized (server) {
if (outstandingOp.get() == 0
&& isRunning && server.getInfo().isFollower()
&& lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0
&& !lostMajorityHeartbeatsRecently()) {
if (roleChangeChecking(electionTimeout)) {
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
this, lastRpcTime.elapsedTime(), electionTimeout);
server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,6 @@ private void runImpl() {
}

try (AutoCloseable ignored = Timekeeper.start(server.getLeaderElectionMetrics().getLeaderElectionTimer())) {
if (!server.isRunning()) {
LOG.info("{}: skip since the server is not running", this);
return;
}
for (int round = 0; shouldRun(); round++) {
if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
if (askForVotes(Phase.ELECTION, round)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ class RaftServerImpl implements RaftServer.Division,
static final String APPEND_TRANSACTION = CLASS_NAME + ".appendTransaction";
static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection";
static final String START_COMPLETE = CLASS_NAME + ".startComplete";

class Info implements DivisionInfo {
@Override
Expand Down Expand Up @@ -400,7 +401,10 @@ boolean start() throws IOException {

jmxAdapter.registerMBean();
state.start();
startComplete.compareAndSet(false, true);
CodeInjectionForTesting.execute(START_COMPLETE, getId(), null, role);
if (startComplete.compareAndSet(false, true)) {
LOG.info("{}: Successfully started.", getMemberId());
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.server.DivisionInfo;
Expand All @@ -44,6 +45,8 @@
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -97,6 +100,80 @@ public void testBasicLeaderElection() throws Exception {
cluster.shutdown();
}

/**
 * Injected test code that blocks the calling thread for a fixed number of milliseconds.
 * The tests in this file register it at {@code RaftServerImpl.START_COMPLETE}
 * to simulate a slow server startup.
 */
static class SleepCode implements CodeInjectionForTesting.Code {
  /** How long {@link #execute(Object, Object, Object...)} sleeps, in milliseconds. */
  private final long sleepMs;

  SleepCode(long sleepMs) {
    this.sleepMs = sleepMs;
  }

  /**
   * Sleep for the configured time.
   *
   * @param localId the id passed by the injection point; used only for logging
   * @param remoteId unused
   * @param args unused
   * @return always true when the sleep completes
   * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted
   */
  @Override
  public boolean execute(Object localId, Object remoteId, Object... args) {
    try {
      LOG.info("{}: Simulate RaftServer startup blocking", localId);
      Thread.sleep(sleepMs);
    } catch (InterruptedException e) {
      // Restore the interrupt status so that callers further up the stack can still observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return true;
  }
}

/**
 * Start a single-server cluster while the startup is artificially delayed
 * and verify that a leader is elected only after the delay has passed.
 *
 * The injected sleep is stored in a static registry, so it is removed in a finally block;
 * otherwise, a failed assertion would leave the delay in place for other tests.
 * The cluster is shut down in a finally block for the same reason.
 */
@Test
public void testWaitServerReady() throws Exception {
  final int sleepMs = 1000 + ThreadLocalRandom.current().nextInt(1000);
  LOG.info("Running testWaitServerReady, sleep = {}ms", sleepMs);
  CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(sleepMs));
  try {
    final MiniRaftCluster cluster = newCluster(1);
    try {
      final Timestamp startTime = Timestamp.currentTime();
      cluster.start();
      LOG.info("Cluster started at {}ms", startTime.elapsedTimeMs());
      final RaftGroupId groupId = cluster.getGroupId();
      final RaftServerImpl server = (RaftServerImpl) cluster.getServers().iterator().next().getDivision(groupId);
      final boolean isRunning = server.isRunning();
      LOG.info("{} isRunning at {}ms? {}", server.getId(), startTime.elapsedTimeMs(), isRunning);

      // Leader will be elected if the server is ready
      Assertions.assertNotNull(waitForLeader(cluster), "No leader is elected.");
      final long elapsedMs = startTime.elapsedTimeMs();
      // allow a small difference to tolerate system timer inaccuracy
      Assertions.assertTrue(elapsedMs > sleepMs - 10, () -> "elapseMs = " + elapsedMs + " but sleepMs = " + sleepMs);
    } finally {
      cluster.shutdown();
    }
  } finally {
    // Always unregister the injected sleep, even when the test fails.
    CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
  }
}

/**
 * Add new servers, whose startup is artificially delayed, to a running cluster
 * and verify that the configuration change succeeds and that every server is running afterwards.
 *
 * The cluster shutdown and the removal of the injected sleep are done in finally blocks
 * so that a failure in this test does not leak the cluster or the static injection point.
 */
@Test
public void testAddServerForWaitReady() throws IOException, InterruptedException {
  LOG.info("Running testAddServerForWaitReady");
  // start a normal cluster with 3 servers
  final MiniRaftCluster cluster = newCluster(3);
  try {
    cluster.start();
    RaftTestUtil.waitForLeader(cluster);
    try (RaftClient client = cluster.createClient()) {
      for (int i = 0; i < 10; ++i) {
        final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message_" + i));
        Assertions.assertTrue(reply.isSuccess());
      }
      // add new servers whose startup is delayed by the injected sleep
      final int numNewPeers = 2;
      CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(2000));
      final MiniRaftCluster.PeerChanges peerChanges = cluster.addNewPeers(numNewPeers, true, false);
      LOG.info("add new {} servers", numNewPeers);
      LOG.info(cluster.printServers());
      final RaftClientReply reply = client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder()
          .setServersInNewConf(peerChanges.newPeers)
          .setMode(SetConfigurationRequest.Mode.ADD).build());
      Assertions.assertTrue(reply.isSuccess());
      for (RaftServer server : cluster.getServers()) {
        final RaftServerProxy proxy = (RaftServerProxy) server;
        proxy.getImpls().forEach(s -> Assertions.assertTrue(s.isRunning()));
      }
    }
  } finally {
    try {
      cluster.shutdown();
    } finally {
      // Always unregister the injected sleep, even when the test or the shutdown fails.
      CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
    }
  }
}

@Test
public void testChangeLeader() throws Exception {
SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.TRACE);
Expand Down

0 comments on commit b09bfe7

Please sign in to comment.