Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-2102. AsyncApi#send() is not handling retry and reply correctly for replication levels higher than MAJORITY #1104

Merged
merged 2 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -832,20 +832,14 @@ private CompletableFuture<RaftClientReply> appendTransaction(
leaderState.notifySenders();
}

final CompletableFuture<RaftClientReply> future = pending.getFuture();
if (request.is(TypeCase.WRITE)) {
// check replication
final ReplicationLevel replication = request.getType().getWrite().getReplication();
if (replication != ReplicationLevel.MAJORITY) {
return future.thenCompose(reply -> waitForReplication(reply, replication));
}
}

return future;
return pending.getFuture();
}

/** Wait until the given replication requirement is satisfied. */
private CompletableFuture<RaftClientReply> waitForReplication(RaftClientReply reply, ReplicationLevel replication) {
if (!reply.isSuccess()) {
return CompletableFuture.completedFuture(reply);
}
final RaftClientRequest.Type type = RaftClientRequest.watchRequestType(reply.getLogIndex(), replication);
final RaftClientRequest watch = RaftClientRequest.newBuilder()
.setServerId(reply.getServerId())
Expand All @@ -854,7 +848,24 @@ private CompletableFuture<RaftClientReply> waitForReplication(RaftClientReply re
.setCallId(reply.getCallId())
.setType(type)
.build();
return watchAsync(watch).thenApply(r -> reply);
return watchAsync(watch).thenApply(watchReply -> combineReplies(reply, watchReply));
}

/**
 * Build a single client reply out of a write reply and the reply of the follow-up watch request.
 *
 * The server id is taken from {@code getMemberId()}.
 * The client id, call id, message and log index are copied from the write reply.
 * The success flag, exception and commit infos are copied from the watch reply.
 *
 * @param reply the reply of the write request
 * @param watchReply the reply of the watch request
 * @return the newly built reply; it is also logged at debug level
 */
private RaftClientReply combineReplies(RaftClientReply reply, RaftClientReply watchReply) {
  final RaftClientReply merged = RaftClientReply.newBuilder()
      .setServerId(getMemberId())
      // identity and payload: copied from the write reply
      .setClientId(reply.getClientId())
      .setCallId(reply.getCallId())
      .setMessage(reply.getMessage())
      .setLogIndex(reply.getLogIndex())
      // outcome: copied from the watch reply
      .setSuccess(watchReply.isSuccess())
      .setException(watchReply.getException())
      .setCommitInfos(watchReply.getCommitInfos())
      .build();
  LOG.debug("combinedReply={}", merged);
  return merged;
}

void stepDownOnJvmPause() {
Expand Down Expand Up @@ -934,6 +945,19 @@ private CompletableFuture<RaftClientReply> replyFuture(ReferenceCountedObject<Ra
}

/**
 * Submit the request through {@code writeAsyncImpl} and, when needed, wait for replication.
 *
 * For a {@code WRITE} request whose replication level is not {@code MAJORITY},
 * the returned future is composed with {@code waitForReplication}.
 * In every other case the future from {@code writeAsyncImpl} is returned unchanged.
 *
 * @param requestRef the reference-counted client request
 * @return a future of the client reply
 */
private CompletableFuture<RaftClientReply> writeAsync(ReferenceCountedObject<RaftClientRequest> requestRef) {
  final RaftClientRequest request = requestRef.get();
  final CompletableFuture<RaftClientReply> writeFuture = writeAsyncImpl(requestRef);
  if (!request.is(TypeCase.WRITE)) {
    // only WRITE requests carry a replication level
    return writeFuture;
  }

  final ReplicationLevel level = request.getType().getWrite().getReplication();
  return level == ReplicationLevel.MAJORITY ? writeFuture
      : writeFuture.thenCompose(writeReply -> waitForReplication(writeReply, level));
}

private CompletableFuture<RaftClientReply> writeAsyncImpl(ReferenceCountedObject<RaftClientRequest> requestRef) {
final RaftClientRequest request = requestRef.get();
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
Expand Down
33 changes: 33 additions & 0 deletions ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
Expand Down Expand Up @@ -357,6 +358,38 @@ void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
}
}

/**
 * Verify that send(msg, ALL_COMMITTED) would reply with all servers committed past the log index.
 * The check itself runs in {@code runTestWriteAsyncCustomReplicationLevel} on a new cluster
 * of {@code NUM_SERVERS} servers.
 */
@Test
public void testWriteAsyncCustomReplicationLevel() throws Exception {
  runWithNewCluster(NUM_SERVERS, cluster -> runTestWriteAsyncCustomReplicationLevel(cluster));
}

/**
 * Send 20 messages asynchronously with {@link ReplicationLevel#ALL_COMMITTED} and verify each reply:
 * it must be successful, carry no exception, and every commit info in it must have
 * a commit index no smaller than the log index of the reply.
 *
 * The future returned by each {@code whenComplete} is retained and joined before the client is closed.
 * Otherwise the assertions inside the callbacks would run off the test thread (or not at all)
 * and their failures would never be observed, letting the test pass vacuously.
 *
 * @param cluster the cluster to run against
 * @throws Exception if waiting for the leader or creating/closing the client fails;
 *         a failed send or a failed assertion surfaces from {@code join()}
 */
void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws Exception {
  final int numMessages = 20;
  try (RaftClient client = cluster.createClient()) {
    RaftTestUtil.waitForLeader(cluster);

    // Fully-qualified java.util names are used so that this method does not depend on
    // which imports the file already declares.
    final java.util.List<java.util.concurrent.CompletableFuture<?>> verifications
        = new java.util.ArrayList<>(numMessages);

    // submit some messages
    for (int i = 0; i < numMessages; i++) {
      final String s = "" + i;
      LOG.info("sendAsync with ALL_COMMITTED " + s);
      verifications.add(client.async().send(new SimpleMessage(s), ReplicationLevel.ALL_COMMITTED)
          .whenComplete((reply, exception) -> {
            if (exception != null) {
              LOG.error("Failed to send message " + s, exception);
              // reply should be null in case of exception
              Assert.assertNull(reply);
              return;
            }
            Assert.assertTrue(reply.isSuccess());
            Assert.assertNull(reply.getException());
            // verify that all servers have caught up to log index when the reply is returned
            reply.getCommitInfos().forEach(commitInfoProto ->
                Assert.assertTrue(commitInfoProto.getCommitIndex() >= reply.getLogIndex()));
          }));
    }

    // Wait inside the try block, i.e. while the client is still open.
    // join() rethrows both a send failure and an AssertionError raised in the callback,
    // so either one now fails the test.
    for (java.util.concurrent.CompletableFuture<?> verification : verifications) {
      verification.join();
    }
  }
}

@Test
public void testRequestTimeout() throws Exception {
final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
Expand Down
Loading