Skip to content

Commit

Permalink
implement leader lease
Browse files Browse the repository at this point in the history
  • Loading branch information
SzyWilliam committed Sep 11, 2023
1 parent 078b115 commit ec7dc95
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ public void onNext(AppendEntriesReplyProto reply) {
AppendEntriesRequest request = pendingRequests.remove(reply);
if (request != null) {
request.stopRequestTimer(); // Update completion time
getFollower().updateLastAppendEntriesResponseTime(request.getSendTime()); // Update the last rpc time
} else {
getFollower().updateLastRpcResponseTime();
}

if (LOG.isDebugEnabled()) {
Expand All @@ -407,8 +410,6 @@ public void onNext(AppendEntriesReplyProto reply) {
}

private void onNextImpl(AppendEntriesReplyProto reply) {
// update the last rpc time
getFollower().updateLastRpcResponseTime();
errCount.set(0);

if (!firstResponseReceived) {
Expand Down Expand Up @@ -770,6 +771,8 @@ static class AppendEntriesRequest {

private final TermIndex lastEntry;

private volatile Timestamp sendTime;

AppendEntriesRequest(AppendEntriesRequestProto proto, RaftPeerId followerId, GrpcServerMetrics grpcServerMetrics) {
this.callId = proto.getServerRequest().getCallId();
this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null;
Expand All @@ -788,8 +791,13 @@ TermIndex getPreviousLog() {
return previousLog;
}

/**
 * @return the send time recorded by the most recent call of startRequestTimer();
 *         presumably null when the timer has never been started (verify against the constructor).
 */
public Timestamp getSendTime() {
  final Timestamp recorded = this.sendTime;
  return recorded;
}

/** Start the timer of this request and then remember the current time as the send time. */
void startRequestTimer() {
  timerContext = timer.time();
  // captured after the timer has been started, same as before
  final Timestamp now = Timestamp.currentTime();
  this.sendTime = now;
}

void stopRequestTimer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,10 @@ public interface FollowerInfo {

/** @return the latest heartbeat send time. */
Timestamp getLastHeartbeatSendTime();

/**
 * @return the send time of the last appendEntries request that has been responded,
 *         i.e. the value most recently passed to
 *         {@link #updateLastAppendEntriesResponseTime(Timestamp)}.
 */
Timestamp getLastRespondedAppendEntriesSendTime();

/**
 * Update lastRpcResponseTime and lastRespondedAppendEntriesSendTime.
 *
 * @param sendTime the time at which the appendEntries request, whose response has just arrived, was sent.
 */
void updateLastAppendEntriesResponseTime(Timestamp sendTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,13 @@ class DivisionPropertiesImpl implements DivisionProperties {
private final TimeDuration rpcTimeoutMax;
private final TimeDuration rpcSleepTime;
private final TimeDuration rpcSlownessTimeout;
private final TimeDuration leaderLeaseTimeout;

/**
 * Read the rpc timeouts, the leader lease timeout ratio, the rpc sleep time
 * and the rpc slowness timeout from the given properties.
 * The constructor fails when the max rpc timeout is smaller than the min rpc timeout,
 * or when the computed leader lease timeout is larger than the min rpc timeout.
 *
 * @param properties the properties to read the configuration values from.
 */
DivisionPropertiesImpl(RaftProperties properties) {
  final TimeDuration min = RaftServerConfigKeys.Rpc.timeoutMin(properties);
  final TimeDuration max = RaftServerConfigKeys.Rpc.timeoutMax(properties);
  final boolean validRange = max.compareTo(min) >= 0;
  Preconditions.assertTrue(validRange,
      "rpcTimeoutMax = %s < rpcTimeoutMin = %s", max, min);
  this.rpcTimeoutMin = min;
  this.rpcTimeoutMax = max;

  // the lease timeout is a fraction of the min rpc timeout
  final double ratio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(properties);
  final TimeDuration leaseTimeout = min.multiply(ratio);
  final boolean validLease = min.compareTo(leaseTimeout) >= 0;
  Preconditions.assertTrue(validLease,
      "rpcTimeoutMin = %s < leaderLeaseTimeout = %s", min, leaseTimeout);
  this.leaderLeaseTimeout = leaseTimeout;

  this.rpcSleepTime = RaftServerConfigKeys.Rpc.sleepTime(properties);
  this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
}
Expand All @@ -55,11 +49,6 @@ public TimeDuration maxRpcTimeout() {
return rpcTimeoutMax;
}

/**
 * @return the leader lease timeout (a duration, not the ratio),
 *         i.e. the min rpc timeout multiplied by the configured leader lease timeout ratio.
 */
public TimeDuration leaderLeaseTimeout() {
return leaderLeaseTimeout;
}

@Override
public TimeDuration rpcSleepTime() {
return rpcSleepTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class FollowerInfoImpl implements FollowerInfo {
private final AtomicReference<Timestamp> lastRpcResponseTime;
private final AtomicReference<Timestamp> lastRpcSendTime;
private final AtomicReference<Timestamp> lastHeartbeatSendTime;
private final AtomicReference<Timestamp> lastRespondedAppendEntriesSendTime;
private final RaftLogIndex nextIndex;
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX);
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
Expand All @@ -57,6 +58,7 @@ class FollowerInfoImpl implements FollowerInfo {
this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime);
this.lastRespondedAppendEntriesSendTime = new AtomicReference<>(lastRpcTime);
this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
this.caughtUp = caughtUp;
}
Expand Down Expand Up @@ -202,4 +204,15 @@ public Timestamp getLastRpcTime() {
public Timestamp getLastHeartbeatSendTime() {
  // read the atomic reference once and hand out the value seen
  final Timestamp latest = this.lastHeartbeatSendTime.get();
  return latest;
}

/** @return the send time stored by the last call of updateLastAppendEntriesResponseTime(..). */
@Override
public Timestamp getLastRespondedAppendEntriesSendTime() {
  final Timestamp latest = this.lastRespondedAppendEntriesSendTime.get();
  return latest;
}

/**
 * Record that a response of an appendEntries request has arrived.
 *
 * @param sendTime the time at which the responded request was sent.
 */
@Override
public void updateLastAppendEntriesResponseTime(Timestamp sendTime) {
  // first refresh the generic rpc response time ...
  this.updateLastRpcResponseTime();
  // ... then remember the send time of the request that was responded
  this.lastRespondedAppendEntriesSendTime.set(sendTime);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.Timestamp;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
 * The lease of a leader.
 *
 * The lease is a {@link Timestamp}; it is valid while the time elapsed since that timestamp
 * is less than the lease timeout, which is the min rpc timeout multiplied by the configured
 * leader lease timeout ratio.
 * The lease is extended to the send time of an appendEntries request
 * once the followers having responded to requests sent at (or after) that time,
 * form a majority according to the predicate supplied by the caller.
 */
public class LeaderLease {

  /** An immutable snapshot of the send time of the last responded appendEntries of a follower. */
  private static final class Ack {
    private final RaftPeerId peerId;
    private final Timestamp sendTime;

    Ack(FollowerInfo follower) {
      this.peerId = follower.getId();
      // read the timestamp exactly once so that sorting and checking use the same value
      this.sendTime = follower.getLastRespondedAppendEntriesSendTime();
    }
  }

  private final long leaseTimeoutMs;
  // TODO invalidate leader lease when stepDown / transferLeader
  private final AtomicReference<Timestamp> lease = new AtomicReference<>(Timestamp.currentTime());

  /**
   * @param properties the properties providing the leader lease timeout ratio, which must be in (0,1],
   *                   and the min rpc timeout.
   */
  public LeaderLease(RaftProperties properties) {
    final double leaseRatio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(properties);
    Preconditions.assertTrue(leaseRatio > 0.0 && leaseRatio <= 1.0,
        "leader ratio should sit in (0,1], now is " + leaseRatio);
    this.leaseTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties)
        .multiply(leaseRatio)
        .toIntExact(TimeUnit.MILLISECONDS);
  }

  /** @return true iff the time elapsed since the current lease timestamp is less than the lease timeout. */
  boolean isValid() {
    return lease.get().elapsedTimeMs() < leaseTimeoutMs;
  }

  /**
   * Try to extend the lease; the lease remains unchanged when it cannot be extended.
   *
   * The followers are visited from the most recent to the oldest send time of
   * their last responded appendEntries.  The new lease is the send time of the first follower
   * at which the followers visited so far pass the given predicate.
   * Since the predicate decides what a majority is, the result is also correct
   * when the given followers belong to more than one configuration (current and old).
   * Taking the median of all the send times is only correct for a single configuration.
   *
   * @param allFollowers the followers to be checked (including those in transitional).
   * @param hasMajority a predicate testing if the given followers form a majority;
   *                    the caller is expected to account for the leader itself.
   */
  void extendLeaderLease(Stream<FollowerInfo> allFollowers, Predicate<List<RaftPeerId>> hasMajority) {
    final List<Ack> acks = new ArrayList<>();
    allFollowers.forEach(follower -> acks.add(new Ack(follower)));
    // oldest first; the list is walked backwards below in order to visit the most recent first
    Collections.sort(acks, (a, b) -> a.sendTime.compareTo(b.sendTime));

    final List<RaftPeerId> ackedPeers = new ArrayList<>(acks.size());
    if (hasMajority.test(ackedPeers)) {
      // no follower is required for a majority
      lease.set(Timestamp.currentTime());
      return;
    }

    for (int i = acks.size() - 1; i >= 0; i--) {
      final Ack ack = acks.get(i);
      if (ack.sendTime.elapsedTimeMs() >= leaseTimeoutMs) {
        // this follower and all the remaining (older) followers are not active: cannot extend
        return;
      }
      ackedPeers.add(ack.peerId);
      if (hasMajority.test(ackedPeers)) {
        // a majority has responded to requests sent at or after this time
        lease.set(ack.sendTime);
        return;
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ List<FollowerInfo> getCurrent() {
/** @return the old follower infos. */
List<FollowerInfo> getOld() {
  final List<FollowerInfo> oldInfos = old;
  return oldInfos;
}

/** @return a stream of the current follower infos followed by the old ones, if there are any. */
Stream<FollowerInfo> getCurrentAndOld() {
  final Stream<FollowerInfo> oldStream;
  if (old == null) {
    // nothing to append
    oldStream = Stream.<FollowerInfo>empty();
  } else {
    oldStream = old.stream();
  }
  return Stream.concat(current.stream(), oldStream);
}
}

static boolean isSameSize(List<FollowerInfo> infos, PeerConfiguration conf) {
Expand Down Expand Up @@ -348,6 +353,7 @@ boolean isApplied(LogEntryProto logEntry) {
private final PendingStepDown pendingStepDown;

private final ReadIndexHeartbeats readIndexHeartbeats;
private final LeaderLease lease;

LeaderStateImpl(RaftServerImpl server) {
this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
Expand All @@ -369,6 +375,7 @@ boolean isApplied(LogEntryProto logEntry) {
this.messageStreamRequests = new MessageStreamRequests(server.getMemberId());
this.pendingStepDown = new PendingStepDown(this);
this.readIndexHeartbeats = new ReadIndexHeartbeats();
this.lease = new LeaderLease(properties);
long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties);
double followerGapRatioMax = RaftServerConfigKeys.Write.followerGapRatioMax(properties);

Expand Down Expand Up @@ -1127,6 +1134,23 @@ public void onAppendEntriesReply(LogAppender appender, RaftProtos.AppendEntriesR
readIndexHeartbeats.onAppendEntriesReply(appender, reply, this::hasMajority);
}

/**
 * Check the leader lease; when the check fails, try extending the lease once and then check again.
 *
 * @return true iff the leader lease check passes, either directly or after the extension attempt.
 */
boolean hasLease() {
  if (!checkLeaderLease()) {
    // try extending the leader lease using the followers of the current and the old configurations
    final RaftConfigurationImpl currentConf = server.getRaftConf();
    final CurrentOldFollowerInfos followers = followerInfoMap.getFollowerInfos(currentConf);
    lease.extendLeaderLease(followers.getCurrentAndOld(),
        ids -> currentConf.hasMajority(ids, server.getId()));
    return checkLeaderLease();
  }
  return true;
}

/** @return true iff this leader is ready and either the configuration is a singleton or the lease is valid. */
private boolean checkLeaderLease() {
  if (!isReady()) {
    return false;
  }
  if (server.getRaftConf().isSingleton()) {
    // the lease is not consulted for a singleton configuration
    return true;
  }
  return lease.isValid();
}

/**
 * Pass the given reply for the given log index to the pending requests.
 *
 * @param logIndex the log index passed to the pending requests.
 * @param reply the reply passed to the pending requests.
 */
void replyPendingRequest(long logIndex, RaftClientReply reply) {
pendingRequests.replyPendingRequest(logIndex, reply);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.Timestamp;

import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -73,9 +74,10 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries()
}

resetHeartbeatTrigger();
final Timestamp sendTime = Timestamp.currentTime();
getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
getFollower().updateLastRpcResponseTime();
getFollower().updateLastAppendEntriesResponseTime(sendTime);

getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
return r;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -640,6 +641,87 @@ void runTestPauseResumeLeaderElection(CLUSTER cluster) throws IOException, Inter
}
}

/**
 * Compute the leader lease timeout in milliseconds from the test properties
 * (min rpc timeout multiplied by the lease timeout ratio) and run the given test case with it.
 */
private void runLeaseTest(CLUSTER cluster, CheckedBiConsumer<CLUSTER, Long, Exception> testCase) throws Exception {
  final double ratio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(getProperties());
  final TimeDuration minRpcTimeout = RaftServerConfigKeys.Rpc.timeoutMin(getProperties());
  final TimeDuration leaseTimeout = minRpcTimeout.multiply(ratio);
  final long leaseMs = leaseTimeout.toIntExact(TimeUnit.MILLISECONDS);
  testCase.accept(cluster, leaseMs);
}

/** Run {@code runTestLeaderLease} in a new cluster of 3 servers with a lease timeout ratio of 0.5. */
@Test
public void testLeaderLease() throws Exception {
  // use a strict lease
  final double strictRatio = 0.5;
  RaftServerConfigKeys.Read.setLeaderLeaseTimeoutRatio(getProperties(), strictRatio);
  runWithNewCluster(3, cluster -> runLeaseTest(cluster, this::runTestLeaderLease));
}

/**
 * Verify that a leader has the lease after a successful write
 * and no longer has it once it has been isolated for the lease timeout.
 *
 * @param cluster the cluster under test.
 * @param leaseTimeoutMs the leader lease timeout in milliseconds.
 */
void runTestLeaderLease(CLUSTER cluster, long leaseTimeoutMs) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
try (final RaftClient client = cluster.createClient(leader.getId())) {
// a successful write, after which the leader is expected to have the lease
client.io().send(new RaftTestUtil.SimpleMessage("message"));

Assert.assertTrue(leader.getInfo().isLeader());
Assert.assertTrue(leader.getInfo().isLeaderReady());
RaftServerTestUtil.assertLeaderLease(leader, true);

// presumably cuts the leader off from the other servers -- see isolate(..)
isolate(cluster, leader.getId());
// wait for the lease timeout so that the lease can no longer be valid or extended
Thread.sleep(leaseTimeoutMs);

// still the (ready) leader but without the lease
Assert.assertTrue(leader.getInfo().isLeader());
Assert.assertTrue(leader.getInfo().isLeaderReady());
RaftServerTestUtil.assertLeaderLease(leader, false);
} finally {
// always undo the isolation, even when an assertion has failed
deIsolate(cluster, leader.getId());
}
}

/**
 * Run {@code runTestLeaderLeaseDuringReconfiguration} in a new cluster of 3 servers
 * with a lease timeout ratio of 0.5.
 */
@Test
public void testLeaderLeaseDuringReconfiguration() throws Exception {
  // use a strict lease
  final double strictRatio = 0.5;
  RaftServerConfigKeys.Read.setLeaderLeaseTimeoutRatio(getProperties(), strictRatio);
  runWithNewCluster(3, cluster -> runLeaseTest(cluster, this::runTestLeaderLeaseDuringReconfiguration));
}

/**
 * Verify that a leader has the lease after a successful write and no longer has it
 * when the two original followers are blocked while a reconfiguration adding two new peers is in progress.
 *
 * @param cluster the cluster under test.
 * @param leaseTimeoutMs the leader lease timeout in milliseconds.
 */
void runTestLeaderLeaseDuringReconfiguration(CLUSTER cluster, long leaseTimeoutMs) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
try (final RaftClient client = cluster.createClient(leader.getId())) {
// a successful write, after which the leader is expected to have the lease
client.io().send(new RaftTestUtil.SimpleMessage("message"));

Assert.assertTrue(leader.getInfo().isLeader());
Assert.assertTrue(leader.getInfo().isLeaderReady());
RaftServerTestUtil.assertLeaderLease(leader, true);

// get the followers before adding the new peers
final List<RaftServer.Division> followers = cluster.getFollowers();
final MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(2, true);

// blocking the original 2 followers
BlockRequestHandlingInjection.getInstance().blockReplier(followers.get(0).getId().toString());
BlockRequestHandlingInjection.getInstance().blockReplier(followers.get(1).getId().toString());

// start reconfiguration in another thread, shall fail eventually
// NOTE(review): this thread is never joined and it uses the client,
// which is closed by the enclosing try-with-resources; only IOException is caught here.
new Thread(() -> {
try {
client.admin().setConfiguration(changes.allPeersInNewConf);
} catch (IOException e) {
System.out.println("as expected: " + e.getMessage());
}
}).start();

// wait for the lease timeout while the original followers are blocked
Thread.sleep(leaseTimeoutMs);

// still the (ready) leader but without the lease
Assert.assertTrue(leader.getInfo().isLeader());
Assert.assertTrue(leader.getInfo().isLeaderReady());
RaftServerTestUtil.assertLeaderLease(leader, false);

} finally {
// always remove the injected blocking, even when an assertion has failed
BlockRequestHandlingInjection.getInstance().unblockAll();
}
}




private static RaftServerImpl createMockServer(boolean alive) {
final DivisionInfo info = mock(DivisionInfo.class);
when(info.isAlive()).thenReturn(alive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ public static Stream<LogAppender> getLogAppenders(RaftServer.Division server) {
return getLeaderState(server).map(LeaderStateImpl::getLogAppenders).orElse(null);
}

/**
 * Assert that the given division has a leader state and
 * that the lease state of the leader equals the expected value.
 *
 * @param leader the division expected to be the leader.
 * @param hasLease the expected result of {@code LeaderStateImpl.hasLease()}.
 */
public static void assertLeaderLease(RaftServer.Division leader, boolean hasLease) {
  final LeaderStateImpl l = getLeaderState(leader).orElse(null);
  Assert.assertNotNull(l);
  // expected value first and actual value second, so that a failure message is reported correctly
  Assert.assertEquals(hasLease, l.hasLease());
}

public static void restartLogAppenders(RaftServer.Division server) {
final LeaderStateImpl leaderState = getLeaderState(server).orElseThrow(
() -> new IllegalStateException(server + " is not the leader"));
Expand Down

0 comments on commit ec7dc95

Please sign in to comment.