Skip to content

Commit

Permalink
maintain leader lease
Browse files Browse the repository at this point in the history
  • Loading branch information
SzyWilliam committed Aug 4, 2023
1 parent 97220fe commit db098b6
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ public void onNext(AppendEntriesReplyProto reply) {
AppendEntriesRequest request = pendingRequests.remove(reply);
if (request != null) {
request.stopRequestTimer(); // Update completion time
getFollower().updateLastAppendEntriesResponseTime(request.getSendTime()); // Update the last rpc time
}

if (LOG.isDebugEnabled()) {
Expand All @@ -385,8 +386,6 @@ public void onNext(AppendEntriesReplyProto reply) {
}

private void onNextImpl(AppendEntriesReplyProto reply) {
// update the last rpc time
getFollower().updateLastRpcResponseTime();
errCount.set(0);

if (!firstResponseReceived) {
Expand Down Expand Up @@ -747,6 +746,8 @@ static class AppendEntriesRequest {

private final TermIndex lastEntry;

private Timestamp sendTime;

AppendEntriesRequest(AppendEntriesRequestProto proto, RaftPeerId followerId, GrpcServerMetrics grpcServerMetrics) {
this.callId = proto.getServerRequest().getCallId();
this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null;
Expand All @@ -765,8 +766,13 @@ TermIndex getPreviousLog() {
return previousLog;
}

public Timestamp getSendTime() {
return sendTime;
}

void startRequestTimer() {
timerContext = timer.time();
sendTime = Timestamp.currentTime();
}

void stopRequestTimer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,10 @@ public interface FollowerInfo {

/** @return the latest heartbeat send time. */
Timestamp getLastHeartbeatSendTime();

/** @return the send time of last responded rpc */
Timestamp getLastRespondedAppendEntriesSendTime();

/** Update lastRpcResponseTime and LastRespondedAppendEntriesSendTime */
void updateLastAppendEntriesResponseTime(Timestamp sendTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,13 @@ class DivisionPropertiesImpl implements DivisionProperties {
private final TimeDuration rpcTimeoutMax;
private final TimeDuration rpcSleepTime;
private final TimeDuration rpcSlownessTimeout;
private final TimeDuration leaderLeaseTimeout;

DivisionPropertiesImpl(RaftProperties properties) {
this.rpcTimeoutMin = RaftServerConfigKeys.Rpc.timeoutMin(properties);
this.rpcTimeoutMax = RaftServerConfigKeys.Rpc.timeoutMax(properties);
Preconditions.assertTrue(rpcTimeoutMax.compareTo(rpcTimeoutMin) >= 0,
"rpcTimeoutMax = %s < rpcTimeoutMin = %s", rpcTimeoutMax, rpcTimeoutMin);

final double leaderLeaseTimeoutRatio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(properties);
this.leaderLeaseTimeout = this.rpcTimeoutMin.multiply(leaderLeaseTimeoutRatio);
Preconditions.assertTrue(rpcTimeoutMin.compareTo(leaderLeaseTimeout) >= 0,
"rpcTimeoutMin = %s < leaderLeaseTimeout = %s", rpcTimeoutMin, leaderLeaseTimeout);

this.rpcSleepTime = RaftServerConfigKeys.Rpc.sleepTime(properties);
this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
}
Expand All @@ -55,11 +49,6 @@ public TimeDuration maxRpcTimeout() {
return rpcTimeoutMax;
}

/** @return the ratio of leader lease timeout */
public TimeDuration leaderLeaseTimeout() {
return leaderLeaseTimeout;
}

@Override
public TimeDuration rpcSleepTime() {
return rpcSleepTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class FollowerInfoImpl implements FollowerInfo {
private final AtomicReference<Timestamp> lastRpcResponseTime;
private final AtomicReference<Timestamp> lastRpcSendTime;
private final AtomicReference<Timestamp> lastHeartbeatSendTime;
private final AtomicReference<Timestamp> lastRespondedAppendEntriesSendTime;
private final RaftLogIndex nextIndex;
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX);
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
Expand All @@ -57,6 +58,7 @@ class FollowerInfoImpl implements FollowerInfo {
this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime);
this.lastRespondedAppendEntriesSendTime = new AtomicReference<>(lastRpcTime);
this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
this.caughtUp = caughtUp;
}
Expand Down Expand Up @@ -202,4 +204,15 @@ public Timestamp getLastRpcTime() {
public Timestamp getLastHeartbeatSendTime() {
return lastHeartbeatSendTime.get();
}

@Override
public Timestamp getLastRespondedAppendEntriesSendTime() {
return lastRespondedAppendEntriesSendTime.get();
}

@Override
public void updateLastAppendEntriesResponseTime(Timestamp sendTime) {
updateLastRpcResponseTime();
lastRespondedAppendEntriesSendTime.set(sendTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
Expand Down Expand Up @@ -322,6 +323,9 @@ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
private final PendingStepDown pendingStepDown;

private final ReadIndexHeartbeats readIndexHeartbeats;
private final long leaseTimeoutMs;
// TODO invalidate leader lease when stepDown / transferLeader
private final AtomicReference<Timestamp> lease = new AtomicReference<>(Timestamp.currentTime());

LeaderStateImpl(RaftServerImpl server) {
this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
Expand All @@ -343,6 +347,14 @@ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
this.messageStreamRequests = new MessageStreamRequests(server.getMemberId());
this.pendingStepDown = new PendingStepDown(this);
this.readIndexHeartbeats = new ReadIndexHeartbeats();

final double leaseRatio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(properties);
Preconditions.assertTrue(leaseRatio > 0.0 && leaseRatio <= 1.0,
"leader ratio should sit in (0,1], now is " + leaseRatio);
this.leaseTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties)
.multiply(leaseRatio)
.toIntExact(TimeUnit.MILLISECONDS);

long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties);
double followerGapRatioMax = RaftServerConfigKeys.Write.followerGapRatioMax(properties);

Expand Down Expand Up @@ -1127,6 +1139,47 @@ public void onAppendEntriesReply(LogAppender appender, RaftProtos.AppendEntriesR
readIndexHeartbeats.onAppendEntriesReply(appender, reply, this::hasMajority);
}

boolean hasValidLeaderLease() {
if (checkLeaderLease()) {
return true;
}

extendLeaderLease();
return checkLeaderLease();
}

private boolean checkLeaderLease() {
return isReady() && lease.get().elapsedTimeMs() < leaseTimeoutMs;
}

private void extendLeaderLease() {
final List<RaftPeerId> activePeers = new ArrayList<>();
final List<Timestamp> lastRespondedAppendEntriesSendTimes = new ArrayList<>();
final RaftConfigurationImpl conf = server.getRaftConf();
final CurrentOldFollowerInfos info = followerInfoMap.getFollowerInfos(conf);
final List<FollowerInfo> currentFollowers = info.getCurrent();

for (FollowerInfo follower: currentFollowers) {
final Timestamp lastRespondedAppendEntriesSendTime = follower.getLastRespondedAppendEntriesSendTime();
lastRespondedAppendEntriesSendTimes.add(lastRespondedAppendEntriesSendTime);
if (lastRespondedAppendEntriesSendTime.elapsedTimeMs() < leaseTimeoutMs) {
activePeers.add(follower.getId());
}
}

if (conf.hasMajority(activePeers, server.getId())) {
// can extend leader lease
if (lastRespondedAppendEntriesSendTimes.isEmpty()) {
lease.set(Timestamp.currentTime());
} else {
Collections.sort(lastRespondedAppendEntriesSendTimes);
final Timestamp newLease =
lastRespondedAppendEntriesSendTimes.get(lastRespondedAppendEntriesSendTimes.size() / 2);
lease.set(newLease);
}
}
}

void replyPendingRequest(long logIndex, RaftClientReply reply) {
pendingRequests.replyPendingRequest(logIndex, reply);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.Timestamp;

import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -73,9 +74,10 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries()
}

resetHeartbeatTrigger();
final Timestamp sendTime = Timestamp.currentTime();
getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
getFollower().updateLastRpcResponseTime();
getFollower().updateLastAppendEntriesResponseTime(sendTime);

getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
return r;
Expand Down

0 comments on commit db098b6

Please sign in to comment.