forked from apache/ratis
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
RATIS-1911. Add MembershipManager example. (apache#941)
- Loading branch information
1 parent
a0f74e5
commit 081940a
Showing
4 changed files
with
451 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
103 changes: 103 additions & 0 deletions
103
ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.ratis.examples.membership.server; | ||
|
||
import org.apache.ratis.RaftConfigKeys; | ||
import org.apache.ratis.conf.RaftProperties; | ||
import org.apache.ratis.examples.counter.server.CounterStateMachine; | ||
import org.apache.ratis.netty.NettyConfigKeys; | ||
import org.apache.ratis.protocol.RaftGroup; | ||
import org.apache.ratis.protocol.RaftGroupId; | ||
import org.apache.ratis.protocol.RaftPeer; | ||
import org.apache.ratis.protocol.RaftPeerId; | ||
import org.apache.ratis.rpc.SupportedRpcType; | ||
import org.apache.ratis.server.RaftServer; | ||
import org.apache.ratis.server.RaftServerConfigKeys; | ||
import org.apache.ratis.server.storage.RaftStorage; | ||
import org.apache.ratis.thirdparty.com.google.common.base.MoreObjects; | ||
import org.apache.ratis.util.FileUtils; | ||
import org.apache.ratis.util.TimeDuration; | ||
|
||
import java.io.Closeable; | ||
import java.io.File; | ||
import java.io.IOException; | ||
import java.util.Collections; | ||
|
||
/** | ||
* A simple raft server using {@link CounterStateMachine}. | ||
*/ | ||
public class CServer implements Closeable { | ||
public static final RaftGroupId GROUP_ID = RaftGroupId.randomId(); | ||
public static final String LOCAL_ADDR = "0.0.0.0"; | ||
|
||
private final RaftServer server; | ||
private final int port; | ||
private final File storageDir; | ||
|
||
public CServer(RaftGroup group, RaftPeerId serverId, int port) throws IOException { | ||
this.storageDir = new File("./" + serverId); | ||
this.port = port; | ||
|
||
final RaftProperties properties = new RaftProperties(); | ||
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir)); | ||
RaftConfigKeys.Rpc.setType(properties, SupportedRpcType.NETTY); | ||
NettyConfigKeys.Server.setPort(properties, port); | ||
|
||
// create the counter state machine which holds the counter value. | ||
final CounterStateMachine counterStateMachine = new CounterStateMachine(TimeDuration.ZERO); | ||
|
||
// build the Raft server. | ||
this.server = RaftServer.newBuilder() | ||
.setGroup(group) | ||
.setProperties(properties) | ||
.setServerId(serverId) | ||
.setStateMachine(counterStateMachine) | ||
.setOption(RaftStorage.StartupOption.FORMAT) | ||
.build(); | ||
} | ||
|
||
public void start() throws IOException { | ||
server.start(); | ||
} | ||
|
||
public RaftPeer getPeer() { | ||
return server.getPeer(); | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
server.close(); | ||
FileUtils.deleteFully(storageDir); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
try { | ||
return MoreObjects.toStringHelper(this) | ||
.add("server", server.getPeer()) | ||
.add("role", server.getDivision(GROUP_ID).getInfo().getCurrentRole()) | ||
.toString(); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
public int getPort() { | ||
return port; | ||
} | ||
} |
140 changes: 140 additions & 0 deletions
140
ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/Console.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.ratis.examples.membership.server; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Scanner; | ||
|
||
/** | ||
* Interactive command line console. | ||
*/ | ||
public class Console { | ||
public static final String USAGE_MSG = | ||
"Usage: java org.apache.ratis.examples.membership.server.Console [options]\n" | ||
+ "Options:\n" | ||
+ "\tupdate [new_peer_ports] Update membership to C_new. Separate ports with comma. " | ||
+ "e.g. update 5100,5101\n" | ||
+ "\tadd [peer_port] Add peer with peer_port to raft cluster. e.g. add 5103\n" | ||
+ "\tremove [peer_port] Remove peer with peer_port from raft cluster. e.g. remove" | ||
+ " 5100\n" | ||
+ "\tshow Show all peers of raft cluster.\n" | ||
+ "\tincr Increment the counter value.\n" | ||
+ "\tquery Query the value of counter.\n" | ||
+ "\tquit Quit."; | ||
|
||
private final Scanner sc = new Scanner(System.in, "UTF-8"); | ||
private final RaftCluster cluster = new RaftCluster(); | ||
|
||
private void init() { | ||
System.out.println("Raft Server Membership Example."); | ||
System.out.println("Type ports seperated by comma for initial peers. e.g. 5100,5101,5102"); | ||
|
||
String[] portArguments = commandLineInput()[0].split(","); | ||
List<Integer> ports = new ArrayList<>(); | ||
Arrays.stream(portArguments).map(Integer::parseInt).forEach(ports::add); | ||
try { | ||
cluster.init(ports); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
show(); | ||
System.out.println(USAGE_MSG); | ||
} | ||
|
||
private void execute() { | ||
while (true) { | ||
try { | ||
String[] args = commandLineInput(); | ||
String command = args[0]; | ||
|
||
if (command.equalsIgnoreCase("show")) { | ||
show(); | ||
} else if (command.equalsIgnoreCase("add")) { | ||
add(args, 1); | ||
} else if (command.equalsIgnoreCase("remove")) { | ||
remove(args, 1); | ||
} else if (command.equalsIgnoreCase("update")) { | ||
update(args, 1); | ||
} else if (command.equalsIgnoreCase("incr")) { | ||
cluster.counterIncrement(); | ||
} else if (command.equalsIgnoreCase("query")) { | ||
cluster.queryCounter(); | ||
} else if (command.equalsIgnoreCase("quit")) { | ||
break; | ||
} else { | ||
System.out.println(USAGE_MSG); | ||
} | ||
} catch (Exception e) { | ||
System.out.println("Get error " + e.getMessage()); | ||
} | ||
} | ||
try { | ||
System.out.println("Closing cluster..."); | ||
cluster.close(); | ||
System.out.println("Cluster closed successfully."); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
private void show() { | ||
cluster.show(); | ||
} | ||
|
||
private void add(String[] args, int index) throws IOException { | ||
int port = Integer.parseInt(args[index]); | ||
List<Integer> ports = new ArrayList(); | ||
ports.add(port); | ||
ports.addAll(cluster.ports()); | ||
cluster.update(ports); | ||
} | ||
|
||
private void remove(String[] args, int index) throws IOException { | ||
int port = Integer.parseInt(args[index]); | ||
List<Integer> ports = new ArrayList<>(); | ||
ports.addAll(cluster.ports()); | ||
if (ports.remove(Integer.valueOf(port))) { | ||
cluster.update(ports); | ||
} else { | ||
System.out.println("Invalid port " + port); | ||
} | ||
} | ||
|
||
private void update(String[] args, int index) throws IOException { | ||
String[] portStrArray = args[index].split(","); | ||
List<Integer> ports = new ArrayList<>(); | ||
for (String portStr : portStrArray) { | ||
ports.add(Integer.parseInt(portStr)); | ||
} | ||
cluster.update(ports); | ||
} | ||
|
||
private String[] commandLineInput() { | ||
System.out.print(">>> "); | ||
return sc.nextLine().split(" "); | ||
} | ||
|
||
public static void main(String[] args) throws IOException { | ||
Console console = new Console(); | ||
console.init(); | ||
console.execute(); | ||
} | ||
} |
Oops, something went wrong.