diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java index 1239fc56c4..9c5d90e4bf 100644 --- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java +++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java @@ -20,20 +20,38 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.GroupInfoReply; +import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.exceptions.RaftException; import org.apache.ratis.retry.ExponentialBackoffRetry; import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.function.CheckedFunction; +import java.io.IOException; +import java.io.PrintStream; import java.net.InetSocketAddress; -import java.util.Properties; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.UUID; /** * Helper class for raft operations. */ public final class RaftUtils { + public static final RaftGroupId DEFAULT_RAFT_GROUP_ID = RaftGroupId.randomId(); + private RaftUtils() { // prevent instantiation } @@ -86,4 +104,108 @@ public static RaftClient createClient(RaftGroup raftGroup) { .setRetryPolicy(retryPolicy) .build(); } + + /** + * Apply the given function to the given parameter a list. + * + * @param list the input parameter list + * @param function the function to be applied + * @param parameter type + * @param return value type + * @param the exception type thrown by the given function. + * @return the first non-null value returned by the given function applied to the given list. + */ + private static RETURN applyFunctionReturnFirstNonNull( + Collection list, CheckedFunction function) { + for (PARAMETER parameter : list) { + try { + RETURN ret = function.apply(parameter); + if (ret != null) { + return ret; + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + return null; + } + + public static List buildRaftPeersFromStr(String peers) { + List addresses = new ArrayList<>(); + String[] peersArray = peers.split(","); + for (String peer : peersArray) { + addresses.add(parseInetSocketAddress(peer)); + } + + return addresses.stream() + .map(addr -> RaftPeer.newBuilder() + .setId(RaftUtils.getPeerId(addr)) + .setAddress(addr) + .build() + ).collect(Collectors.toList()); + } + + public static RaftGroupId buildRaftGroupIdFromStr(String groupId) { + return groupId != null && groupId.isEmpty() ? RaftGroupId.valueOf(UUID.fromString(groupId)) + : DEFAULT_RAFT_GROUP_ID; + } + + public static RaftGroupId retrieveRemoteGroupId(RaftGroupId raftGroupIdFromConfig, + List peers, + RaftClient client, PrintStream printStream) throws IOException { + if (!DEFAULT_RAFT_GROUP_ID .equals(raftGroupIdFromConfig)) { + return raftGroupIdFromConfig; + } + + final RaftGroupId remoteGroupId; + final List groupIds = applyFunctionReturnFirstNonNull(peers, + p -> client.getGroupManagementApi((p.getId())).list().getGroupIds()); + + if (groupIds == null) { + printStream.println("Failed to get group ID from " + peers); + throw new IOException("Failed to get group ID from " + peers); + } else if (groupIds.size() == 1) { + remoteGroupId = groupIds.get(0); + } else { + String message = "Unexpected multiple group IDs " + groupIds + + ". In such case, the target group ID must be specified."; + printStream.println(message); + throw new IOException(message); + } + return remoteGroupId; + } + + public static GroupInfoReply retrieveGroupInfoByGroupId(RaftGroupId remoteGroupId, List peers, + RaftClient client, PrintStream printStream) + throws IOException { + GroupInfoReply groupInfoReply = applyFunctionReturnFirstNonNull(peers, + p -> client.getGroupManagementApi((p.getId())).info(remoteGroupId)); + processReply(groupInfoReply, printStream::println, + () -> "Failed to get group info for group id " + remoteGroupId.getUuid() + " from " + peers); + return groupInfoReply; + } + + public static void processReply(RaftClientReply reply, Consumer printer, Supplier message) + throws IOException { + if (reply == null || !reply.isSuccess()) { + final RaftException e = Optional.ofNullable(reply) + .map(RaftClientReply::getException) + .orElseGet(() -> new RaftException("Reply: " + reply)); + printer.accept(message.get()); + throw new IOException(e.getMessage(), e); + } + } + + public static InetSocketAddress parseInetSocketAddress(String address) { + try { + final String[] hostPortPair = address.split(":"); + if (hostPortPair.length < 2) { + throw new IllegalArgumentException("Unexpected address format ."); + } + return new InetSocketAddress(hostPortPair[0], Integer.parseInt(hostPortPair[1])); + } catch (Exception e) { + throw new IllegalArgumentException("Failed to parse the server address parameter \"" + address + "\".", e); + } + } + } diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java index 1888c0e0ea..91bdc873b7 100644 --- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java +++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java @@ -18,8 +18,12 @@ package org.apache.ratis.shell.cli.sh.command; import org.apache.commons.cli.Option; -import org.apache.ratis.protocol.*; -import org.apache.ratis.protocol.exceptions.RaftException; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.GroupInfoReply; import org.apache.ratis.shell.cli.RaftUtils; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; @@ -30,48 +34,30 @@ import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.util.ProtoUtils; -import org.apache.ratis.util.function.CheckedFunction; import java.io.IOException; +import java.io.PrintStream; import java.net.InetSocketAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.ratis.shell.cli.RaftUtils.buildRaftGroupIdFromStr; +import static org.apache.ratis.shell.cli.RaftUtils.buildRaftPeersFromStr; +import static org.apache.ratis.shell.cli.RaftUtils.retrieveGroupInfoByGroupId; +import static org.apache.ratis.shell.cli.RaftUtils.retrieveRemoteGroupId; + /** * The base class for the ratis shell which need to connect to server. */ public abstract class AbstractRatisCommand extends AbstractCommand { public static final String PEER_OPTION_NAME = "peers"; public static final String GROUPID_OPTION_NAME = "groupid"; - public static final RaftGroupId DEFAULT_RAFT_GROUP_ID = RaftGroupId.randomId(); - - /** - * Execute a given function with input parameter from the members of a list. - * - * @param list the input parameters - * @param function the function to be executed - * @param parameter type - * @param return value type - * @param the exception type thrown by the given function. - * @return the value returned by the given function. - */ - public static K run(Collection list, CheckedFunction function) { - for (T t : list) { - try { - K ret = function.apply(t); - if (ret != null) { - return ret; - } - } catch (Throwable e) { - e.printStackTrace(); - } - } - return null; - } - private RaftGroup raftGroup; private GroupInfoReply groupInfoReply; @@ -81,46 +67,13 @@ protected AbstractRatisCommand(Context context) { @Override public int run(CommandLine cl) throws IOException { - List addresses = new ArrayList<>(); - String peersStr = cl.getOptionValue(PEER_OPTION_NAME); - String[] peersArray = peersStr.split(","); - for (String peer : peersArray) { - addresses.add(parseInetSocketAddress(peer)); - } - - final RaftGroupId raftGroupIdFromConfig = cl.hasOption(GROUPID_OPTION_NAME)? - RaftGroupId.valueOf(UUID.fromString(cl.getOptionValue(GROUPID_OPTION_NAME))) - : DEFAULT_RAFT_GROUP_ID; - - List peers = addresses.stream() - .map(addr -> RaftPeer.newBuilder() - .setId(RaftUtils.getPeerId(addr)) - .setAddress(addr) - .build() - ).collect(Collectors.toList()); + List peers = buildRaftPeersFromStr(cl.getOptionValue(PEER_OPTION_NAME)); + RaftGroupId raftGroupIdFromConfig = buildRaftGroupIdFromStr(cl.getOptionValue(GROUPID_OPTION_NAME)); raftGroup = RaftGroup.valueOf(raftGroupIdFromConfig, peers); + PrintStream printStream = getPrintStream(); try (final RaftClient client = RaftUtils.createClient(raftGroup)) { - final RaftGroupId remoteGroupId; - if (raftGroupIdFromConfig != DEFAULT_RAFT_GROUP_ID) { - remoteGroupId = raftGroupIdFromConfig; - } else { - final List groupIds = run(peers, - p -> client.getGroupManagementApi((p.getId())).list().getGroupIds()); - - if (groupIds == null) { - println("Failed to get group ID from " + peers); - return -1; - } else if (groupIds.size() == 1) { - remoteGroupId = groupIds.get(0); - } else { - println("There are more than one groups, you should specific one. " + groupIds); - return -2; - } - } - - groupInfoReply = run(peers, p -> client.getGroupManagementApi((p.getId())).info(remoteGroupId)); - processReply(groupInfoReply, - () -> "Failed to get group info for group id " + remoteGroupId.getUuid() + " from " + peers); + final RaftGroupId remoteGroupId = retrieveRemoteGroupId(raftGroupIdFromConfig, peers, client, printStream); + groupInfoReply = retrieveGroupInfoByGroupId(remoteGroupId, peers, client, printStream); raftGroup = groupInfoReply.getGroup(); } return 0; @@ -168,14 +121,7 @@ protected RaftPeerProto getLeader(RoleInfoProto roleInfo) { } protected void processReply(RaftClientReply reply, Supplier messageSupplier) throws IOException { - if (reply == null || !reply.isSuccess()) { - final RaftException e = Optional.ofNullable(reply) - .map(RaftClientReply::getException) - .orElseGet(() -> new RaftException("Reply: " + reply)); - final String message = messageSupplier.get(); - printf("%s. Error: %s%n", message, e); - throw new IOException(message, e); - } + RaftUtils.processReply(reply, getPrintStream()::println, messageSupplier); } protected List getIds(String[] optionValues, BiConsumer consumer) {