From ec7dc957dbc7285e5b61502a8bb8df17b5d0623a Mon Sep 17 00:00:00 2001 From: szywilliam Date: Mon, 11 Sep 2023 14:20:30 +0800 Subject: [PATCH] implement leader lease --- .../ratis/grpc/server/GrpcLogAppender.java | 12 ++- .../ratis/server/leader/FollowerInfo.java | 6 ++ .../server/impl/DivisionPropertiesImpl.java | 11 --- .../ratis/server/impl/FollowerInfoImpl.java | 13 +++ .../apache/ratis/server/impl/LeaderLease.java | 79 ++++++++++++++++++ .../ratis/server/impl/LeaderStateImpl.java | 24 ++++++ .../server/leader/LogAppenderDefault.java | 4 +- .../server/impl/LeaderElectionTests.java | 82 +++++++++++++++++++ .../ratis/server/impl/RaftServerTestUtil.java | 6 ++ 9 files changed, 223 insertions(+), 14 deletions(-) create mode 100644 ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderLease.java diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index eb3195326d..57c61ee3ad 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -390,6 +390,9 @@ public void onNext(AppendEntriesReplyProto reply) { AppendEntriesRequest request = pendingRequests.remove(reply); if (request != null) { request.stopRequestTimer(); // Update completion time + getFollower().updateLastAppendEntriesResponseTime(request.getSendTime()); // Update the last rpc time + } else { + getFollower().updateLastRpcResponseTime(); } if (LOG.isDebugEnabled()) { @@ -407,8 +410,6 @@ public void onNext(AppendEntriesReplyProto reply) { } private void onNextImpl(AppendEntriesReplyProto reply) { - // update the last rpc time - getFollower().updateLastRpcResponseTime(); errCount.set(0); if (!firstResponseReceived) { @@ -770,6 +771,8 @@ static class AppendEntriesRequest { private final TermIndex lastEntry; + private volatile Timestamp sendTime; + AppendEntriesRequest(AppendEntriesRequestProto proto, RaftPeerId followerId, GrpcServerMetrics grpcServerMetrics) { this.callId = proto.getServerRequest().getCallId(); this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null; @@ -788,8 +791,13 @@ TermIndex getPreviousLog() { return previousLog; } + public Timestamp getSendTime() { + return sendTime; + } + void startRequestTimer() { timerContext = timer.time(); + sendTime = Timestamp.currentTime(); } void stopRequestTimer() { diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java index 1dd4066e82..e1a22a1579 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java @@ -101,4 +101,10 @@ public interface FollowerInfo { /** @return the latest heartbeat send time. */ Timestamp getLastHeartbeatSendTime(); + + /** @return the send time of last responded rpc */ + Timestamp getLastRespondedAppendEntriesSendTime(); + + /** Update lastRpcResponseTime and LastRespondedAppendEntriesSendTime */ + void updateLastAppendEntriesResponseTime(Timestamp sendTime); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java index b3e8dd94d0..63cbc02ed6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java @@ -28,7 +28,6 @@ class DivisionPropertiesImpl implements DivisionProperties { private final TimeDuration rpcTimeoutMax; private final TimeDuration rpcSleepTime; private final TimeDuration rpcSlownessTimeout; - private final TimeDuration leaderLeaseTimeout; DivisionPropertiesImpl(RaftProperties properties) { this.rpcTimeoutMin = RaftServerConfigKeys.Rpc.timeoutMin(properties); @@ -36,11 +35,6 @@ class DivisionPropertiesImpl implements DivisionProperties { Preconditions.assertTrue(rpcTimeoutMax.compareTo(rpcTimeoutMin) >= 0, "rpcTimeoutMax = %s < rpcTimeoutMin = %s", rpcTimeoutMax, rpcTimeoutMin); - final double leaderLeaseTimeoutRatio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(properties); - this.leaderLeaseTimeout = this.rpcTimeoutMin.multiply(leaderLeaseTimeoutRatio); - Preconditions.assertTrue(rpcTimeoutMin.compareTo(leaderLeaseTimeout) >= 0, - "rpcTimeoutMin = %s < leaderLeaseTimeout = %s", rpcTimeoutMin, leaderLeaseTimeout); - this.rpcSleepTime = RaftServerConfigKeys.Rpc.sleepTime(properties); this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties); } @@ -55,11 +49,6 @@ public TimeDuration maxRpcTimeout() { return rpcTimeoutMax; } - /** @return the ratio of leader lease timeout */ - public TimeDuration leaderLeaseTimeout() { - return leaderLeaseTimeout; - } - @Override public TimeDuration rpcSleepTime() { return rpcSleepTime; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java index 245cbc8883..3aeb674ec3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java @@ -39,6 +39,7 @@ class FollowerInfoImpl implements FollowerInfo { private final AtomicReference lastRpcResponseTime; private final AtomicReference lastRpcSendTime; private final AtomicReference lastHeartbeatSendTime; + private final AtomicReference lastRespondedAppendEntriesSendTime; private final RaftLogIndex nextIndex; private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX); private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX); @@ -57,6 +58,7 @@ class FollowerInfoImpl implements FollowerInfo { this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime); this.lastRpcSendTime = new AtomicReference<>(lastRpcTime); this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime); + this.lastRespondedAppendEntriesSendTime = new AtomicReference<>(lastRpcTime); this.nextIndex = new RaftLogIndex("nextIndex", nextIndex); this.caughtUp = caughtUp; } @@ -202,4 +204,15 @@ public Timestamp getLastRpcTime() { public Timestamp getLastHeartbeatSendTime() { return lastHeartbeatSendTime.get(); } + + @Override + public Timestamp getLastRespondedAppendEntriesSendTime() { + return lastRespondedAppendEntriesSendTime.get(); + } + + @Override + public void updateLastAppendEntriesResponseTime(Timestamp sendTime) { + updateLastRpcResponseTime(); + lastRespondedAppendEntriesSendTime.set(sendTime); + } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderLease.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderLease.java new file mode 100644 index 0000000000..36997f892a --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderLease.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.leader.FollowerInfo; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.Timestamp; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.stream.Stream; + +public class LeaderLease { + + private final long leaseTimeoutMs; + // TODO invalidate leader lease when stepDown / transferLeader + private final AtomicReference lease = new AtomicReference<>(Timestamp.currentTime()); + + public LeaderLease(RaftProperties properties) { + final double leaseRatio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(properties); + Preconditions.assertTrue(leaseRatio > 0.0 && leaseRatio <= 1.0, + "leader ratio should sit in (0,1], now is " + leaseRatio); + this.leaseTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties) + .multiply(leaseRatio) + .toIntExact(TimeUnit.MILLISECONDS); + } + + boolean isValid() { + return lease.get().elapsedTimeMs() < leaseTimeoutMs; + } + + void extendLeaderLease(Stream allFollowers, Predicate> hasMajority) { + // check the latest heartbeats of all peers (including those in transitional) + final List activePeers = new ArrayList<>(); + final List lastRespondedAppendEntriesSendTimes = new ArrayList<>(); + + allFollowers.forEach(follower -> { + final Timestamp lastRespondedAppendEntriesSendTime = follower.getLastRespondedAppendEntriesSendTime(); + lastRespondedAppendEntriesSendTimes.add(lastRespondedAppendEntriesSendTime); + if (lastRespondedAppendEntriesSendTime.elapsedTimeMs() < leaseTimeoutMs) { + activePeers.add(follower.getId()); + } + }); + + if (hasMajority.test(activePeers)) { + // can extend leader lease + if (lastRespondedAppendEntriesSendTimes.isEmpty()) { + lease.set(Timestamp.currentTime()); + } else { + Collections.sort(lastRespondedAppendEntriesSendTimes); + final Timestamp newLease = + lastRespondedAppendEntriesSendTimes.get(lastRespondedAppendEntriesSendTimes.size() / 2); + lease.set(newLease); + } + } + } +} diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 5156585f81..5f7d730ccc 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -247,6 +247,11 @@ List getCurrent() { List getOld() { return old; } + + Stream getCurrentAndOld() { + return Stream.concat(current.stream(), + Optional.ofNullable(old).map(List::stream).orElse(Stream.empty())); + } } static boolean isSameSize(List infos, PeerConfiguration conf) { @@ -348,6 +353,7 @@ boolean isApplied(LogEntryProto logEntry) { private final PendingStepDown pendingStepDown; private final ReadIndexHeartbeats readIndexHeartbeats; + private final LeaderLease lease; LeaderStateImpl(RaftServerImpl server) { this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()); @@ -369,6 +375,7 @@ boolean isApplied(LogEntryProto logEntry) { this.messageStreamRequests = new MessageStreamRequests(server.getMemberId()); this.pendingStepDown = new PendingStepDown(this); this.readIndexHeartbeats = new ReadIndexHeartbeats(); + this.lease = new LeaderLease(properties); long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties); double followerGapRatioMax = RaftServerConfigKeys.Write.followerGapRatioMax(properties); @@ -1127,6 +1134,23 @@ public void onAppendEntriesReply(LogAppender appender, RaftProtos.AppendEntriesR readIndexHeartbeats.onAppendEntriesReply(appender, reply, this::hasMajority); } + boolean hasLease() { + if (checkLeaderLease()) { + return true; + } + + // try extending the leader lease + final RaftConfigurationImpl conf = server.getRaftConf(); + final CurrentOldFollowerInfos info = followerInfoMap.getFollowerInfos(conf); + lease.extendLeaderLease(info.getCurrentAndOld(), peers -> conf.hasMajority(peers, server.getId())); + + return checkLeaderLease(); + } + + private boolean checkLeaderLease() { + return isReady() && (server.getRaftConf().isSingleton() || lease.isValid()); + } + void replyPendingRequest(long logIndex, RaftClientReply reply) { pendingRequests.replyPendingRequest(logIndex, reply); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index 8f71f91fcf..3111a82a3b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -26,6 +26,7 @@ import org.apache.ratis.server.raftlog.RaftLogIOException; import org.apache.ratis.server.util.ServerStringUtils; import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.util.Timestamp; import java.io.IOException; import java.io.InterruptedIOException; @@ -73,9 +74,10 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries() } resetHeartbeatTrigger(); + final Timestamp sendTime = Timestamp.currentTime(); getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0); final AppendEntriesReplyProto r = getServerRpc().appendEntries(request); - getFollower().updateLastRpcResponseTime(); + getFollower().updateLastAppendEntriesResponseTime(sendTime); getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit()); return r; diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 9e2b7bd2d5..c2e5cbd1c7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -43,6 +43,7 @@ import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.Timestamp; +import org.apache.ratis.util.function.CheckedBiConsumer; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -640,6 +641,87 @@ void runTestPauseResumeLeaderElection(CLUSTER cluster) throws IOException, Inter } } + private void runLeaseTest(CLUSTER cluster, CheckedBiConsumer testCase) throws Exception { + final double leaseRatio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(getProperties()); + final long leaseTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(getProperties()) + .multiply(leaseRatio) + .toIntExact(TimeUnit.MILLISECONDS); + testCase.accept(cluster, leaseTimeoutMs); + } + + @Test + public void testLeaderLease() throws Exception { + // use a strict lease + RaftServerConfigKeys.Read.setLeaderLeaseTimeoutRatio(getProperties(), 0.5); + runWithNewCluster(3, c -> runLeaseTest(c, this::runTestLeaderLease)); + } + + void runTestLeaderLease(CLUSTER cluster, long leaseTimeoutMs) throws Exception { + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + try (final RaftClient client = cluster.createClient(leader.getId())) { + client.io().send(new RaftTestUtil.SimpleMessage("message")); + + Assert.assertTrue(leader.getInfo().isLeader()); + Assert.assertTrue(leader.getInfo().isLeaderReady()); + RaftServerTestUtil.assertLeaderLease(leader, true); + + isolate(cluster, leader.getId()); + Thread.sleep(leaseTimeoutMs); + + Assert.assertTrue(leader.getInfo().isLeader()); + Assert.assertTrue(leader.getInfo().isLeaderReady()); + RaftServerTestUtil.assertLeaderLease(leader, false); + } finally { + deIsolate(cluster, leader.getId()); + } + } + + @Test + public void testLeaderLeaseDuringReconfiguration() throws Exception { + // use a strict lease + RaftServerConfigKeys.Read.setLeaderLeaseTimeoutRatio(getProperties(), 0.5); + runWithNewCluster(3, c -> runLeaseTest(c, this::runTestLeaderLeaseDuringReconfiguration)); + } + + void runTestLeaderLeaseDuringReconfiguration(CLUSTER cluster, long leaseTimeoutMs) throws Exception { + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + try (final RaftClient client = cluster.createClient(leader.getId())) { + client.io().send(new RaftTestUtil.SimpleMessage("message")); + + Assert.assertTrue(leader.getInfo().isLeader()); + Assert.assertTrue(leader.getInfo().isLeaderReady()); + RaftServerTestUtil.assertLeaderLease(leader, true); + + final List followers = cluster.getFollowers(); + final MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(2, true); + + // blocking the original 2 followers + BlockRequestHandlingInjection.getInstance().blockReplier(followers.get(0).getId().toString()); + BlockRequestHandlingInjection.getInstance().blockReplier(followers.get(1).getId().toString()); + + // start reconfiguration in another thread, shall fail eventually + new Thread(() -> { + try { + client.admin().setConfiguration(changes.allPeersInNewConf); + } catch (IOException e) { + System.out.println("as expected: " + e.getMessage()); + } + }).start(); + + Thread.sleep(leaseTimeoutMs); + + Assert.assertTrue(leader.getInfo().isLeader()); + Assert.assertTrue(leader.getInfo().isLeaderReady()); + RaftServerTestUtil.assertLeaderLease(leader, false); + + } finally { + BlockRequestHandlingInjection.getInstance().unblockAll(); + } + } + + + + private static RaftServerImpl createMockServer(boolean alive) { final DivisionInfo info = mock(DivisionInfo.class); when(info.isAlive()).thenReturn(alive); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 618e398b3f..958c19442f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -147,6 +147,12 @@ public static Stream getLogAppenders(RaftServer.Division server) { return getLeaderState(server).map(LeaderStateImpl::getLogAppenders).orElse(null); } + public static void assertLeaderLease(RaftServer.Division leader, boolean hasLease) { + final LeaderStateImpl l = getLeaderState(leader).orElse(null); + Assert.assertNotNull(l); + Assert.assertEquals(l.hasLease(), hasLease); + } + public static void restartLogAppenders(RaftServer.Division server) { final LeaderStateImpl leaderState = getLeaderState(server).orElseThrow( () -> new IllegalStateException(server + " is not the leader"));