diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java index 299b67325b..90f41ddadf 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java @@ -82,7 +82,7 @@ int getCounter() { private final TimeDuration simulatedSlowness; - CounterStateMachine(TimeDuration simulatedSlowness) { + public CounterStateMachine(TimeDuration simulatedSlowness) { this.simulatedSlowness = simulatedSlowness; } CounterStateMachine() { diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java new file mode 100644 index 0000000000..69cc4d58d4 --- /dev/null +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.examples.membership.server; + +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.examples.counter.server.CounterStateMachine; +import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.thirdparty.com.google.common.base.MoreObjects; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.TimeDuration; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.Collections; + +/** + * A simple raft server using {@link CounterStateMachine}. + */ +public class CServer implements Closeable { + public static final RaftGroupId GROUP_ID = RaftGroupId.randomId(); + public static final String LOCAL_ADDR = "0.0.0.0"; + + private final RaftServer server; + private final int port; + private final File storageDir; + + public CServer(RaftGroup group, RaftPeerId serverId, int port) throws IOException { + this.storageDir = new File("./" + serverId); + this.port = port; + + final RaftProperties properties = new RaftProperties(); + RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir)); + RaftConfigKeys.Rpc.setType(properties, SupportedRpcType.NETTY); + NettyConfigKeys.Server.setPort(properties, port); + + // create the counter state machine which holds the counter value. + final CounterStateMachine counterStateMachine = new CounterStateMachine(TimeDuration.ZERO); + + // build the Raft server. + this.server = RaftServer.newBuilder() + .setGroup(group) + .setProperties(properties) + .setServerId(serverId) + .setStateMachine(counterStateMachine) + .setOption(RaftStorage.StartupOption.FORMAT) + .build(); + } + + public void start() throws IOException { + server.start(); + } + + public RaftPeer getPeer() { + return server.getPeer(); + } + + @Override + public void close() throws IOException { + server.close(); + FileUtils.deleteFully(storageDir); + } + + @Override + public String toString() { + try { + return MoreObjects.toStringHelper(this) + .add("server", server.getPeer()) + .add("role", server.getDivision(GROUP_ID).getInfo().getCurrentRole()) + .toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int getPort() { + return port; + } +} diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/Console.java b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/Console.java new file mode 100644 index 0000000000..1015d59e29 --- /dev/null +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/Console.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.examples.membership.server; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Scanner; + +/** + * Interactive command line console. + */ +public class Console { + public static final String USAGE_MSG = + "Usage: java org.apache.ratis.examples.membership.server.Console [options]\n" + + "Options:\n" + + "\tupdate [new_peer_ports] Update membership to C_new. Separate ports with comma. " + + "e.g. update 5100,5101\n" + + "\tadd [peer_port] Add peer with peer_port to raft cluster. e.g. add 5103\n" + + "\tremove [peer_port] Remove peer with peer_port from raft cluster. e.g. remove" + + " 5100\n" + + "\tshow Show all peers of raft cluster.\n" + + "\tincr Increment the counter value.\n" + + "\tquery Query the value of counter.\n" + + "\tquit Quit."; + + private final Scanner sc = new Scanner(System.in, "UTF-8"); + private final RaftCluster cluster = new RaftCluster(); + + private void init() { + System.out.println("Raft Server Membership Example."); + System.out.println("Type ports seperated by comma for initial peers. e.g. 5100,5101,5102"); + + String[] portArguments = commandLineInput()[0].split(","); + List ports = new ArrayList<>(); + Arrays.stream(portArguments).map(Integer::parseInt).forEach(ports::add); + try { + cluster.init(ports); + } catch (IOException e) { + throw new RuntimeException(e); + } + show(); + System.out.println(USAGE_MSG); + } + + private void execute() { + while (true) { + try { + String[] args = commandLineInput(); + String command = args[0]; + + if (command.equalsIgnoreCase("show")) { + show(); + } else if (command.equalsIgnoreCase("add")) { + add(args, 1); + } else if (command.equalsIgnoreCase("remove")) { + remove(args, 1); + } else if (command.equalsIgnoreCase("update")) { + update(args, 1); + } else if (command.equalsIgnoreCase("incr")) { + cluster.counterIncrement(); + } else if (command.equalsIgnoreCase("query")) { + cluster.queryCounter(); + } else if (command.equalsIgnoreCase("quit")) { + break; + } else { + System.out.println(USAGE_MSG); + } + } catch (Exception e) { + System.out.println("Get error " + e.getMessage()); + } + } + try { + System.out.println("Closing cluster..."); + cluster.close(); + System.out.println("Cluster closed successfully."); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void show() { + cluster.show(); + } + + private void add(String[] args, int index) throws IOException { + int port = Integer.parseInt(args[index]); + List ports = new ArrayList(); + ports.add(port); + ports.addAll(cluster.ports()); + cluster.update(ports); + } + + private void remove(String[] args, int index) throws IOException { + int port = Integer.parseInt(args[index]); + List ports = new ArrayList<>(); + ports.addAll(cluster.ports()); + if (ports.remove(Integer.valueOf(port))) { + cluster.update(ports); + } else { + System.out.println("Invalid port " + port); + } + } + + private void update(String[] args, int index) throws IOException { + String[] portStrArray = args[index].split(","); + List ports = new ArrayList<>(); + for (String portStr : portStrArray) { + ports.add(Integer.parseInt(portStr)); + } + cluster.update(ports); + } + + private String[] commandLineInput() { + System.out.print(">>> "); + return sc.nextLine().split(" "); + } + + public static void main(String[] args) throws IOException { + Console console = new Console(); + console.init(); + console.execute(); + } +} diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/RaftCluster.java b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/RaftCluster.java new file mode 100644 index 0000000000..ec1c6e0959 --- /dev/null +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/RaftCluster.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.examples.membership.server; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.Parameters; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.examples.counter.CounterCommand; +import org.apache.ratis.netty.NettyFactory; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.ratis.examples.membership.server.CServer.GROUP_ID; +import static org.apache.ratis.examples.membership.server.CServer.LOCAL_ADDR; + +/** + * An in process raft cluster. Running all servers in a single process. + */ +public class RaftCluster { + private Map members = new HashMap<>(); + + /** + * Start cluster. + * + * @param initPorts the ports of the initial peers. + */ + public void init(Collection initPorts) throws IOException { + RaftGroup group = initGroup(initPorts); + for (int port : initPorts) { + CServer server = new CServer(group, peerId(port), port); + server.start(); + members.put(port, server); + } + } + + /** + * Update membership to C_new. + * + * @param newPorts the ports of the C_new peers. + */ + public void update(Collection newPorts) throws IOException { + Preconditions.assertTrue(members.size() > 0, "Cluster is empty."); + + Collection oldPeers = members.values(); + List newPeers = new ArrayList<>(); + List peerToStart = new ArrayList<>(); + List peerToStop = new ArrayList<>(); + + for (Integer port : newPorts) { + CServer server = members.get(port); + if (server == null) { + // New peer always start with an empty group. + RaftGroup group = RaftGroup.valueOf(GROUP_ID); + server = new CServer(group, peerId(port), port); + peerToStart.add(server); + } + newPeers.add(server); + } + + for (CServer peer : oldPeers) { + if (!newPeers.contains(peer)) { + peerToStop.add(peer); + } + } + + // Step 1: start new peers. + System.out.println("Update membership ...... Step 1: start new peers."); + System.out.println(peersInfo(peerToStart, "Peers_to_start")); + for (CServer server : peerToStart) { + server.start(); + } + + // Step 2: update membership. + System.out.println("Update membership ...... Step 2: update membership from C_old to C_new."); + System.out.println(peersInfo(oldPeers, "C_old")); + System.out.println(peersInfo(newPeers, "C_new")); + if (members.size() > 0) { + try (RaftClient client = createClient()) { + RaftClientReply reply = client.admin().setConfiguration(newPeers.stream() + .map(CServer::getPeer).collect(Collectors.toList())); + if (!reply.isSuccess()) { + throw reply.getException(); + } + } + } + + // Step 3: stop outdated peers. + System.out.println("Update membership ...... Step 3: stop outdated peers."); + System.out.println(peersInfo(peerToStop, "Peers_to_stop")); + for (CServer server : peerToStop) { + server.close(); + members.remove(server.getPort()); + } + + // Add new peers to members. + for (CServer server : peerToStart) { + members.put(server.getPort(), server); + } + } + + public void show() { + Collection peers = members.values(); + System.out.println(peersInfo(peers, "Cluster members")); + } + + public void counterIncrement() throws IOException { + RaftClient client = createClient(); + try { + RaftClientReply reply = client.io().send(CounterCommand.INCREMENT.getMessage()); + if (!reply.isSuccess()) { + throw reply.getException(); + } + } finally { + client.close(); + } + } + + public void queryCounter() throws IOException { + RaftClient client = createClient(); + try { + RaftClientReply reply = client.io().sendReadOnly(CounterCommand.GET.getMessage()); + String count = reply.getMessage().getContent().toStringUtf8(); + System.out.println("Current counter value: " + count); + } finally { + client.close(); + } + } + + /** + * Configure the raft group with initial peers. + */ + private RaftGroup initGroup(Collection ports) { + List peers = new ArrayList<>(); + for (int port : ports) { + peers.add(RaftPeer.newBuilder() + .setId(peerId(port)) + .setAddress(LOCAL_ADDR + ":" + port) + .build()); + } + members.values().stream().map(CServer::getPeer).forEach(peers::add); + return RaftGroup.valueOf(GROUP_ID, peers); + } + + public Collection ports() { + return members.keySet(); + } + + public void close() throws IOException { + for (CServer server : members.values()) { + server.close(); + } + } + + private RaftClient createClient() { + RaftProperties properties = new RaftProperties(); + RaftClient.Builder builder = RaftClient.newBuilder().setProperties(properties); + + builder.setRaftGroup(RaftGroup.valueOf(GROUP_ID, + members.values().stream().map(s -> s.getPeer()).collect(Collectors.toList()))); + + builder.setClientRpc(new NettyFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties)); + + return builder.build(); + } + + private static RaftPeerId peerId(int port) { + return RaftPeerId.valueOf("p" + port); + } + + private static String peersInfo(Collection peers, String prefix) { + StringBuilder msgBuilder = new StringBuilder(prefix).append("={"); + if (peers.size() == 0) { + msgBuilder.append("}"); + } else { + peers.forEach(p -> msgBuilder.append("\n\t").append(p)); + msgBuilder.append("\n}"); + } + return msgBuilder.toString(); + } +}