From 1ec4ac4c73328f92f7330e199c4ce477ca0f3dc2 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 13 Jul 2024 11:19:06 +0800 Subject: [PATCH 1/2] RATIS-1071. NettyClientRpc supports sendRequestAsync. --- .../org/apache/ratis/netty/NettyRpcProxy.java | 6 ++ .../ratis/netty/client/NettyClientRpc.java | 67 ++++++++++++++----- .../ratis/netty/TestNettyClientRpcAsync.java | 67 +++++++++++++++++++ .../ratis/netty/TestRaftAsyncWithNetty.java | 25 +++++++ 4 files changed, 148 insertions(+), 17 deletions(-) create mode 100644 ratis-test/src/test/java/org/apache/ratis/netty/TestNettyClientRpcAsync.java create mode 100644 ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java index b9788a8bb4..41269f76e3 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java @@ -176,6 +176,12 @@ public void close() { connection.close(); } + public CompletableFuture sendAsync(RaftNettyServerRequestProto proto) { + final CompletableFuture reply = new CompletableFuture<>(); + connection.offer(proto, reply); + return reply; + } + public RaftNettyServerReplyProto send( RaftRpcRequestProto request, RaftNettyServerRequestProto proto) throws IOException { diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java index c816e29ee8..26ac41f7db 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java @@ -28,71 +28,104 @@ import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto; import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto; +import org.apache.ratis.util.JavaUtils; import java.io.IOException; +import java.util.concurrent.CompletableFuture; public class NettyClientRpc extends RaftClientRpcWithProxy { public NettyClientRpc(ClientId clientId, RaftProperties properties) { super(new NettyRpcProxy.PeerMap(clientId.toString(), properties)); } + @Override + public CompletableFuture sendRequestAsync(RaftClientRequest request) { + final RaftPeerId serverId = request.getServerId(); + try { + final NettyRpcProxy proxy = getProxies().getProxy(serverId); + final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request); + return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> { + if (request instanceof GroupListRequest) { + return ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply()); + } else if (request instanceof GroupInfoRequest) { + return ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply()); + } else { + return ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply()); + } + }); + } catch (Throwable e) { + return JavaUtils.completeExceptionally(e); + } + } + @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { final RaftPeerId serverId = request.getServerId(); final NettyRpcProxy proxy = getProxies().getProxy(serverId); + final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request); + final RaftRpcRequestProto rpcRequest = getRpcRequestProto(serverRequestProto); + if (request instanceof GroupListRequest) { + return ClientProtoUtils.toGroupListReply( + proxy.send(rpcRequest, serverRequestProto).getGroupListReply()); + } else if (request instanceof GroupInfoRequest) { + return ClientProtoUtils.toGroupInfoReply( + proxy.send(rpcRequest, serverRequestProto).getGroupInfoReply()); + } else { + return ClientProtoUtils.toRaftClientReply( + proxy.send(rpcRequest, serverRequestProto).getRaftClientReply()); + } + } + + private RaftNettyServerRequestProto buildRequestProto(RaftClientRequest request) { final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder(); - final RaftRpcRequestProto rpcRequest; if (request instanceof GroupManagementRequest) { final GroupManagementRequestProto proto = ClientProtoUtils.toGroupManagementRequestProto( (GroupManagementRequest)request); b.setGroupManagementRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof SetConfigurationRequest) { final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto( (SetConfigurationRequest)request); b.setSetConfigurationRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof GroupListRequest) { final RaftProtos.GroupListRequestProto proto = ClientProtoUtils.toGroupListRequestProto( (GroupListRequest)request); b.setGroupListRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof GroupInfoRequest) { final RaftProtos.GroupInfoRequestProto proto = ClientProtoUtils.toGroupInfoRequestProto( (GroupInfoRequest)request); b.setGroupInfoRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof TransferLeadershipRequest) { final RaftProtos.TransferLeadershipRequestProto proto = ClientProtoUtils.toTransferLeadershipRequestProto( (TransferLeadershipRequest)request); b.setTransferLeadershipRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof SnapshotManagementRequest) { final RaftProtos.SnapshotManagementRequestProto proto = ClientProtoUtils.toSnapshotManagementRequestProto( (SnapshotManagementRequest) request); b.setSnapshotManagementRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof LeaderElectionManagementRequest) { final RaftProtos.LeaderElectionManagementRequestProto proto = ClientProtoUtils.toLeaderElectionManagementRequestProto( (LeaderElectionManagementRequest) request); b.setLeaderElectionManagementRequest(proto); - rpcRequest = proto.getRpcRequest(); } else { final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request); b.setRaftClientRequest(proto); - rpcRequest = proto.getRpcRequest(); } - if (request instanceof GroupListRequest) { - return ClientProtoUtils.toGroupListReply( - proxy.send(rpcRequest, b.build()).getGroupListReply()); - } else if (request instanceof GroupInfoRequest) { - return ClientProtoUtils.toGroupInfoReply( - proxy.send(rpcRequest, b.build()).getGroupInfoReply()); + return b.build(); + } + + private RaftRpcRequestProto getRpcRequestProto(RaftNettyServerRequestProto serverRequestProto) { + if (serverRequestProto.hasGroupManagementRequest()) { + return serverRequestProto.getGroupManagementRequest().getRpcRequest(); + } else if (serverRequestProto.hasSetConfigurationRequest()) { + return serverRequestProto.getSetConfigurationRequest().getRpcRequest(); + } else if (serverRequestProto.hasGroupListRequest()) { + return serverRequestProto.getGroupListRequest().getRpcRequest(); + } else if (serverRequestProto.hasGroupInfoRequest()) { + return serverRequestProto.getGroupInfoRequest().getRpcRequest(); } else { - return ClientProtoUtils.toRaftClientReply( - proxy.send(rpcRequest, b.build()).getRaftClientReply()); + return serverRequestProto.getRaftClientRequest().getRpcRequest(); } } } diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyClientRpcAsync.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyClientRpcAsync.java new file mode 100644 index 0000000000..e80fdaf0b9 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyClientRpcAsync.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ratis.BaseTest; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RaftTestUtil.SimpleMessage; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.client.impl.RaftClientTestUtil; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.impl.MiniRaftCluster; +import org.apache.ratis.util.ProtoUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestNettyClientRpcAsync extends BaseTest implements MiniRaftClusterWithNetty.FactoryGet { + + @Test + public void testClientAsyncApi() throws Exception { + runWithNewCluster(1, this::runTestClientAsyncApi); + } + + public void runTestClientAsyncApi(MiniRaftCluster cluster) + throws Exception { + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + + try (final RaftClient client = cluster.createClient()) { + final RaftClientRpc rpc = client.getClientRpc(); + + final AtomicLong seqNum = new AtomicLong(); + { + // send a request using rpc directly + final RaftClientRequest request = newRaftClientRequest(client, leader.getId(), + seqNum.incrementAndGet()); + final CompletableFuture f = rpc.sendRequestAsync(request); + Assertions.assertTrue(f.get().isSuccess()); + } + } + } + + static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum) { + final SimpleMessage m = new SimpleMessage("m" + seqNum); + return RaftClientTestUtil.newRaftClientRequest(client, serverId, seqNum, m, + RaftClientRequest.writeRequestType(), ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L)); + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java new file mode 100644 index 0000000000..ebaa33d505 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.ratis.RaftAsyncTests; + +public class TestRaftAsyncWithNetty + extends RaftAsyncTests + implements MiniRaftClusterWithNetty.FactoryGet { +} \ No newline at end of file From 79dc317cba8743433b90b2c7adbdb75ba0e03cc2 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Wed, 17 Jul 2024 07:57:20 +0800 Subject: [PATCH 2/2] Remove TestNettyClientRpcAsync. --- .../ratis/netty/TestNettyClientRpcAsync.java | 67 ------------------- 1 file changed, 67 deletions(-) delete mode 100644 ratis-test/src/test/java/org/apache/ratis/netty/TestNettyClientRpcAsync.java diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyClientRpcAsync.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyClientRpcAsync.java deleted file mode 100644 index e80fdaf0b9..0000000000 --- a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyClientRpcAsync.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.netty; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.ratis.BaseTest; -import org.apache.ratis.RaftTestUtil; -import org.apache.ratis.RaftTestUtil.SimpleMessage; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.client.RaftClientRpc; -import org.apache.ratis.client.impl.RaftClientTestUtil; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.impl.MiniRaftCluster; -import org.apache.ratis.util.ProtoUtils; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class TestNettyClientRpcAsync extends BaseTest implements MiniRaftClusterWithNetty.FactoryGet { - - @Test - public void testClientAsyncApi() throws Exception { - runWithNewCluster(1, this::runTestClientAsyncApi); - } - - public void runTestClientAsyncApi(MiniRaftCluster cluster) - throws Exception { - final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); - - try (final RaftClient client = cluster.createClient()) { - final RaftClientRpc rpc = client.getClientRpc(); - - final AtomicLong seqNum = new AtomicLong(); - { - // send a request using rpc directly - final RaftClientRequest request = newRaftClientRequest(client, leader.getId(), - seqNum.incrementAndGet()); - final CompletableFuture f = rpc.sendRequestAsync(request); - Assertions.assertTrue(f.get().isSuccess()); - } - } - } - - static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum) { - final SimpleMessage m = new SimpleMessage("m" + seqNum); - return RaftClientTestUtil.newRaftClientRequest(client, serverId, seqNum, m, - RaftClientRequest.writeRequestType(), ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L)); - } -}