Skip to content

Commit

Permalink
interop-testing: Move soak out of AbstractInteropTest
Browse files Browse the repository at this point in the history
The soak code grew considerably in 6a92a2a. Since it isn't a JUnit
test and doesn't resemble the other tests, it doesn't belong in
AbstractInteropTest. AbstractInteropTest has lots of users, including
being re-compiled for use on Android, so moving the soak code out makes
the remaining code clearer for the more common cases.
  • Loading branch information
ejona86 committed Jan 14, 2025
1 parent 87b27b1 commit 87c7b7a
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 269 deletions.
1 change: 0 additions & 1 deletion android-interop-testing/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ dependencies {
project(':grpc-protobuf-lite'),
project(':grpc-stub'),
project(':grpc-testing'),
libraries.hdrhistogram,
libraries.junit,
libraries.truth,
libraries.androidx.test.rules,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import static org.junit.Assert.fail;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
Expand Down Expand Up @@ -120,7 +119,6 @@
import javax.annotation.Nullable;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import org.HdrHistogram.Histogram;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
Expand Down Expand Up @@ -1681,235 +1679,6 @@ public void getServerAddressAndLocalAddressFromClient() {
assertNotNull(obtainLocalClientAddr());
}

/**
 * Outcome of a single soak iteration: how long it took and the status it finished with.
 * Both values are supplied to the constructor and never change afterwards.
 */
private static class SoakIterationResult {
  private final long latencyMs;
  private final Status status;

  public SoakIterationResult(long latencyMs, Status status) {
    this.latencyMs = latencyMs;
    this.status = status;
  }

  /** Returns the latency, in milliseconds, given at construction. */
  public long getLatencyMs() {
    return latencyMs;
  }

  /** Returns the status given at construction. */
  public Status getStatus() {
    return status;
  }
}


/**
 * Mutable per-thread tally for the soak test: failure count, completed iteration count and
 * the latency histogram. The counters are intentionally left non-final because the enclosing
 * class increments them directly.
 */
private static class ThreadResults {
  // Histogram(4): the argument is HdrHistogram's number of significant value digits.
  private final Histogram latencies = new Histogram(4);
  private int iterationsDone = 0;
  private int threadFailures = 0;

  /** Returns how many iterations were counted as failures so far. */
  public int getThreadFailures() {
    return threadFailures;
  }

  /** Returns how many iterations have been recorded so far. */
  public int getIterationsDone() {
    return iterationsDone;
  }

  /** Returns the histogram that accumulates the recorded latencies. */
  public Histogram getLatencies() {
    return latencies;
  }
}

/**
 * Sends one unary RPC with a zero-filled request payload of {@code soakRequestSize} bytes,
 * checks the reply against a zero-filled payload of {@code soakResponseSize} bytes, and
 * reports the elapsed time together with the resulting status.
 *
 * <p>A {@link StatusRuntimeException} is converted into the returned status; any other
 * throwable (including an assertion failure from the response check) propagates to the caller.
 * The measured interval also covers building the request and the expected response.
 */
private SoakIterationResult performOneSoakIteration(
    TestServiceGrpc.TestServiceBlockingStub soakStub, int soakRequestSize, int soakResponseSize)
    throws InterruptedException {
  final long beginNanos = System.nanoTime();
  Status outcome;
  try {
    Payload.Builder outgoingPayload =
        Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakRequestSize]));
    SimpleRequest outgoing =
        SimpleRequest.newBuilder()
            .setResponseSize(soakResponseSize)
            .setPayload(outgoingPayload)
            .build();
    Payload.Builder expectedPayload =
        Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakResponseSize]));
    SimpleResponse expected = SimpleResponse.newBuilder().setPayload(expectedPayload).build();
    SimpleResponse actual = soakStub.unaryCall(outgoing);
    assertResponse(expected, actual);
    outcome = Status.OK;
  } catch (StatusRuntimeException rpcFailure) {
    outcome = rpcFailure.getStatus();
  }
  final long tookNanos = System.nanoTime() - beginNanos;
  long tookMillis = TimeUnit.MILLISECONDS.convert(tookNanos, TimeUnit.NANOSECONDS);
  return new SoakIterationResult(tookMillis, outcome);
}

/**
 * Runs large unary RPCs in a loop with configurable failure thresholds
 * and channel creation behavior.
 *
 * <p>The iterations are divided evenly between {@code numThreads} worker threads. Every worker
 * starts from one channel obtained from {@code createChannel()} and passes its current channel
 * through {@code createNewChannel} before each iteration. Once all workers have finished, a
 * summary line is written to {@code System.err} and the totals are asserted. The channel
 * created here is shut down on every exit path, including assertion failures.
 *
 * @param serverUri label included in log lines and failure messages
 * @param soakIterations total number of iterations; must be a multiple of {@code numThreads}
 * @param maxFailures largest total number of failed iterations that still passes
 * @param maxAcceptablePerIterationLatencyMs an iteration slower than this counts as a failure
 * @param minTimeMsBetweenRpcs minimum spacing between iteration starts within one thread
 * @param overallTimeoutSeconds workers stop starting new iterations once this much time has
 *     elapsed since the test began
 * @param soakRequestSize request payload size in bytes
 * @param soakResponseSize requested response payload size in bytes
 * @param numThreads number of worker threads; must be positive
 * @param createNewChannel maps a worker's current channel to the channel to use next
 * @throws IllegalArgumentException if {@code numThreads} is not positive or does not evenly
 *     divide {@code soakIterations}
 * @throws InterruptedException if interrupted while waiting for the workers or for the
 *     channel to terminate
 */
public void performSoakTest(
    String serverUri,
    int soakIterations,
    int maxFailures,
    int maxAcceptablePerIterationLatencyMs,
    int minTimeMsBetweenRpcs,
    int overallTimeoutSeconds,
    int soakRequestSize,
    int soakResponseSize,
    int numThreads,
    Function<ManagedChannel, ManagedChannel> createNewChannel)
    throws InterruptedException {
  // Reject a non-positive thread count up front; otherwise the modulo or the array allocation
  // below would fail with a less helpful exception.
  if (numThreads <= 0) {
    throw new IllegalArgumentException("numThreads must be positive.");
  }
  if (soakIterations % numThreads != 0) {
    throw new IllegalArgumentException("soakIterations must be evenly divisible by numThreads.");
  }
  ManagedChannel sharedChannel = createChannel();
  try {
    long startNs = System.nanoTime();
    Thread[] threads = new Thread[numThreads];
    int soakIterationsPerThread = soakIterations / numThreads;
    List<ThreadResults> threadResultsList = new ArrayList<>(numThreads);
    for (int i = 0; i < numThreads; i++) {
      threadResultsList.add(new ThreadResults());
    }
    for (int threadInd = 0; threadInd < numThreads; threadInd++) {
      final int currentThreadInd = threadInd;
      threads[threadInd] = new Thread(() -> {
        try {
          executeSoakTestInThread(
              soakIterationsPerThread,
              startNs,
              minTimeMsBetweenRpcs,
              soakRequestSize,
              soakResponseSize,
              maxAcceptablePerIterationLatencyMs,
              overallTimeoutSeconds,
              serverUri,
              threadResultsList.get(currentThreadInd),
              sharedChannel,
              createNewChannel);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException("Thread interrupted: " + e.getMessage(), e);
        }
      });
      threads[threadInd].start();
    }
    // join() also makes each worker's writes to its ThreadResults visible to this thread.
    for (Thread thread : threads) {
      thread.join();
    }

    int totalFailures = 0;
    int iterationsDone = 0;
    Histogram latencies = new Histogram(4);
    for (ThreadResults threadResult : threadResultsList) {
      totalFailures += threadResult.getThreadFailures();
      iterationsDone += threadResult.getIterationsDone();
      latencies.add(threadResult.getLatencies());
    }
    System.err.println(
        String.format(
            Locale.US,
            "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. "
                + "p50: %d ms, p90: %d ms, p100: %d ms",
            serverUri,
            iterationsDone,
            soakIterations,
            totalFailures,
            latencies.getValueAtPercentile(50),
            latencies.getValueAtPercentile(90),
            latencies.getValueAtPercentile(100)));
    // check if we timed out
    String timeoutErrorMessage =
        String.format(
            Locale.US,
            "(server_uri: %s) soak test consumed all %d seconds of time and quit early, "
                + "only having ran %d out of desired %d iterations.",
            serverUri,
            overallTimeoutSeconds,
            iterationsDone,
            soakIterations);
    // JUnit's argument order is (message, expected, actual).
    assertEquals(timeoutErrorMessage, soakIterations, iterationsDone);
    // check if we had too many failures
    String tooManyFailuresErrorMessage =
        String.format(
            Locale.US,
            "(server_uri: %s) soak test total failures: %d exceeds max failures "
                + "threshold: %d.",
            serverUri, totalFailures, maxFailures);
    assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures);
  } finally {
    // Release the channel even when an assertion above fails or join() is interrupted.
    shutdownChannel(sharedChannel);
  }
}

/**
 * Forcibly shuts down {@code channel} and waits up to 10 seconds for it to terminate.
 * Does nothing when {@code channel} is {@code null}.
 */
private void shutdownChannel(ManagedChannel channel) throws InterruptedException {
  if (channel == null) {
    return;
  }
  channel.shutdownNow();
  channel.awaitTermination(10, TimeUnit.SECONDS);
}

/**
 * Shuts down {@code currentChannel} and returns a freshly created replacement channel.
 *
 * @param currentChannel the channel to shut down before creating the replacement
 * @return a new channel obtained from {@code createChannel()}
 * @throws RuntimeException if the thread is interrupted while the old channel is shutting
 *     down; the thread's interrupt status is restored before throwing
 */
protected ManagedChannel createNewChannel(ManagedChannel currentChannel) {
  try {
    shutdownChannel(currentChannel);
    return createChannel();
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up the stack can still observe it.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted while creating a new channel", e);
  }
}

/**
 * Worker loop for the soak test. Runs up to {@code soakIterationsPerThread} iterations,
 * stopping early once {@code overallTimeoutSeconds} have passed since {@code startNs}.
 *
 * <p>Each iteration passes the current channel through {@code maybeCreateChannel}, performs
 * one RPC on the resulting channel, logs one line to {@code System.err}, and updates
 * {@code threadResults}. An iteration counts as a failure when its status is not
 * {@code Status.OK} or when its latency exceeds {@code maxAcceptablePerIterationLatencyMs}.
 * Iteration starts are spaced at least {@code minTimeMsBetweenRpcs} apart.
 */
private void executeSoakTestInThread(
    int soakIterationsPerThread,
    long startNs,
    int minTimeMsBetweenRpcs,
    int soakRequestSize,
    int soakResponseSize,
    int maxAcceptablePerIterationLatencyMs,
    int overallTimeoutSeconds,
    String serverUri,
    ThreadResults threadResults,
    ManagedChannel sharedChannel,
    Function<ManagedChannel, ManagedChannel> maybeCreateChannel) throws InterruptedException {
  final long deadlineSpanNs = TimeUnit.SECONDS.toNanos(overallTimeoutSeconds);
  final long minGapNs = TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs);
  ManagedChannel activeChannel = sharedChannel;
  for (int iteration = 0;
      iteration < soakIterationsPerThread && System.nanoTime() - startNs < deadlineSpanNs;
      iteration++) {
    final long nextAllowedStartNs = System.nanoTime() + minGapNs;
    // recordClientCallInterceptor takes an AtomicReference.
    AtomicReference<ClientCall<?, ?>> capturedCall = new AtomicReference<>();
    activeChannel = maybeCreateChannel.apply(activeChannel);
    TestServiceGrpc.TestServiceBlockingStub stub =
        TestServiceGrpc.newBlockingStub(activeChannel)
            .withInterceptors(recordClientCallInterceptor(capturedCall));
    SoakIterationResult outcome =
        performOneSoakIteration(stub, soakRequestSize, soakResponseSize);
    SocketAddress remote =
        capturedCall.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);

    StringBuilder line = new StringBuilder();
    line.append(
        String.format(
            Locale.US,
            "thread id: %d soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
            Thread.currentThread().getId(),
            iteration,
            outcome.getLatencyMs(),
            String.valueOf(remote),
            serverUri));
    boolean rpcOk = outcome.getStatus().equals(Status.OK);
    if (rpcOk && outcome.getLatencyMs() <= maxAcceptablePerIterationLatencyMs) {
      line.append(" succeeded");
    } else {
      threadResults.threadFailures++;
      if (rpcOk) {
        line.append(" exceeds max acceptable latency: ")
            .append(maxAcceptablePerIterationLatencyMs);
      } else {
        line.append(String.format(" failed: %s", outcome.getStatus()));
      }
    }
    System.err.println(line.toString());

    threadResults.iterationsDone++;
    threadResults.getLatencies().recordValue(outcome.getLatencyMs());

    // Wait out whatever is left of the minimum gap before the next iteration.
    final long waitNs = nextAllowedStartNs - System.nanoTime();
    if (waitNs > 0) {
      TimeUnit.NANOSECONDS.sleep(waitNs);
    }
  }
}

private static void assertSuccess(StreamRecorder<?> recorder) {
if (recorder.getError() != null) {
throw new AssertionError(recorder.getError());
Expand Down
Loading

0 comments on commit 87c7b7a

Please sign in to comment.