diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
index a7d36ac0eb..112f6bd250 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
@@ -68,4 +68,9 @@ public static boolean execute(String injectionPoint, Object localId,
     }
     return code.execute(localId, remoteId, args);
   }
+
+  /** Remove an injection point. */
+  public static void remove(String injectionPoint) {
+    INJECTION_POINTS.remove(injectionPoint);
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index fa61e90883..1be160f182 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -133,6 +133,19 @@ public void run() {
     }
   }
 
+  /**
+   * Check whether this follower should time out and change to candidate.
+   *
+   * @return true if every condition for changing to CANDIDATE holds, including that the server is running.
+   */
+  private boolean roleChangeChecking(TimeDuration electionTimeout) {
+    return outstandingOp.get() == 0
+        && isRunning && server.getInfo().isFollower()
+        && lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0
+        && !lostMajorityHeartbeatsRecently()
+        && server.isRunning();
+  }
+
   private void runImpl() {
     final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
     while (shouldRun()) {
@@ -149,10 +162,7 @@ private void runImpl() {
           break;
         }
         synchronized (server) {
-          if (outstandingOp.get() == 0
-              && isRunning && server.getInfo().isFollower()
-              && lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0
-              && !lostMajorityHeartbeatsRecently()) {
+          if (roleChangeChecking(electionTimeout)) {
             LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
                 this, lastRpcTime.elapsedTime(), electionTimeout);
             server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index a5bfba7bec..4badd09cd1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -253,10 +253,6 @@ private void runImpl() {
     }
 
     try (AutoCloseable ignored = Timekeeper.start(server.getLeaderElectionMetrics().getLeaderElectionTimer())) {
-      if (!server.isRunning()) {
-        LOG.info("{}: skip since the server is not running", this);
-        return;
-      }
       for (int round = 0; shouldRun(); round++) {
         if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
           if (askForVotes(Phase.ELECTION, round)) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index ae158ad75e..1ac62fd987 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -160,6 +160,7 @@ class RaftServerImpl implements RaftServer.Division,
   static final String APPEND_TRANSACTION = CLASS_NAME + ".appendTransaction";
   static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
   static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection";
+  static final String START_COMPLETE = CLASS_NAME + ".startComplete";
 
   class Info implements DivisionInfo {
     @Override
@@ -400,7 +401,10 @@ boolean start() throws IOException {
 
     jmxAdapter.registerMBean();
     state.start();
-    startComplete.compareAndSet(false, true);
+    CodeInjectionForTesting.execute(START_COMPLETE, getId(), null, role);
+    if (startComplete.compareAndSet(false, true)) {
+      LOG.info("{}: Successfully started.", getMemberId());
+    }
     return true;
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index b29b537abe..b175ffe292 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
 import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.server.DivisionInfo;
@@ -44,6 +45,7 @@ import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.apache.ratis.util.CodeInjectionForTesting;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -97,6 +99,92 @@ public void testBasicLeaderElection() throws Exception {
     cluster.shutdown();
   }
 
+  /** A {@link CodeInjectionForTesting.Code} that blocks the caller by sleeping for a fixed duration. */
+  static class SleepCode implements CodeInjectionForTesting.Code {
+    private final long sleepMs;
+
+    SleepCode(long sleepMs) {
+      this.sleepMs = sleepMs;
+    }
+
+    @Override
+    public boolean execute(Object localId, Object remoteId, Object... args) {
+      try {
+        LOG.info("{}: Simulate RaftServer startup blocking", localId);
+        Thread.sleep(sleepMs);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status before rethrowing so that callers can still observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      return true;
+    }
+  }
+
+  /** A leader must not be elected before the (artificially delayed) server start has completed. */
+  @Test
+  public void testWaitServerReady() throws Exception {
+    final int sleepMs = 1000 + ThreadLocalRandom.current().nextInt(1000);
+    LOG.info("Running testWaitServerReady, sleep = {}ms", sleepMs);
+    CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(sleepMs));
+    final MiniRaftCluster cluster = newCluster(1);
+    try {
+      final Timestamp startTime = Timestamp.currentTime();
+      cluster.start();
+      LOG.info("Cluster started at {}ms", startTime.elapsedTimeMs());
+      final RaftGroupId groupId = cluster.getGroupId();
+      final RaftServerImpl server = (RaftServerImpl) cluster.getServers().iterator().next().getDivision(groupId);
+      final boolean isRunning = server.isRunning();
+      LOG.info("{} isRunning at {}ms? {}", server.getId(), startTime.elapsedTimeMs(), isRunning);
+
+      // Leader will be elected if the server is ready
+      Assertions.assertNotNull(waitForLeader(cluster), "No leader is elected.");
+      final long elapsedMs = startTime.elapsedTimeMs();
+      // allow a small difference to tolerate system timer inaccuracy
+      Assertions.assertTrue(elapsedMs > sleepMs - 10,
+          () -> "elapsedMs = " + elapsedMs + " but sleepMs = " + sleepMs);
+    } finally {
+      // The injection point is static (JVM-wide): always remove it, even when an assertion fails.
+      CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
+      cluster.shutdown();
+    }
+  }
+
+  /** Newly added servers with a delayed start must all be running once setConfiguration succeeds. */
+  @Test
+  public void testAddServerForWaitReady() throws IOException, InterruptedException {
+    LOG.info("Running testAddServerForWaitReady");
+    // normal startup cluster with 3 servers
+    final MiniRaftCluster cluster = newCluster(3);
+    try {
+      cluster.start();
+      RaftTestUtil.waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient()) {
+        for (int i = 0; i < 10; ++i) {
+          RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message_" + i));
+          Assertions.assertTrue(reply.isSuccess());
+        }
+        // add 2 new servers whose start is delayed by the injected sleep
+        CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(2000));
+        MiniRaftCluster.PeerChanges peerChanges = cluster.addNewPeers(2, true, false);
+        LOG.info("add 2 new servers");
+        LOG.info(cluster.printServers());
+        RaftClientReply reply = client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder()
+            .setServersInNewConf(peerChanges.newPeers)
+            .setMode(SetConfigurationRequest.Mode.ADD).build());
+        Assertions.assertTrue(reply.isSuccess());
+        for (RaftServer server : cluster.getServers()) {
+          RaftServerProxy proxy = (RaftServerProxy) server;
+          proxy.getImpls().forEach(s -> Assertions.assertTrue(s.isRunning()));
+        }
+      }
+    } finally {
+      // The injection point is static (JVM-wide): always remove it, even when an assertion fails.
+      CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testChangeLeader() throws Exception {
     SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.TRACE);