diff --git a/android-interop-testing/build.gradle b/android-interop-testing/build.gradle index 1d39aee1750..4f775d734e9 100644 --- a/android-interop-testing/build.gradle +++ b/android-interop-testing/build.gradle @@ -73,7 +73,6 @@ dependencies { project(':grpc-protobuf-lite'), project(':grpc-stub'), project(':grpc-testing'), - libraries.hdrhistogram, libraries.junit, libraries.truth, libraries.androidx.test.rules, diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 3f2aa048dec..88d570e7134 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -28,7 +28,6 @@ import static org.junit.Assert.fail; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.io.ByteStreams; @@ -120,7 +119,6 @@ import javax.annotation.Nullable; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; -import org.HdrHistogram.Histogram; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -1681,235 +1679,6 @@ public void getServerAddressAndLocalAddressFromClient() { assertNotNull(obtainLocalClientAddr()); } - private static class SoakIterationResult { - public SoakIterationResult(long latencyMs, Status status) { - this.latencyMs = latencyMs; - this.status = status; - } - - public long getLatencyMs() { - return latencyMs; - } - - public Status getStatus() { - return status; - } - - private long latencyMs = -1; - private Status status = Status.OK; - } - - - private static class ThreadResults { - private int threadFailures = 0; - private int iterationsDone = 0; - private Histogram latencies = new Histogram(4); - - public int getThreadFailures() { - return threadFailures; - } - - public int getIterationsDone() { - return iterationsDone; - } - - public Histogram getLatencies() { - return latencies; - } - } - - private SoakIterationResult performOneSoakIteration( - TestServiceGrpc.TestServiceBlockingStub soakStub, int soakRequestSize, int soakResponseSize) - throws InterruptedException { - long startNs = System.nanoTime(); - Status status = Status.OK; - try { - final SimpleRequest request = - SimpleRequest.newBuilder() - .setResponseSize(soakResponseSize) - .setPayload( - Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakRequestSize]))) - .build(); - final SimpleResponse goldenResponse = - SimpleResponse.newBuilder() - .setPayload( - Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakResponseSize]))) - .build(); - assertResponse(goldenResponse, soakStub.unaryCall(request)); - } catch (StatusRuntimeException e) { - status = e.getStatus(); - } - long elapsedNs = System.nanoTime() - startNs; - return new SoakIterationResult(TimeUnit.NANOSECONDS.toMillis(elapsedNs), status); - } - - /** - * Runs large unary RPCs in a loop with configurable failure thresholds - * and channel creation behavior. - */ - public void performSoakTest( - String serverUri, - int soakIterations, - int maxFailures, - int maxAcceptablePerIterationLatencyMs, - int minTimeMsBetweenRpcs, - int overallTimeoutSeconds, - int soakRequestSize, - int soakResponseSize, - int numThreads, - Function createNewChannel) - throws InterruptedException { - if (soakIterations % numThreads != 0) { - throw new IllegalArgumentException("soakIterations must be evenly divisible by numThreads."); - } - ManagedChannel sharedChannel = createChannel(); - long startNs = System.nanoTime(); - Thread[] threads = new Thread[numThreads]; - int soakIterationsPerThread = soakIterations / numThreads; - List threadResultsList = new ArrayList<>(numThreads); - for (int i = 0; i < numThreads; i++) { - threadResultsList.add(new ThreadResults()); - } - for (int threadInd = 0; threadInd < numThreads; threadInd++) { - final int currentThreadInd = threadInd; - threads[threadInd] = new Thread(() -> { - try { - executeSoakTestInThread( - soakIterationsPerThread, - startNs, - minTimeMsBetweenRpcs, - soakRequestSize, - soakResponseSize, - maxAcceptablePerIterationLatencyMs, - overallTimeoutSeconds, - serverUri, - threadResultsList.get(currentThreadInd), - sharedChannel, - createNewChannel); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Thread interrupted: " + e.getMessage(), e); - } - }); - threads[threadInd].start(); - } - for (Thread thread : threads) { - thread.join(); - } - - int totalFailures = 0; - int iterationsDone = 0; - Histogram latencies = new Histogram(4); - for (ThreadResults threadResult :threadResultsList) { - totalFailures += threadResult.getThreadFailures(); - iterationsDone += threadResult.getIterationsDone(); - latencies.add(threadResult.getLatencies()); - } - System.err.println( - String.format( - Locale.US, - "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. " - + "p50: %d ms, p90: %d ms, p100: %d ms", - serverUri, - iterationsDone, - soakIterations, - totalFailures, - latencies.getValueAtPercentile(50), - latencies.getValueAtPercentile(90), - latencies.getValueAtPercentile(100))); - // check if we timed out - String timeoutErrorMessage = - String.format( - Locale.US, - "(server_uri: %s) soak test consumed all %d seconds of time and quit early, " - + "only having ran %d out of desired %d iterations.", - serverUri, - overallTimeoutSeconds, - iterationsDone, - soakIterations); - assertEquals(timeoutErrorMessage, iterationsDone, soakIterations); - // check if we had too many failures - String tooManyFailuresErrorMessage = - String.format( - Locale.US, - "(server_uri: %s) soak test total failures: %d exceeds max failures " - + "threshold: %d.", - serverUri, totalFailures, maxFailures); - assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures); - shutdownChannel(sharedChannel); - } - - private void shutdownChannel(ManagedChannel channel) throws InterruptedException { - if (channel != null) { - channel.shutdownNow(); - channel.awaitTermination(10, TimeUnit.SECONDS); - } - } - - protected ManagedChannel createNewChannel(ManagedChannel currentChannel) { - try { - shutdownChannel(currentChannel); - return createChannel(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while creating a new channel", e); - } - } - - private void executeSoakTestInThread( - int soakIterationsPerThread, - long startNs, - int minTimeMsBetweenRpcs, - int soakRequestSize, - int soakResponseSize, - int maxAcceptablePerIterationLatencyMs, - int overallTimeoutSeconds, - String serverUri, - ThreadResults threadResults, - ManagedChannel sharedChannel, - Function maybeCreateChannel) throws InterruptedException { - ManagedChannel currentChannel = sharedChannel; - for (int i = 0; i < soakIterationsPerThread; i++) { - if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) { - break; - } - long earliestNextStartNs = System.nanoTime() - + TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs); - // recordClientCallInterceptor takes an AtomicReference. - AtomicReference> soakThreadClientCallCapture = new AtomicReference<>(); - currentChannel = maybeCreateChannel.apply(currentChannel); - TestServiceGrpc.TestServiceBlockingStub currentStub = TestServiceGrpc - .newBlockingStub(currentChannel) - .withInterceptors(recordClientCallInterceptor(soakThreadClientCallCapture)); - SoakIterationResult result = performOneSoakIteration(currentStub, - soakRequestSize, soakResponseSize); - SocketAddress peer = soakThreadClientCallCapture - .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); - StringBuilder logStr = new StringBuilder( - String.format( - Locale.US, - "thread id: %d soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s", - Thread.currentThread().getId(), - i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri)); - if (!result.getStatus().equals(Status.OK)) { - threadResults.threadFailures++; - logStr.append(String.format(" failed: %s", result.getStatus())); - } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) { - threadResults.threadFailures++; - logStr.append( - " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs); - } else { - logStr.append(" succeeded"); - } - System.err.println(logStr.toString()); - threadResults.iterationsDone++; - threadResults.getLatencies().recordValue(result.getLatencyMs()); - long remainingNs = earliestNextStartNs - System.nanoTime(); - if (remainingNs > 0) { - TimeUnit.NANOSECONDS.sleep(remainingNs); - } - } - } - private static void assertSuccess(StreamRecorder recorder) { if (recorder.getError() != null) { throw new AssertionError(recorder.getError()); diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/SoakClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/SoakClient.java new file mode 100644 index 00000000000..935586cfbdd --- /dev/null +++ b/interop-testing/src/main/java/io/grpc/testing/integration/SoakClient.java @@ -0,0 +1,295 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.common.base.Function; +import com.google.protobuf.ByteString; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Grpc; +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.testing.integration.Messages.Payload; +import io.grpc.testing.integration.Messages.SimpleRequest; +import io.grpc.testing.integration.Messages.SimpleResponse; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.HdrHistogram.Histogram; + +/** + * Shared implementation for rpc_soak and channel_soak. Unlike the tests in AbstractInteropTest, + * these "test cases" are only intended to be run from the command line. They don't fit the regular + * test patterns of AbstractInteropTest. + * https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#rpc_soak + */ +final class SoakClient { + private static class SoakIterationResult { + public SoakIterationResult(long latencyMs, Status status) { + this.latencyMs = latencyMs; + this.status = status; + } + + public long getLatencyMs() { + return latencyMs; + } + + public Status getStatus() { + return status; + } + + private long latencyMs = -1; + private Status status = Status.OK; + } + + private static class ThreadResults { + private int threadFailures = 0; + private int iterationsDone = 0; + private Histogram latencies = new Histogram(4); + + public int getThreadFailures() { + return threadFailures; + } + + public int getIterationsDone() { + return iterationsDone; + } + + public Histogram getLatencies() { + return latencies; + } + } + + private static SoakIterationResult performOneSoakIteration( + TestServiceGrpc.TestServiceBlockingStub soakStub, int soakRequestSize, int soakResponseSize) + throws InterruptedException { + long startNs = System.nanoTime(); + Status status = Status.OK; + try { + final SimpleRequest request = + SimpleRequest.newBuilder() + .setResponseSize(soakResponseSize) + .setPayload( + Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakRequestSize]))) + .build(); + final SimpleResponse goldenResponse = + SimpleResponse.newBuilder() + .setPayload( + Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakResponseSize]))) + .build(); + assertResponse(goldenResponse, soakStub.unaryCall(request)); + } catch (StatusRuntimeException e) { + status = e.getStatus(); + } + long elapsedNs = System.nanoTime() - startNs; + return new SoakIterationResult(TimeUnit.NANOSECONDS.toMillis(elapsedNs), status); + } + + /** + * Runs large unary RPCs in a loop with configurable failure thresholds + * and channel creation behavior. + */ + public static void performSoakTest( + String serverUri, + int soakIterations, + int maxFailures, + int maxAcceptablePerIterationLatencyMs, + int minTimeMsBetweenRpcs, + int overallTimeoutSeconds, + int soakRequestSize, + int soakResponseSize, + int numThreads, + ManagedChannel sharedChannel, + Function maybeCreateChannel) + throws InterruptedException { + if (soakIterations % numThreads != 0) { + throw new IllegalArgumentException("soakIterations must be evenly divisible by numThreads."); + } + long startNs = System.nanoTime(); + Thread[] threads = new Thread[numThreads]; + int soakIterationsPerThread = soakIterations / numThreads; + List threadResultsList = new ArrayList<>(numThreads); + for (int i = 0; i < numThreads; i++) { + threadResultsList.add(new ThreadResults()); + } + for (int threadInd = 0; threadInd < numThreads; threadInd++) { + final int currentThreadInd = threadInd; + threads[threadInd] = new Thread(() -> { + try { + executeSoakTestInThread( + soakIterationsPerThread, + startNs, + minTimeMsBetweenRpcs, + soakRequestSize, + soakResponseSize, + maxAcceptablePerIterationLatencyMs, + overallTimeoutSeconds, + serverUri, + threadResultsList.get(currentThreadInd), + sharedChannel, + maybeCreateChannel); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted: " + e.getMessage(), e); + } + }); + threads[threadInd].start(); + } + for (Thread thread : threads) { + thread.join(); + } + + int totalFailures = 0; + int iterationsDone = 0; + Histogram latencies = new Histogram(4); + for (ThreadResults threadResult :threadResultsList) { + totalFailures += threadResult.getThreadFailures(); + iterationsDone += threadResult.getIterationsDone(); + latencies.add(threadResult.getLatencies()); + } + System.err.println( + String.format( + Locale.US, + "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. " + + "p50: %d ms, p90: %d ms, p100: %d ms", + serverUri, + iterationsDone, + soakIterations, + totalFailures, + latencies.getValueAtPercentile(50), + latencies.getValueAtPercentile(90), + latencies.getValueAtPercentile(100))); + // check if we timed out + String timeoutErrorMessage = + String.format( + Locale.US, + "(server_uri: %s) soak test consumed all %d seconds of time and quit early, " + + "only having ran %d out of desired %d iterations.", + serverUri, + overallTimeoutSeconds, + iterationsDone, + soakIterations); + assertEquals(timeoutErrorMessage, iterationsDone, soakIterations); + // check if we had too many failures + String tooManyFailuresErrorMessage = + String.format( + Locale.US, + "(server_uri: %s) soak test total failures: %d exceeds max failures " + + "threshold: %d.", + serverUri, totalFailures, maxFailures); + assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures); + sharedChannel.shutdownNow(); + sharedChannel.awaitTermination(10, TimeUnit.SECONDS); + } + + private static void executeSoakTestInThread( + int soakIterationsPerThread, + long startNs, + int minTimeMsBetweenRpcs, + int soakRequestSize, + int soakResponseSize, + int maxAcceptablePerIterationLatencyMs, + int overallTimeoutSeconds, + String serverUri, + ThreadResults threadResults, + ManagedChannel sharedChannel, + Function maybeCreateChannel) throws InterruptedException { + ManagedChannel currentChannel = sharedChannel; + for (int i = 0; i < soakIterationsPerThread; i++) { + if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) { + break; + } + long earliestNextStartNs = System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs); + // recordClientCallInterceptor takes an AtomicReference. + AtomicReference> soakThreadClientCallCapture = new AtomicReference<>(); + currentChannel = maybeCreateChannel.apply(currentChannel); + TestServiceGrpc.TestServiceBlockingStub currentStub = TestServiceGrpc + .newBlockingStub(currentChannel) + .withInterceptors(recordClientCallInterceptor(soakThreadClientCallCapture)); + SoakIterationResult result = performOneSoakIteration(currentStub, + soakRequestSize, soakResponseSize); + SocketAddress peer = soakThreadClientCallCapture + .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + StringBuilder logStr = new StringBuilder( + String.format( + Locale.US, + "thread id: %d soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s", + Thread.currentThread().getId(), + i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri)); + if (!result.getStatus().equals(Status.OK)) { + threadResults.threadFailures++; + logStr.append(String.format(" failed: %s", result.getStatus())); + } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) { + threadResults.threadFailures++; + logStr.append( + " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs); + } else { + logStr.append(" succeeded"); + } + System.err.println(logStr.toString()); + threadResults.iterationsDone++; + threadResults.getLatencies().recordValue(result.getLatencyMs()); + long remainingNs = earliestNextStartNs - System.nanoTime(); + if (remainingNs > 0) { + TimeUnit.NANOSECONDS.sleep(remainingNs); + } + } + } + + private static void assertResponse(SimpleResponse expected, SimpleResponse actual) { + assertPayload(expected.getPayload(), actual.getPayload()); + assertEquals(expected.getUsername(), actual.getUsername()); + assertEquals(expected.getOauthScope(), actual.getOauthScope()); + } + + private static void assertPayload(Payload expected, Payload actual) { + // Compare non deprecated fields in Payload, to make this test forward compatible. + if (expected == null || actual == null) { + assertEquals(expected, actual); + } else { + assertEquals(expected.getBody(), actual.getBody()); + } + } + + /** + * Captures the ClientCall. Useful for testing {@link ClientCall#getAttributes()} + */ + private static ClientInterceptor recordClientCallInterceptor( + final AtomicReference> clientCallCapture) { + return new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + ClientCall clientCall = next.newCall(method,callOptions); + clientCallCapture.set(clientCall); + return clientCall; + } + }; + } + +} diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 8ade38cb024..125d876b705 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -523,7 +523,7 @@ private void runTest(TestCases testCase) throws Exception { } case RPC_SOAK: { - tester.performSoakTest( + SoakClient.performSoakTest( serverHost, soakIterations, soakMaxFailures, @@ -533,12 +533,13 @@ private void runTest(TestCases testCase) throws Exception { soakRequestSize, soakResponseSize, numThreads, + tester.createChannelBuilder().build(), (currentChannel) -> currentChannel); break; } case CHANNEL_SOAK: { - tester.performSoakTest( + SoakClient.performSoakTest( serverHost, soakIterations, soakMaxFailures, @@ -548,6 +549,7 @@ private void runTest(TestCases testCase) throws Exception { soakRequestSize, soakResponseSize, numThreads, + tester.createChannelBuilder().build(), (currentChannel) -> tester.createNewChannel(currentChannel)); break; } @@ -711,6 +713,16 @@ protected ManagedChannelBuilder createChannelBuilder() { return okBuilder.intercept(createCensusStatsClientInterceptor()); } + ManagedChannel createNewChannel(ManagedChannel currentChannel) { + currentChannel.shutdownNow(); + try { + currentChannel.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while creating a new channel", e); + } + return createChannel(); + } + /** * Assuming "pick_first" policy is used, tests that all requests are sent to the same server. */ diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsFederationTestClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsFederationTestClient.java index 08d845422a5..bba282b7b6f 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsFederationTestClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsFederationTestClient.java @@ -22,9 +22,10 @@ import io.grpc.ChannelCredentials; import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; -import io.grpc.ManagedChannelBuilder; +import io.grpc.ManagedChannel; import io.grpc.alts.ComputeEngineChannelCredentials; import java.util.ArrayList; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; /** @@ -44,26 +45,8 @@ public final class XdsFederationTestClient { public static void main(String[] args) throws Exception { final XdsFederationTestClient client = new XdsFederationTestClient(); client.parseArgs(args); - Runtime.getRuntime() - .addShutdownHook( - new Thread() { - @Override - @SuppressWarnings("CatchAndPrintStackTrace") - public void run() { - System.out.println("Shutting down"); - try { - client.tearDown(); - } catch (RuntimeException e) { - e.printStackTrace(); - } - } - }); client.setUp(); - try { - client.run(); - } finally { - client.tearDown(); - } + client.run(); System.exit(0); } @@ -209,22 +192,13 @@ void setUp() { for (int i = 0; i < uris.length; i++) { clients.add(new InnerClient(creds[i], uris[i])); } - for (InnerClient c : clients) { - c.setUp(); - } - } - - private synchronized void tearDown() { - for (InnerClient c : clients) { - c.tearDown(); - } } /** * Wraps a single client stub configuration and executes a * soak test case with that configuration. */ - class InnerClient extends AbstractInteropTest { + class InnerClient { private final String credentialsType; private final String serverUri; private boolean runSucceeded = false; @@ -249,7 +223,7 @@ public void run() throws InterruptedException { try { switch (testCase) { case "rpc_soak": { - performSoakTest( + SoakClient.performSoakTest( serverUri, soakIterations, soakMaxFailures, @@ -259,11 +233,12 @@ public void run() throws InterruptedException { soakRequestSize, soakResponseSize, 1, + createChannel(), (currentChannel) -> currentChannel); } break; case "channel_soak": { - performSoakTest( + SoakClient.performSoakTest( serverUri, soakIterations, soakMaxFailures, @@ -273,6 +248,7 @@ public void run() throws InterruptedException { soakRequestSize, soakResponseSize, 1, + createChannel(), (currentChannel) -> createNewChannel(currentChannel)); } break; @@ -288,8 +264,7 @@ public void run() throws InterruptedException { } } - @Override - protected ManagedChannelBuilder createChannelBuilder() { + ManagedChannel createChannel() { ChannelCredentials channelCredentials; switch (credentialsType) { case "compute_engine_channel_creds": @@ -303,7 +278,18 @@ protected ManagedChannelBuilder createChannelBuilder() { } return Grpc.newChannelBuilder(serverUri, channelCredentials) .keepAliveTime(3600, SECONDS) - .keepAliveTimeout(20, SECONDS); + .keepAliveTimeout(20, SECONDS) + .build(); + } + + ManagedChannel createNewChannel(ManagedChannel currentChannel) { + currentChannel.shutdownNow(); + try { + currentChannel.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while creating a new channel", e); + } + return createChannel(); } }