// Review notes for this patch (free text placed ahead of the first "diff --git" header).
//
// Files touched: KafkaRaftClient.java, LeaderState.java and KafkaRaftClientTest.java, all under raft/.
//
// KafkaRaftClient
//  - handleFetchRequest gains @SuppressWarnings("CyclomaticComplexity") and, when quorum.isLeader(),
//    calls quorum.leaderStateOrThrow().updateLastReceivedFetchRequest(replicaKey, currentTimeMs)
//    right after the ReplicaKey is built and before tryCompleteFetchRequest(...).
//  - The private helper maybeSendRequest is renamed to maybeSendRequests; its two visible call sites
//    (BeginQuorumEpoch sending and Vote sending) are updated accordingly. The hunk header shows a
//    method called maybeSendRequests already present, so this presumably becomes an overload --
//    TODO confirm against the full file.
//  - maybeSendBeginQuorumEpochRequests now asks the leader state for
//    needSendBeginQuorumRequestReplicaKey(currentTimeMs), reduces the result to replica ids, and
//    adds a second filter that drops every voter whose id is contained in that id set.
//
// LeaderState
//  - New map field lastFetchRequestMs; within this patch it is written only by
//    updateLastReceivedFetchRequest, which first updates beginQuorumEpochTimer with currentTimeMs
//    and then stores the timer's currentTimeMs() under the given ReplicaKey.
//  - needSendBeginQuorumRequestReplicaKey updates the same timer and returns one key for each map
//    entry whose stored time is at least beginQuorumEpochTimeoutMs behind the timer's current time.
//
// KafkaRaftClientTest
//  - testBeginQuorumShouldNotSendAfterFetchRequest: a leader with two remote voters sends
//    BeginQuorumEpoch to both after one beginQuorumEpochTimeoutMs; remoteId1 (localId + 1) then
//    fetches a third of a timeout later; after one more full timeout the test expects
//    BeginQuorumEpoch to go to remoteId2 only.
//
// NOTE(review): needSendBeginQuorumRequestReplicaKey returns the replicas whose last recorded
//   Fetch is >= beginQuorumEpochTimeoutMs old, yet the new filter in
//   maybeSendBeginQuorumEpochRequests uses !contains(...) and therefore removes exactly those
//   voters from the send set, while voters with a newer Fetch, or with no recorded Fetch at all,
//   stay in it. That looks inverted relative to the method name and to the test comment
//   "don't send BeginQuorumEpochRequest again if fetchRequest is sent". The new test sleeps a
//   full beginQuorumEpochTimeoutMs after the Fetch, so the >= branch is taken and the assertion
//   passes -- confirm the intended semantics and consider a case where the Fetch is newer than
//   the timeout.
// NOTE(review): nothing in this patch removes entries from lastFetchRequestMs, and
//   handleFetchRequest records the ReplicaKey of every Fetch that reaches this point while the
//   node is leader -- check whether that can include non-voters and whether the map needs
//   pruning (TODO confirm).
// NOTE(review): generic type arguments appear to be missing throughout this text (for example
//   "Set remoteVoters", "Map lastFetchRequestMs", "Map.Entry entry") -- presumably stripped
//   during extraction; verify against the real patch before applying.
// NOTE(review): ReplicaKey.of(entry.getKey().id(), entry.getKey().directoryId().orElse(NO_DIRECTORY_ID))
//   rebuilds a key from the entry's own key; presumably entry.getKey() could be added directly --
//   confirm ReplicaKey equality semantics first.
// NOTE(review): the test hunk appears to end with two consecutive added blank lines before the
//   next @ParameterizedTest.
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 921bd72ecf25e..5e8ab9afcf38f 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -1476,6 +1476,7 @@ private boolean hasValidClusterId(String requestClusterId) { * - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch * or if either the fetch offset or the last fetched epoch is invalid */ + @SuppressWarnings("CyclomaticComplexity") private CompletableFuture handleFetchRequest( RaftRequest.Inbound requestMetadata, long currentTimeMs @@ -1512,6 +1513,9 @@ private CompletableFuture handleFetchRequest( FetchRequest.replicaId(request), fetchPartition.replicaDirectoryId() ); + if (quorum.isLeader()) { + quorum.leaderStateOrThrow().updateLastReceivedFetchRequest(replicaKey, currentTimeMs); + } FetchResponseData response = tryCompleteFetchRequest( requestMetadata.listenerName(), requestMetadata.apiVersion(), @@ -2856,7 +2860,7 @@ private long maybeSendRequests( return minBackoffMs; } - private long maybeSendRequest( + private long maybeSendRequests( long currentTimeMs, Set remoteVoters, Function destinationSupplier, @@ -3043,12 +3047,17 @@ private long maybeSendBeginQuorumEpochRequests( ) ); - timeUntilNextBeginQuorumSend = maybeSendRequest( + Set needToSendBeginQuorumRequest = state.needSendBeginQuorumRequestReplicaKey(currentTimeMs); + Set needToSendBeginQuorumRequestNode = needToSendBeginQuorumRequest.stream(). 
+ map(ReplicaKey::id).collect(Collectors.toSet()); + + timeUntilNextBeginQuorumSend = maybeSendRequests( currentTimeMs, voters .voterKeys() .stream() .filter(key -> key.id() != quorum.localIdOrThrow()) + .filter(key -> !needToSendBeginQuorumRequestNode.contains(key.id())) .collect(Collectors.toSet()), nodeSupplier, this::buildBeginQuorumEpochRequest @@ -3131,7 +3140,7 @@ private long maybeSendVoteRequests( if (!state.epochElection().isVoteRejected()) { VoterSet voters = partitionState.lastVoterSet(); boolean preVote = quorum.isProspective(); - return maybeSendRequest( + return maybeSendRequests( currentTimeMs, state.epochElection().unrecordedVoters(), voterId -> voters diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 7c379275a0e50..9f505c6ef33bf 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -54,6 +54,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.kafka.raft.ReplicaKey.NO_DIRECTORY_ID; + /** * In the context of LeaderState, an acknowledged voter means one who has acknowledged the current leader by either * responding to a `BeginQuorumEpoch` request from the leader or by beginning to send `Fetch` requests. @@ -88,6 +90,7 @@ public class LeaderState implements EpochState { private final Timer beginQuorumEpochTimer; private final int beginQuorumEpochTimeoutMs; private final KafkaRaftMetrics kafkaRaftMetrics; + private final Map lastFetchRequestMs = new HashMap<>(); // This is volatile because resignation can be requested from an external thread. 
private volatile boolean resignRequested = false; @@ -188,6 +191,22 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs) { beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs); } + public void updateLastReceivedFetchRequest(ReplicaKey replicaKey, long currentTimeMs) { + beginQuorumEpochTimer.update(currentTimeMs); + lastFetchRequestMs.put(replicaKey, beginQuorumEpochTimer.currentTimeMs()); + } + + public Set needSendBeginQuorumRequestReplicaKey(long currentTimeMs) { + Set replicaKeys = new HashSet<>(); + beginQuorumEpochTimer.update(currentTimeMs); + for (Map.Entry entry : lastFetchRequestMs.entrySet()) { + if (beginQuorumEpochTimer.currentTimeMs() - entry.getValue() >= beginQuorumEpochTimeoutMs) { + replicaKeys.add(ReplicaKey.of(entry.getKey().id(), entry.getKey().directoryId().orElse(NO_DIRECTORY_ID))); + } + } + return replicaKeys; + } + /** * Get the remaining time in milliseconds until the checkQuorumTimer expires. * diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 4687fd3d90376..cd96350316a8f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -633,6 +633,39 @@ public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws Exceptio context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2)); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int remoteId1 = localId + 1; + int remoteId2 = localId + 2; + Set voters = Set.of(localId, remoteId1, remoteId2); + ReplicaKey replicaKey = replicaKey(localId + 1, withKip853Rpc); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withKip853Rpc(withKip853Rpc) + .build(); + 
context.unattachedToLeader(); + int epoch = context.currentEpoch(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + + // begin epoch requests should be sent out every beginQuorumEpochTimeoutMs + context.time.sleep(context.beginQuorumEpochTimeoutMs); + context.client.poll(); + context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2)); + + context.time.sleep(context.beginQuorumEpochTimeoutMs / 3); + context.deliverRequest(context.fetchRequest(epoch, replicaKey, 0, 0, 0)); + context.pollUntilResponse(); + + context.time.sleep(context.beginQuorumEpochTimeoutMs); + context.client.poll(); + // don't send BeginQuorumEpochRequest again if fetchRequest is sent. + context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId2)); + } + + @ParameterizedTest @ValueSource(booleans = { true, false }) public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean withKip853Rpc) throws Exception {