Skip to content

Commit

Permalink
RATIS-1901. Update Counter example to benchmark performance.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Oct 25, 2023
1 parent 081940a commit b237529
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ static String getSystemProperty(final String key) {
return doPrivileged(() -> System.getProperty(key), () -> "get system property " + key);
}

static String getEnv(String variable) {
final String value = System.getenv().get(variable);
LOG.info("ENV: {} = {}", variable, value);
return value;
}

/**
* Similar to {@link System#setProperty(String, String)}
* except that this method may invoke {@link AccessController#doPrivileged(PrivilegedAction)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,69 @@
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Constants across servers and clients
*/
public final class Constants {
private static final Logger LOG = LoggerFactory.getLogger(Constants.class);

private static final String CONF_FILE_NAME = "conf.properties";
private static final List<String> CONF_FILE_DEFAULTS = Collections.unmodifiableList(Arrays.asList(
"examples/conf/" + CONF_FILE_NAME,
"ratis-examples/src/main/resources/" + CONF_FILE_NAME));
private static final String CONF_FILE_ENV_VAR_NAME = "RATIS_EXAMPLE_CONF";

static Path getConfPath() {
final String env = JavaUtils.getEnv(CONF_FILE_ENV_VAR_NAME);
final Stream<String> s = Stream.concat(
Optional.ofNullable(env).map(Stream::of).orElseGet(Stream::empty),
CONF_FILE_DEFAULTS.stream());
for(final Iterator<String> i = s.iterator(); i.hasNext(); ) {
final Path p = Paths.get(i.next());
if (Files.exists(p)) {
LOG.info("Using conf file {}", p);
return p;
}
}
throw new IllegalArgumentException("Conf file not found: please set environment variable \""
+ CONF_FILE_ENV_VAR_NAME + "\"");
}

public static final List<RaftPeer> PEERS;
public static final String PATH;
public static final List<TimeDuration> SIMULATED_SLOWNESS;

static {
final Properties properties = new Properties();
final String conf = "ratis-examples/src/main/resources/conf.properties";
try(InputStream inputStream = new FileInputStream(conf);
Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
BufferedReader bufferedReader = new BufferedReader(reader)) {
properties.load(bufferedReader);
final Path conf = getConfPath();
try(BufferedReader in = new BufferedReader(new InputStreamReader(
Files.newInputStream(conf), StandardCharsets.UTF_8))) {
properties.load(in);
} catch (IOException e) {
throw new IllegalStateException("Failed to load " + conf, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ratis.examples.counter;

import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

/**
* The supported commands the Counter example.
Expand All @@ -38,4 +39,9 @@ public Message getMessage() {
public boolean matches(String command) {
return name().equalsIgnoreCase(command);
}

/** Does the given command string match this command? */
public boolean matches(ByteString command) {
return message.getContent().equals(command);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,23 @@
import org.apache.ratis.examples.common.Constants;
import org.apache.ratis.examples.counter.CounterCommand;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Counter client application, this application sends specific number of
Expand All @@ -42,81 +49,113 @@
* parameter found, application use default value which is 10
*/
public final class CounterClient implements Closeable {
enum Mode {
DRY_RUN, IO, ASYNC;

static Mode parse(String s) {
for(Mode m : values()) {
if (m.name().equalsIgnoreCase(s)) {
return m;
}
}
return DRY_RUN;
}
}

//build the client
private final RaftClient client = RaftClient.newBuilder()
.setProperties(new RaftProperties())
.setRaftGroup(Constants.RAFT_GROUP)
.build();
static RaftClient newClient() {
return RaftClient.newBuilder()
.setProperties(new RaftProperties())
.setRaftGroup(Constants.RAFT_GROUP)
.build();
}

private final RaftClient client = newClient();

@Override
public void close() throws IOException {
client.close();
}

private void run(int increment, boolean blocking) throws Exception {
System.out.printf("Sending %d %s command(s) using the %s ...%n",
increment, CounterCommand.INCREMENT, blocking? "BlockingApi": "AsyncApi");
final List<Future<RaftClientReply>> futures = new ArrayList<>(increment);
static RaftClientReply assertReply(RaftClientReply reply) {
Preconditions.assertTrue(reply.isSuccess(), "Failed");
return reply;
}

static void send(int increment, Mode mode, RaftClient client) throws Exception {
final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(increment);

//send INCREMENT command(s)
if (blocking) {
if (mode == Mode.IO) {
// use BlockingApi
final ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < increment; i++) {
final Future<RaftClientReply> f = executor.submit(
() -> client.io().send(CounterCommand.INCREMENT.getMessage()));
futures.add(f);
final RaftClientReply reply = client.io().send(CounterCommand.INCREMENT.getMessage());
futures.add(CompletableFuture.completedFuture(reply));
}
executor.shutdown();
} else {
} else if (mode == Mode.ASYNC) {
// use AsyncApi
for (int i = 0; i < increment; i++) {
final Future<RaftClientReply> f = client.async().send(CounterCommand.INCREMENT.getMessage());
futures.add(f);
futures.add(client.async().send(CounterCommand.INCREMENT.getMessage()).thenApply(CounterClient::assertReply));
}

//wait for the futures
JavaUtils.allOf(futures).get();
}
}

//wait for the futures
for (Future<RaftClientReply> f : futures) {
final RaftClientReply reply = f.get();
if (reply.isSuccess()) {
final String count = reply.getMessage().getContent().toStringUtf8();
System.out.println("Counter is incremented to " + count);
} else {
System.err.println("Failed " + reply);
}
private void send(int increment, Mode mode) {
try (RaftClient c = newClient()) {
send(increment, mode, c);
} catch (Exception e) {
throw new CompletionException(e);
}
}

private void run(int increment, Mode mode, int numClients, ExecutorService executor) throws Exception {
System.out.printf("Sending %d %s command(s) in %s mode with %d client(s) ...%n",
increment, CounterCommand.INCREMENT, mode, numClients);
final Timestamp sendStarted = Timestamp.currentTime();
final List<CompletableFuture<Void>> sendFutures = new ArrayList<>(numClients - 1);
for (int i = 1; i < numClients; i++) {
sendFutures.add(CompletableFuture.runAsync(() -> send(increment, mode), executor));
}
send(increment, mode, client);
JavaUtils.allOf(sendFutures).get();
final TimeDuration sendElapsed = sendStarted.elapsedTime();
System.out.printf("Completed sending command(s) in %s, op/s is %01.2f%n",
sendElapsed.toString(TimeUnit.SECONDS, 3),
increment * 1000.0 / sendElapsed.toLong(TimeUnit.MILLISECONDS));

if (mode == Mode.DRY_RUN) {
return;
}

//send a GET command and print the reply
final RaftClientReply reply = client.io().sendReadOnly(CounterCommand.GET.getMessage());
final String count = reply.getMessage().getContent().toStringUtf8();
final long count = reply.getMessage().getContent().asReadOnlyByteBuffer().getLong();
System.out.println("Current counter value: " + count);

// using Linearizable Read
futures.clear();
final long startTime = System.currentTimeMillis();
final ExecutorService executor = Executors.newFixedThreadPool(Constants.PEERS.size());
Constants.PEERS.forEach(p -> {
final Future<RaftClientReply> f = CompletableFuture.supplyAsync(() -> {
try {
return client.io().sendReadOnly(CounterCommand.GET.getMessage(), p.getId());
} catch (IOException e) {
System.err.println("Failed read-only request");
return RaftClientReply.newBuilder().setSuccess(false).build();
}
}, executor).whenCompleteAsync((r, ex) -> {
if (ex != null || !r.isSuccess()) {
System.err.println("Failed " + r);
return;
}
final long endTime = System.currentTimeMillis();
final long elapsedSec = (endTime-startTime) / 1000;
final String countValue = r.getMessage().getContent().toStringUtf8();
System.out.println("read from " + p.getId() + " and get counter value: " + countValue
+ ", time elapsed: " + elapsedSec + " seconds");
});
futures.add(f);
});
final Timestamp readStarted = Timestamp.currentTime();
final List<CompletableFuture<RaftClientReply>> futures = Constants.PEERS.stream()
.map(p -> CompletableFuture.supplyAsync(() -> {
try {
return client.io().sendReadOnly(CounterCommand.GET.getMessage(), p.getId());
} catch (IOException e) {
System.err.println("Failed read-only request");
return RaftClientReply.newBuilder().setSuccess(false).build();
}
}, executor).whenComplete((r, ex) -> {
if (ex != null || !r.isSuccess()) {
System.err.println("Failed " + r);
return;
}

final TimeDuration readElapsed = readStarted.elapsedTime();
final long countValue = r.getMessage().getContent().asReadOnlyByteBuffer().getLong();
System.out.printf("read from " + p.getId() + " and get counter value: " + countValue
+ ", time elapsed: " + readElapsed.toString(TimeUnit.SECONDS, 3));
})).collect(Collectors.toList());

for (Future<RaftClientReply> f : futures) {
f.get();
Expand All @@ -127,17 +166,25 @@ public static void main(String[] args) {
try(CounterClient client = new CounterClient()) {
//the number of INCREMENT commands, default is 10
final int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
final boolean io = args.length > 1 && "io".equalsIgnoreCase(args[1]);
client.run(increment, io);
final Mode mode = Mode.parse(args.length > 1? args[1] : null);
final int numClients = args.length > 2 ? Integer.parseInt(args[0]) : 1;

final ExecutorService executor = Executors.newFixedThreadPool(Math.max(numClients, Constants.PEERS.size()));
try {
client.run(increment, mode, numClients, executor);
} finally {
executor.shutdown();
}
} catch (Throwable e) {
e.printStackTrace();
System.err.println();
System.err.println("args = " + Arrays.toString(args));
System.err.println();
System.err.println("Usage: java org.apache.ratis.examples.counter.client.CounterClient [increment] [async|io]");
System.err.printf("Usage: java %s [increment] [dry_run|async|io] [num_clients]%n", CounterClient.class.getName());
System.err.println();
System.err.println(" increment: the number of INCREMENT commands to be sent (default is 10)");
System.err.println(" async : use the AsyncApi (default)");
System.err.println(" dry_run : dry run only (default)");
System.err.println(" async : use the AsyncApi");
System.err.println(" io : use the BlockingApi");
System.exit(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.TimeDuration;

Expand Down Expand Up @@ -76,6 +77,7 @@ public CounterServer(RaftPeer peer, File storageDir, TimeDuration simulatedSlown
.setProperties(properties)
.setServerId(peer.getId())
.setStateMachine(counterStateMachine)
.setOption(RaftStorage.StartupOption.RECOVER)
.build();
}

Expand Down
Loading

0 comments on commit b237529

Please sign in to comment.