Skip to content

Commit

Permalink
RATIS-2102. AsyncApi#send() is not handling retry and reply correctly…
Browse files Browse the repository at this point in the history
… for replication levels higher than MAJORITY (#1104)
  • Loading branch information
smengcl authored May 29, 2024
1 parent dd75ffb commit bd4ab14
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -832,20 +832,14 @@ private CompletableFuture<RaftClientReply> appendTransaction(
leaderState.notifySenders();
}

final CompletableFuture<RaftClientReply> future = pending.getFuture();
if (request.is(TypeCase.WRITE)) {
// check replication
final ReplicationLevel replication = request.getType().getWrite().getReplication();
if (replication != ReplicationLevel.MAJORITY) {
return future.thenCompose(reply -> waitForReplication(reply, replication));
}
}

return future;
return pending.getFuture();
}

/** Wait until the given replication requirement is satisfied. */
private CompletableFuture<RaftClientReply> waitForReplication(RaftClientReply reply, ReplicationLevel replication) {
if (!reply.isSuccess()) {
return CompletableFuture.completedFuture(reply);
}
final RaftClientRequest.Type type = RaftClientRequest.watchRequestType(reply.getLogIndex(), replication);
final RaftClientRequest watch = RaftClientRequest.newBuilder()
.setServerId(reply.getServerId())
Expand All @@ -854,7 +848,24 @@ private CompletableFuture<RaftClientReply> waitForReplication(RaftClientReply re
.setCallId(reply.getCallId())
.setType(type)
.build();
return watchAsync(watch).thenApply(r -> reply);
return watchAsync(watch).thenApply(watchReply -> combineReplies(reply, watchReply));
}

private RaftClientReply combineReplies(RaftClientReply reply, RaftClientReply watchReply) {
final RaftClientReply combinedReply = RaftClientReply.newBuilder()
.setServerId(getMemberId())
// from write reply
.setClientId(reply.getClientId())
.setCallId(reply.getCallId())
.setMessage(reply.getMessage())
.setLogIndex(reply.getLogIndex())
// from watchReply
.setSuccess(watchReply.isSuccess())
.setException(watchReply.getException())
.setCommitInfos(watchReply.getCommitInfos())
.build();
LOG.debug("combinedReply={}", combinedReply);
return combinedReply;
}

void stepDownOnJvmPause() {
Expand Down Expand Up @@ -934,6 +945,19 @@ private CompletableFuture<RaftClientReply> replyFuture(ReferenceCountedObject<Ra
}

private CompletableFuture<RaftClientReply> writeAsync(ReferenceCountedObject<RaftClientRequest> requestRef) {
final RaftClientRequest request = requestRef.get();
final CompletableFuture<RaftClientReply> future = writeAsyncImpl(requestRef);
if (request.is(TypeCase.WRITE)) {
// check replication
final ReplicationLevel replication = request.getType().getWrite().getReplication();
if (replication != ReplicationLevel.MAJORITY) {
return future.thenCompose(r -> waitForReplication(r, replication));
}
}
return future;
}

private CompletableFuture<RaftClientReply> writeAsyncImpl(ReferenceCountedObject<RaftClientRequest> requestRef) {
final RaftClientRequest request = requestRef.get();
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
Expand Down
33 changes: 33 additions & 0 deletions ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
Expand Down Expand Up @@ -357,6 +358,38 @@ void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
}
}

@Test
public void testWriteAsyncCustomReplicationLevel() throws Exception {
// verify that send(msg, ALL_COMMITTED) would reply with all servers committed past the log index
runWithNewCluster(NUM_SERVERS, this::runTestWriteAsyncCustomReplicationLevel);
}

void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws Exception {
final int numMessages = 20;
try (RaftClient client = cluster.createClient()) {
RaftTestUtil.waitForLeader(cluster);

// submit some messages
for (int i = 0; i < numMessages; i++) {
final String s = "" + i;
LOG.info("sendAsync with ALL_COMMITTED " + s);
client.async().send(new SimpleMessage(s), ReplicationLevel.ALL_COMMITTED).whenComplete((reply, exception) -> {
if (exception != null) {
LOG.error("Failed to send message " + s, exception);
// reply should be null in case of exception
Assert.assertNull(reply);
return;
}
Assert.assertTrue(reply.isSuccess());
Assert.assertNull(reply.getException());
// verify that all servers have caught up to log index when the reply is returned
reply.getCommitInfos().forEach(commitInfoProto ->
Assert.assertTrue(commitInfoProto.getCommitIndex() >= reply.getLogIndex()));
});
}
}
}

@Test
public void testRequestTimeout() throws Exception {
final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
Expand Down

0 comments on commit bd4ab14

Please sign in to comment.