diff --git a/ratis-shell/pom.xml b/ratis-shell/pom.xml
index 78ab6679fd..5b06bca1a0 100644
--- a/ratis-shell/pom.xml
+++ b/ratis-shell/pom.xml
@@ -48,6 +48,14 @@
org.slf4j
slf4j-simple
+
+ org.apache.ratis
+ ratis-grpc
+
+
+ org.apache.ratis
+ ratis-netty
+
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java
index 1239fc56c4..73c1f2a5cb 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java
@@ -19,21 +19,35 @@
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.retry.ExponentialBackoffRetry;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedFunction;
+import java.io.IOException;
+import java.io.PrintStream;
import java.net.InetSocketAddress;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.function.*;
+import java.util.stream.Collectors;
/**
* Helper class for raft operations.
*/
public final class RaftUtils {
+ public static final RaftGroupId DEFAULT_RAFT_GROUP_ID = RaftGroupId.randomId();
+
+
private RaftUtils() {
// prevent instantiation
}
@@ -65,6 +79,18 @@ public static RaftPeerId getPeerId(String host, int port) {
* @return return a raft client
*/
public static RaftClient createClient(RaftGroup raftGroup) {
+ return createClient(raftGroup, null, null);
+ }
+
+
+ /**
+ * Create a raft client to communicate to ratis server.
+ * @param raftGroup the raft group
+ * @param rpcType the rpcType
+ * @param tlsConfig the tlsConfig
+ * @return return a raft client
+ */
+ public static RaftClient createClient(RaftGroup raftGroup, RpcType rpcType, GrpcTlsConfig tlsConfig) {
RaftProperties properties = new RaftProperties();
RaftClientConfigKeys.Rpc.setRequestTimeout(properties,
TimeDuration.valueOf(15, TimeUnit.SECONDS));
@@ -84,6 +110,139 @@ public static RaftClient createClient(RaftGroup raftGroup) {
.setRaftGroup(raftGroup)
.setProperties(properties)
.setRetryPolicy(retryPolicy)
+ .setParameters(setClientTlsConf(rpcType, tlsConfig))
.build();
}
+
+ /**
+ * Execute a given function with input parameter from the members of a list.
+ *
+ * @param list the input parameters
+ * @param function the function to be executed
+ * @param parameter type
+ * @param return value type
+ * @param the exception type thrown by the given function.
+ * @return the value returned by the given function.
+ */
+ public static K runFunction(Collection list, CheckedFunction function) {
+ for (T t : list) {
+ try {
+ K ret = function.apply(t);
+ if (ret != null) {
+ return ret;
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ return null;
+ }
+
+
+ public static List buildRaftPeersFromStr(String peers) {
+ List addresses = new ArrayList<>();
+ String[] peersArray = peers.split(",");
+ for (String peer : peersArray) {
+ addresses.add(parseInetSocketAddress(peer));
+ }
+
+ return addresses.stream()
+ .map(addr -> RaftPeer.newBuilder()
+ .setId(RaftUtils.getPeerId(addr))
+ .setAddress(addr)
+ .build()
+ ).collect(Collectors.toList());
+ }
+
+ public static RaftGroupId buildRaftGroupIdFromStr(String groupId) {
+ return (groupId != null && !groupId.equals("")) ? RaftGroupId.valueOf(UUID.fromString(groupId))
+ : DEFAULT_RAFT_GROUP_ID;
+ }
+
+ public static RaftGroupId retrieveRemoteGroupId(RaftGroupId raftGroupIdFromConfig,
+ List peers,
+ RaftClient client, PrintStream printStream) throws IOException {
+ RaftGroupId remoteGroupId;
+ if (raftGroupIdFromConfig != DEFAULT_RAFT_GROUP_ID) {
+ return raftGroupIdFromConfig;
+ } else {
+ final List groupIds = runFunction(peers,
+ p -> client.getGroupManagementApi((p.getId())).list().getGroupIds());
+
+ if (groupIds == null) {
+ printStream.println("Failed to get group ID from " + peers);
+ throw new IOException("Failed to get group ID from " + peers);
+ } else if (groupIds.size() == 1) {
+ remoteGroupId = groupIds.get(0);
+ } else {
+ printStream.println("There are more than one groups, you should specific one. " + groupIds);
+ throw new IOException("There are more than one groups, you should specific one. " + groupIds);
+ }
+ }
+
+ return remoteGroupId;
+ }
+
+ public static GroupInfoReply retrieveGroupInfoByGroupId(RaftGroupId remoteGroupId, List peers, RaftClient client, PrintStream printStream)
+ throws IOException {
+ GroupInfoReply groupInfoReply = runFunction(peers, p -> client.getGroupManagementApi((p.getId())).info(remoteGroupId));
+ processReply(groupInfoReply,
+ printStream::println, "Failed to get group info for group id " + remoteGroupId.getUuid() + " from " + peers);
+ return groupInfoReply;
+ }
+
+ public static void processReply(RaftClientReply reply, Consumer printer, String message) throws IOException {
+ processReplyInternal(reply, () -> printer.accept(message));
+ }
+
+ private static void processReplyInternal(RaftClientReply reply, Runnable printer) throws IOException {
+ if (reply == null || !reply.isSuccess()) {
+ final RaftException e = Optional.ofNullable(reply)
+ .map(RaftClientReply::getException)
+ .orElseGet(() -> new RaftException("Reply: " + reply));
+ printer.run();
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ public static InetSocketAddress parseInetSocketAddress(String address) {
+ try {
+ final String[] hostPortPair = address.split(":");
+ if (hostPortPair.length < 2) {
+ throw new IllegalArgumentException("Unexpected address format .");
+ }
+ return new InetSocketAddress(hostPortPair[0], Integer.parseInt(hostPortPair[1]));
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Failed to parse the server address parameter \"" + address + "\".", e);
+ }
+ }
+
+ public static Parameters setClientTlsConf(RpcType rpcType,
+ GrpcTlsConfig tlsConfig) {
+ // TODO: GRPC TLS only for now, netty/hadoop RPC TLS support later.
+ if (tlsConfig != null && rpcType == SupportedRpcType.GRPC) {
+ Parameters parameters = new Parameters();
+ setAdminTlsConf(parameters, tlsConfig);
+ setClientTlsConf(parameters, tlsConfig);
+ return parameters;
+ }
+ return null;
+ }
+
+ private static void setAdminTlsConf(Parameters parameters,
+ GrpcTlsConfig tlsConfig) {
+ if (tlsConfig != null) {
+ GrpcConfigKeys.Admin.setTlsConf(parameters, tlsConfig);
+ }
+ }
+
+ private static void setClientTlsConf(Parameters parameters,
+ GrpcTlsConfig tlsConfig) {
+ if (tlsConfig != null) {
+ GrpcConfigKeys.Client.setTlsConf(parameters, tlsConfig);
+ NettyConfigKeys.DataStream.Client.setTlsConf(parameters, tlsConfig);
+ }
+ }
+
+
}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/SecurityUtils.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/SecurityUtils.java
new file mode 100644
index 0000000000..14150ffe6c
--- /dev/null
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/SecurityUtils.java
@@ -0,0 +1,63 @@
+package org.apache.ratis.shell.cli;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import java.io.*;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public class SecurityUtils {
+ static Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
+
+ public static KeyStore getTrustStore()
+ throws Exception {
+ X509Certificate[] certificate = getCertificate("ssl/ca.crt");
+
+ // build trustStore
+ KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ trustStore.load(null, null);
+
+ for (X509Certificate cert: certificate) {
+ trustStore.setCertificateEntry(cert.getSerialNumber().toString(), cert);
+ }
+ return trustStore;
+ }
+
+ public static X509TrustManager getTrustManager(KeyStore keyStore) throws Exception{
+ TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
+ TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(keyStore);
+ TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
+ if (trustManagers.length != 1 || !(trustManagers[0] instanceof X509TrustManager)) {
+ throw new IllegalStateException("Unexpected default trust managers:"
+ + Arrays.toString(trustManagers));
+ }
+ return (X509TrustManager) trustManagers[0];
+ }
+
+ static X509Certificate[] getCertificate(String certPath)
+ throws CertificateException, IOException {
+ // Read certificates
+ X509Certificate[] certificate = new X509Certificate[1];
+ CertificateFactory fact = CertificateFactory.getInstance("X.509");
+ try (InputStream is = Files.newInputStream(Paths.get(certPath))) {
+ certificate[0] = (X509Certificate) fact.generateCertificate(is);
+ }
+ return certificate;
+ }
+
+}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
index 1888c0e0ea..022fa25b52 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
@@ -20,33 +20,48 @@
import org.apache.commons.cli.Option;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.shell.cli.RaftUtils;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.shell.cli.SecurityUtils;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.function.CheckedFunction;
+import javax.net.ssl.TrustManager;
import java.io.IOException;
+import java.io.PrintStream;
import java.net.InetSocketAddress;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.ratis.shell.cli.RaftUtils.*;
+import static org.apache.ratis.shell.cli.RaftUtils.retrieveGroupInfoByGroupId;
+
/**
* The base class for the ratis shell which need to connect to server.
*/
public abstract class AbstractRatisCommand extends AbstractCommand {
public static final String PEER_OPTION_NAME = "peers";
public static final String GROUPID_OPTION_NAME = "groupid";
- public static final RaftGroupId DEFAULT_RAFT_GROUP_ID = RaftGroupId.randomId();
+ public static final String TLS_ENABLED_OPTION_NAME = "t";
+ private PrintStream printStream;
+
+
/**
* Execute a given function with input parameter from the members of a list.
@@ -77,52 +92,22 @@ public static K run(Collection list, CheckedFunct
protected AbstractRatisCommand(Context context) {
super(context);
+ this.printStream = getPrintStream();
}
@Override
public int run(CommandLine cl) throws IOException {
- List addresses = new ArrayList<>();
- String peersStr = cl.getOptionValue(PEER_OPTION_NAME);
- String[] peersArray = peersStr.split(",");
- for (String peer : peersArray) {
- addresses.add(parseInetSocketAddress(peer));
- }
- final RaftGroupId raftGroupIdFromConfig = cl.hasOption(GROUPID_OPTION_NAME)?
- RaftGroupId.valueOf(UUID.fromString(cl.getOptionValue(GROUPID_OPTION_NAME)))
- : DEFAULT_RAFT_GROUP_ID;
-
- List peers = addresses.stream()
- .map(addr -> RaftPeer.newBuilder()
- .setId(RaftUtils.getPeerId(addr))
- .setAddress(addr)
- .build()
- ).collect(Collectors.toList());
+ List peers = buildRaftPeersFromStr(cl.getOptionValue(PEER_OPTION_NAME));
+ RaftGroupId raftGroupIdFromConfig = buildRaftGroupIdFromStr(cl.getOptionValue(GROUPID_OPTION_NAME));
raftGroup = RaftGroup.valueOf(raftGroupIdFromConfig, peers);
- try (final RaftClient client = RaftUtils.createClient(raftGroup)) {
- final RaftGroupId remoteGroupId;
- if (raftGroupIdFromConfig != DEFAULT_RAFT_GROUP_ID) {
- remoteGroupId = raftGroupIdFromConfig;
- } else {
- final List groupIds = run(peers,
- p -> client.getGroupManagementApi((p.getId())).list().getGroupIds());
-
- if (groupIds == null) {
- println("Failed to get group ID from " + peers);
- return -1;
- } else if (groupIds.size() == 1) {
- remoteGroupId = groupIds.get(0);
- } else {
- println("There are more than one groups, you should specific one. " + groupIds);
- return -2;
- }
- }
- groupInfoReply = run(peers, p -> client.getGroupManagementApi((p.getId())).info(remoteGroupId));
- processReply(groupInfoReply,
- () -> "Failed to get group info for group id " + remoteGroupId.getUuid() + " from " + peers);
+ try (final RaftClient client = getRaftClient(cl.hasOption(TLS_ENABLED_OPTION_NAME))) {
+ RaftGroupId remoteGroupId = retrieveRemoteGroupId(raftGroupIdFromConfig, peers, client, printStream);;
+ groupInfoReply = retrieveGroupInfoByGroupId(remoteGroupId, peers, client, printStream);
raftGroup = groupInfoReply.getGroup();
}
+
return 0;
}
@@ -167,16 +152,16 @@ protected RaftPeerProto getLeader(RoleInfoProto roleInfo) {
return followerInfo.getLeaderInfo().getId();
}
- protected void processReply(RaftClientReply reply, Supplier messageSupplier) throws IOException {
- if (reply == null || !reply.isSuccess()) {
- final RaftException e = Optional.ofNullable(reply)
- .map(RaftClientReply::getException)
- .orElseGet(() -> new RaftException("Reply: " + reply));
- final String message = messageSupplier.get();
- printf("%s. Error: %s%n", message, e);
- throw new IOException(message, e);
- }
- }
+// protected void processReply(RaftClientReply reply, Supplier messageSupplier) throws IOException {
+// if (reply == null || !reply.isSuccess()) {
+// final RaftException e = Optional.ofNullable(reply)
+// .map(RaftClientReply::getException)
+// .orElseGet(() -> new RaftException("Reply: " + reply));
+// final String message = messageSupplier.get();
+// printf(printStream, "%s. Error: %s%n", message, e);
+// throw new IOException(message, e);
+// }
+// }
protected List getIds(String[] optionValues, BiConsumer consumer) {
if (optionValues == null) {
@@ -207,4 +192,19 @@ protected Stream getPeerStream(RaftPeerRole role) {
.stream()
.filter(targets::contains);
}
+
+ private RaftClient getRaftClient(boolean tlsEnabled) throws
+ IOException {
+ GrpcTlsConfig tlsConfig = null;
+ if (tlsEnabled) {
+ try {
+ TrustManager trustManager = SecurityUtils.getTrustManager(SecurityUtils.getTrustStore());
+ tlsConfig = new GrpcTlsConfig(null, trustManager, false);
+ } catch (Exception e) {
+ throw new IOException("Failed to get TrustManager: " + e.getCause());
+ }
+ }
+ return RaftUtils.createClient(raftGroup, SupportedRpcType.GRPC, tlsConfig);
+ }
+
}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/PauseCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/PauseCommand.java
index 4ea2969bac..80e7bae254 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/PauseCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/PauseCommand.java
@@ -30,6 +30,8 @@
import java.io.IOException;
+import static org.apache.ratis.shell.cli.RaftUtils.processReply;
+
/**
* Command for pause leader election on specific server
*/
@@ -63,7 +65,7 @@ public int run(CommandLine cl) throws IOException {
}
try(final RaftClient raftClient = RaftUtils.createClient(getRaftGroup())) {
RaftClientReply reply = raftClient.getLeaderElectionManagementApi(peerId).pause();
- processReply(reply, () -> String.format("Failed to pause leader election on peer %s", strAddr));
+ processReply(reply, this::println, String.format("Failed to pause leader election on peer %s", strAddr));
printf(String.format("Successful pause leader election on peer %s", strAddr));
}
return 0;
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/ResumeCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/ResumeCommand.java
index 4b4dc225a0..9eb92cae40 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/ResumeCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/ResumeCommand.java
@@ -30,6 +30,8 @@
import java.io.IOException;
+import static org.apache.ratis.shell.cli.RaftUtils.processReply;
+
/**
* Command for resuming leader election on specific server
*/
@@ -63,7 +65,7 @@ public int run(CommandLine cl) throws IOException {
}
try(final RaftClient raftClient = RaftUtils.createClient(getRaftGroup())) {
RaftClientReply reply = raftClient.getLeaderElectionManagementApi(peerId).resume();
- processReply(reply, () -> String.format("Failed to resume leader election on peer %s", strAddr));
+ processReply(reply, this::println, String.format("Failed to resume leader election on peer %s", strAddr));
printf(String.format("Successful pause leader election on peer %s", strAddr));
}
return 0;
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/StepDownCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/StepDownCommand.java
index 911a2bb26a..54398f86be 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/StepDownCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/StepDownCommand.java
@@ -27,6 +27,8 @@
import java.io.IOException;
+import static org.apache.ratis.shell.cli.RaftUtils.processReply;
+
/**
* Command for stepping down ratis leader server.
*/
@@ -51,7 +53,7 @@ public int run(CommandLine cl) throws IOException {
try (RaftClient client = RaftUtils.createClient(getRaftGroup())) {
RaftPeerId leaderId = RaftPeerId.valueOf(getLeader(getGroupInfoReply().getRoleInfoProto()).getId());
final RaftClientReply transferLeadershipReply = client.admin().transferLeadership(null, leaderId, 60_000);
- processReply(transferLeadershipReply, () -> "Failed to step down leader");
+ processReply(transferLeadershipReply, this::println, "Failed to step down leader");
} catch (Throwable t) {
printf("caught an error when executing step down leader: %s%n", t.getMessage());
return -1;
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
index c71d7f89f6..2567427bcc 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
@@ -36,6 +36,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.ratis.shell.cli.RaftUtils.processReply;
+
/**
* Command for transferring the ratis leader to specific server.
*/
@@ -98,7 +100,7 @@ private boolean tryTransfer(RaftClient client, RaftPeer newLeader, int highestPr
}
RaftClientReply transferLeadershipReply =
client.admin().transferLeadership(newLeader.getId(), timeout.toLong(TimeUnit.MILLISECONDS));
- processReply(transferLeadershipReply, () -> "election failed");
+ processReply(transferLeadershipReply, this::println,"election failed");
} catch (TransferLeadershipException tle) {
if (tle.getMessage().contains("it does not has highest priority")) {
return false;
@@ -116,7 +118,7 @@ private void setPriority(RaftClient client, RaftPeer target, int priority) throw
.collect(Collectors.toList());
final List listeners = getPeerStream(RaftPeerRole.LISTENER).collect(Collectors.toList());
RaftClientReply reply = client.admin().setConfiguration(peers, listeners);
- processReply(reply, () -> "Failed to set master priorities");
+ processReply(reply, this::println, "Failed to set master priorities");
}
@Override
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupListCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupListCommand.java
index 5bbd1939ad..1d3ef2ec2a 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupListCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupListCommand.java
@@ -31,6 +31,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import static org.apache.ratis.shell.cli.RaftUtils.processReply;
+
/**
* Command for querying the group information of a ratis server.
*/
@@ -71,8 +73,8 @@ public int run(CommandLine cl) throws IOException {
try(final RaftClient raftClient = RaftUtils.createClient(getRaftGroup())) {
GroupListReply reply = raftClient.getGroupManagementApi(peerId).list();
- processReply(reply, () -> String.format("Failed to get group information of peerId %s (server %s)",
- peerId, address));
+ processReply(reply, this::println, String.format("Failed to get group information of peerId %s (server %s)",
+ peerId.toString(), address));
printf(String.format("The peerId %s (server %s) is in %d groups, and the groupIds is: %s",
peerId, address, reply.getGroupIds().size(), reply.getGroupIds()));
}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java
index 3c65bb12de..35a6612326 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java
@@ -38,6 +38,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.ratis.shell.cli.RaftUtils.processReply;
+
/**
* Command for add ratis server.
*/
@@ -93,7 +95,7 @@ public int run(CommandLine cl) throws IOException {
System.out.println("New peer list: " + peers);
System.out.println("New listener list: " + listeners);
RaftClientReply reply = client.admin().setConfiguration(peers, listeners);
- processReply(reply, () -> "Failed to change raft peer");
+ processReply(reply, this::println, "Failed to change raft peer");
}
return 0;
}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java
index 5918516070..8c578ffb48 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java
@@ -34,6 +34,8 @@
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.ratis.shell.cli.RaftUtils.processReply;
+
/**
* Command for remove ratis server.
*/
@@ -74,7 +76,7 @@ public int run(CommandLine cl) throws IOException {
System.out.println("New peer list: " + peers);
System.out.println("New listener list: " + listeners);
final RaftClientReply reply = client.admin().setConfiguration(peers, listeners);
- processReply(reply, () -> "Failed to change raft peer");
+ processReply(reply, this::println, "Failed to change raft peer");
}
return 0;
}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java
index 01e81f3c34..624c6e9d8d 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java
@@ -34,6 +34,8 @@
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.ratis.shell.cli.RaftUtils.processReply;
+
public class SetPriorityCommand extends AbstractRatisCommand {
public static final String PEER_WITH_NEW_PRIORITY_OPTION_NAME = "addressPriority";
@@ -72,7 +74,7 @@ public int run(CommandLine cl) throws IOException {
final List listeners =
getPeerStream(RaftPeerRole.LISTENER).collect(Collectors.toList());
RaftClientReply reply = client.admin().setConfiguration(peers, listeners);
- processReply(reply, () -> "Failed to set master priorities ");
+ processReply(reply, this::println, "Failed to set master priorities ");
}
return 0;
}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java
index 10bac34975..c005dae780 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/snapshot/TakeSnapshotCommand.java
@@ -29,6 +29,8 @@
import java.io.IOException;
+import static org.apache.ratis.shell.cli.RaftUtils.processReply;
+
/**
* Command for make a ratis server take snapshot.
*/
@@ -65,7 +67,7 @@ public int run(CommandLine cl) throws IOException {
peerId = null;
}
RaftClientReply reply = raftClient.getSnapshotManagementApi(peerId).create(timeout);
- processReply(reply, () -> String.format("Failed to take snapshot of peerId %s", peerId));
+ processReply(reply, this::println, String.format("Failed to take snapshot of peerId %s", peerId.toString()));
printf(String.format("Successful take snapshot on peerId %s, the latest snapshot index is %d",
peerId, reply.getLogIndex()));
}