diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 3544f3be1b..a0ff98e189 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -368,6 +368,7 @@ public void onNext(AppendEntriesReplyProto reply) { AppendEntriesRequest request = pendingRequests.remove(reply); if (request != null) { request.stopRequestTimer(); // Update completion time + getFollower().updateLastAppendEntriesResponseTime(request.getSendTime()); // Update the last rpc time } if (LOG.isDebugEnabled()) { @@ -385,8 +386,6 @@ public void onNext(AppendEntriesReplyProto reply) { } private void onNextImpl(AppendEntriesReplyProto reply) { - // update the last rpc time - getFollower().updateLastRpcResponseTime(); errCount.set(0); if (!firstResponseReceived) { @@ -747,6 +746,8 @@ static class AppendEntriesRequest { private final TermIndex lastEntry; + private Timestamp sendTime; + AppendEntriesRequest(AppendEntriesRequestProto proto, RaftPeerId followerId, GrpcServerMetrics grpcServerMetrics) { this.callId = proto.getServerRequest().getCallId(); this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null; @@ -765,8 +766,13 @@ TermIndex getPreviousLog() { return previousLog; } + public Timestamp getSendTime() { + return sendTime; + } + void startRequestTimer() { timerContext = timer.time(); + sendTime = Timestamp.currentTime(); } void stopRequestTimer() { diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java index 1dd4066e82..e1a22a1579 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java @@ -101,4 +101,10 @@ public interface FollowerInfo { /** @return the latest heartbeat send time. */ Timestamp getLastHeartbeatSendTime(); + + /** @return the send time of last responded rpc */ + Timestamp getLastRespondedAppendEntriesSendTime(); + + /** Update lastRpcResponseTime and LastRespondedAppendEntriesSendTime */ + void updateLastAppendEntriesResponseTime(Timestamp sendTime); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java index b3e8dd94d0..63cbc02ed6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java @@ -28,7 +28,6 @@ class DivisionPropertiesImpl implements DivisionProperties { private final TimeDuration rpcTimeoutMax; private final TimeDuration rpcSleepTime; private final TimeDuration rpcSlownessTimeout; - private final TimeDuration leaderLeaseTimeout; DivisionPropertiesImpl(RaftProperties properties) { this.rpcTimeoutMin = RaftServerConfigKeys.Rpc.timeoutMin(properties); @@ -36,11 +35,6 @@ class DivisionPropertiesImpl implements DivisionProperties { Preconditions.assertTrue(rpcTimeoutMax.compareTo(rpcTimeoutMin) >= 0, "rpcTimeoutMax = %s < rpcTimeoutMin = %s", rpcTimeoutMax, rpcTimeoutMin); - final double leaderLeaseTimeoutRatio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(properties); - this.leaderLeaseTimeout = this.rpcTimeoutMin.multiply(leaderLeaseTimeoutRatio); - Preconditions.assertTrue(rpcTimeoutMin.compareTo(leaderLeaseTimeout) >= 0, - "rpcTimeoutMin = %s < leaderLeaseTimeout = %s", rpcTimeoutMin, leaderLeaseTimeout); - this.rpcSleepTime = RaftServerConfigKeys.Rpc.sleepTime(properties); this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties); } @@ -55,11 +49,6 @@ public TimeDuration maxRpcTimeout() { return rpcTimeoutMax; } - /** @return the ratio of leader lease timeout */ - public TimeDuration leaderLeaseTimeout() { - return leaderLeaseTimeout; - } - @Override public TimeDuration rpcSleepTime() { return rpcSleepTime; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java index 245cbc8883..3aeb674ec3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java @@ -39,6 +39,7 @@ class FollowerInfoImpl implements FollowerInfo { private final AtomicReference lastRpcResponseTime; private final AtomicReference lastRpcSendTime; private final AtomicReference lastHeartbeatSendTime; + private final AtomicReference lastRespondedAppendEntriesSendTime; private final RaftLogIndex nextIndex; private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX); private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX); @@ -57,6 +58,7 @@ class FollowerInfoImpl implements FollowerInfo { this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime); this.lastRpcSendTime = new AtomicReference<>(lastRpcTime); this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime); + this.lastRespondedAppendEntriesSendTime = new AtomicReference<>(lastRpcTime); this.nextIndex = new RaftLogIndex("nextIndex", nextIndex); this.caughtUp = caughtUp; } @@ -202,4 +204,15 @@ public Timestamp getLastRpcTime() { public Timestamp getLastHeartbeatSendTime() { return lastHeartbeatSendTime.get(); } + + @Override + public Timestamp getLastRespondedAppendEntriesSendTime() { + return lastRespondedAppendEntriesSendTime.get(); + } + + @Override + public void updateLastAppendEntriesResponseTime(Timestamp sendTime) { + updateLastRpcResponseTime(); + lastRespondedAppendEntriesSendTime.set(sendTime); + } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 7ea4d738dc..f677da3630 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -79,6 +79,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.function.ToLongFunction; @@ -322,6 +323,9 @@ private List getFollowerInfos(PeerConfiguration peers) { private final PendingStepDown pendingStepDown; private final ReadIndexHeartbeats readIndexHeartbeats; + private final long leaseTimeoutMs; + // TODO invalidate leader lease when stepDown / transferLeader + private final AtomicReference lease = new AtomicReference<>(Timestamp.currentTime()); LeaderStateImpl(RaftServerImpl server) { this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()); @@ -343,6 +347,14 @@ private List getFollowerInfos(PeerConfiguration peers) { this.messageStreamRequests = new MessageStreamRequests(server.getMemberId()); this.pendingStepDown = new PendingStepDown(this); this.readIndexHeartbeats = new ReadIndexHeartbeats(); + + final double leaseRatio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(properties); + Preconditions.assertTrue(leaseRatio > 0.0 && leaseRatio <= 1.0, + "leader ratio should sit in (0,1], now is " + leaseRatio); + this.leaseTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties) + .multiply(leaseRatio) + .toIntExact(TimeUnit.MILLISECONDS); + long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties); double followerGapRatioMax = RaftServerConfigKeys.Write.followerGapRatioMax(properties); @@ -1127,6 +1139,47 @@ public void onAppendEntriesReply(LogAppender appender, RaftProtos.AppendEntriesR readIndexHeartbeats.onAppendEntriesReply(appender, reply, this::hasMajority); } + boolean hasValidLeaderLease() { + if (checkLeaderLease()) { + return true; + } + + extendLeaderLease(); + return checkLeaderLease(); + } + + private boolean checkLeaderLease() { + return isReady() && lease.get().elapsedTimeMs() < leaseTimeoutMs; + } + + private void extendLeaderLease() { + final List activePeers = new ArrayList<>(); + final List lastRespondedAppendEntriesSendTimes = new ArrayList<>(); + final RaftConfigurationImpl conf = server.getRaftConf(); + final CurrentOldFollowerInfos info = followerInfoMap.getFollowerInfos(conf); + final List currentFollowers = info.getCurrent(); + + for (FollowerInfo follower: currentFollowers) { + final Timestamp lastRespondedAppendEntriesSendTime = follower.getLastRespondedAppendEntriesSendTime(); + lastRespondedAppendEntriesSendTimes.add(lastRespondedAppendEntriesSendTime); + if (lastRespondedAppendEntriesSendTime.elapsedTimeMs() < leaseTimeoutMs) { + activePeers.add(follower.getId()); + } + } + + if (conf.hasMajority(activePeers, server.getId())) { + // can extend leader lease + if (lastRespondedAppendEntriesSendTimes.isEmpty()) { + lease.set(Timestamp.currentTime()); + } else { + Collections.sort(lastRespondedAppendEntriesSendTimes); + final Timestamp newLease = + lastRespondedAppendEntriesSendTimes.get(lastRespondedAppendEntriesSendTimes.size() / 2); + lease.set(newLease); + } + } + } + void replyPendingRequest(long logIndex, RaftClientReply reply) { pendingRequests.replyPendingRequest(logIndex, reply); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index 8f71f91fcf..3111a82a3b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -26,6 +26,7 @@ import org.apache.ratis.server.raftlog.RaftLogIOException; import org.apache.ratis.server.util.ServerStringUtils; import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.util.Timestamp; import java.io.IOException; import java.io.InterruptedIOException; @@ -73,9 +74,10 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries() } resetHeartbeatTrigger(); + final Timestamp sendTime = Timestamp.currentTime(); getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0); final AppendEntriesReplyProto r = getServerRpc().appendEntries(request); - getFollower().updateLastRpcResponseTime(); + getFollower().updateLastAppendEntriesResponseTime(sendTime); getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit()); return r;