diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index ef33ed3e..bfa7eda2 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -1,5 +1,6 @@ # scalafmt 3b6138ee4093ae6c451d03e48d2bc085b3239731 +64b67e4087205035744f3a2851443472569b46c5 # manual 6d565ee5f4851d9c7bb3546f684cceeb23b9eb73 diff --git a/interop-tests/src/main/java/io/grpc/testing/integration2/ClientTester.java b/interop-tests/src/main/java/io/grpc/testing/integration2/ClientTester.java index 685067f0..00a64376 100644 --- a/interop-tests/src/main/java/io/grpc/testing/integration2/ClientTester.java +++ b/interop-tests/src/main/java/io/grpc/testing/integration2/ClientTester.java @@ -13,67 +13,68 @@ package io.grpc.testing.integration2; -import io.grpc.ManagedChannel; - import java.io.InputStream; /** - * This class has all the methods of the grpc-java AbstractInteropTest, but none of the implementations, - * so it can be implemented either by calling AbstractInteropTest or with an Apache Pekko gRPC implementation. + * This class has all the methods of the grpc-java AbstractInteropTest, but none of the + * implementations, so it can be implemented either by calling AbstractInteropTest or with an Apache + * Pekko gRPC implementation. * - * Test requirements documentation: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md + *

Test requirements documentation: + * https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md */ public interface ClientTester { - void setUp(); - - void tearDown() throws Exception; + void setUp(); - void emptyUnary() throws Exception; + void tearDown() throws Exception; - void cacheableUnary(); + void emptyUnary() throws Exception; - void largeUnary() throws Exception; + void cacheableUnary(); - void clientCompressedUnary(boolean probe) throws Exception; + void largeUnary() throws Exception; - void serverCompressedUnary() throws Exception; + void clientCompressedUnary(boolean probe) throws Exception; - void clientStreaming() throws Exception; + void serverCompressedUnary() throws Exception; - void clientCompressedStreaming(boolean probe) throws Exception; + void clientStreaming() throws Exception; - void serverStreaming() throws Exception; + void clientCompressedStreaming(boolean probe) throws Exception; - void serverCompressedStreaming() throws Exception; - - void pingPong() throws Exception; + void serverStreaming() throws Exception; - void emptyStream() throws Exception; + void serverCompressedStreaming() throws Exception; - void computeEngineCreds(String serviceAccount, String oauthScope) throws Exception; + void pingPong() throws Exception; - void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope) throws Exception; + void emptyStream() throws Exception; - void jwtTokenCreds(InputStream serviceAccountJson) throws Exception; + void computeEngineCreds(String serviceAccount, String oauthScope) throws Exception; - void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope) throws Exception; + void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope) + throws Exception; - void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope) throws Exception; + void jwtTokenCreds(InputStream serviceAccountJson) throws Exception; - void customMetadata() throws Exception; + void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope) + throws Exception; - void statusCodeAndMessage() throws Exception; + void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope) + throws Exception; - void unimplementedMethod(); + void customMetadata() throws Exception; - void unimplementedService(); + void statusCodeAndMessage() throws Exception; - void cancelAfterBegin() throws Exception; + void unimplementedMethod(); - void cancelAfterFirstResponse() throws Exception; + void unimplementedService(); - void timeoutOnSleepingServer() throws Exception; + void cancelAfterBegin() throws Exception; + void cancelAfterFirstResponse() throws Exception; + void timeoutOnSleepingServer() throws Exception; } diff --git a/interop-tests/src/main/java/io/grpc/testing/integration2/GrpcJavaClientTester.java b/interop-tests/src/main/java/io/grpc/testing/integration2/GrpcJavaClientTester.java index f88d339c..adef1c96 100644 --- a/interop-tests/src/main/java/io/grpc/testing/integration2/GrpcJavaClientTester.java +++ b/interop-tests/src/main/java/io/grpc/testing/integration2/GrpcJavaClientTester.java @@ -13,164 +13,159 @@ package io.grpc.testing.integration2; -import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.testing.integration.AbstractInteropTest; - import java.io.InputStream; -/** - * Implementation of ClientTester that forwards all calls to the - * grpc-java AbstractInteropTest. - */ +/** Implementation of ClientTester that forwards all calls to the grpc-java AbstractInteropTest. */ public class GrpcJavaClientTester implements ClientTester { - final private Settings settings; - - private final UnderlyingTester tester = new UnderlyingTester(); - - public GrpcJavaClientTester(Settings settings) { - this.settings = settings; - } - - - @Override - public void setUp() { - tester.setUp(); - } - - @Override - public void tearDown() throws Exception { - tester.tearDown(); - } - - - @Override - public void emptyUnary() throws Exception { - tester.emptyUnary(); - } - - @Override - public void cacheableUnary() { - tester.cacheableUnary(); - } - - @Override - public void largeUnary() throws Exception { - tester.largeUnary(); - } - - @Override - public void clientCompressedUnary(boolean probe) throws Exception { - tester.clientCompressedUnary(probe); - } - - @Override - public void serverCompressedUnary() throws Exception { - tester.serverCompressedUnary(); - } - - @Override - public void clientStreaming() throws Exception { - tester.clientStreaming(); - } - - @Override - public void clientCompressedStreaming(boolean probe) throws Exception { - tester.clientCompressedStreaming(probe); - } - - @Override - public void serverStreaming() throws Exception { - tester.serverStreaming(); - } - - @Override - public void serverCompressedStreaming() throws Exception { - tester.serverCompressedStreaming(); - } - - @Override - public void pingPong() throws Exception { - tester.pingPong(); - } - - @Override - public void emptyStream() throws Exception { - tester.emptyStream(); - } - - @Override - public void computeEngineCreds(String serviceAccount, String oauthScope) throws Exception { - tester.computeEngineCreds(serviceAccount, oauthScope); - } - - @Override - public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope) throws Exception { - tester.serviceAccountCreds(jsonKey, credentialsStream, authScope); - } - - @Override - public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception { - tester.jwtTokenCreds(serviceAccountJson); - } - - @Override - public void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope) throws Exception { - tester.oauth2AuthToken(jsonKey, credentialsStream, authScope); - } - - @Override - public void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope) throws Exception { - tester.perRpcCreds(jsonKey, credentialsStream, oauthScope); - } - - @Override - public void customMetadata() throws Exception { - tester.customMetadata(); - } - - @Override - public void statusCodeAndMessage() throws Exception { - tester.statusCodeAndMessage(); - } - - @Override - public void unimplementedMethod() { - tester.unimplementedMethod(); - } - - @Override - public void unimplementedService() { - tester.unimplementedService(); - } - - @Override - public void cancelAfterBegin() throws Exception { - tester.cancelAfterBegin(); - } - - @Override - public void cancelAfterFirstResponse() throws Exception { - tester.cancelAfterFirstResponse(); - } - - @Override - public void timeoutOnSleepingServer() throws Exception { - tester.timeoutOnSleepingServer(); - } - - private class UnderlyingTester extends AbstractInteropTest { - @Override - protected ManagedChannelBuilder createChannelBuilder() { - return ChannelBuilder.create(settings); - } - - @Override - protected boolean metricsExpected() { - // Server-side metrics won't be found, because server is a separate process. - return false; - } - } - + private final Settings settings; + + private final UnderlyingTester tester = new UnderlyingTester(); + + public GrpcJavaClientTester(Settings settings) { + this.settings = settings; + } + + @Override + public void setUp() { + tester.setUp(); + } + + @Override + public void tearDown() throws Exception { + tester.tearDown(); + } + + @Override + public void emptyUnary() throws Exception { + tester.emptyUnary(); + } + + @Override + public void cacheableUnary() { + tester.cacheableUnary(); + } + + @Override + public void largeUnary() throws Exception { + tester.largeUnary(); + } + + @Override + public void clientCompressedUnary(boolean probe) throws Exception { + tester.clientCompressedUnary(probe); + } + + @Override + public void serverCompressedUnary() throws Exception { + tester.serverCompressedUnary(); + } + + @Override + public void clientStreaming() throws Exception { + tester.clientStreaming(); + } + + @Override + public void clientCompressedStreaming(boolean probe) throws Exception { + tester.clientCompressedStreaming(probe); + } + + @Override + public void serverStreaming() throws Exception { + tester.serverStreaming(); + } + + @Override + public void serverCompressedStreaming() throws Exception { + tester.serverCompressedStreaming(); + } + + @Override + public void pingPong() throws Exception { + tester.pingPong(); + } + + @Override + public void emptyStream() throws Exception { + tester.emptyStream(); + } + + @Override + public void computeEngineCreds(String serviceAccount, String oauthScope) throws Exception { + tester.computeEngineCreds(serviceAccount, oauthScope); + } + + @Override + public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope) + throws Exception { + tester.serviceAccountCreds(jsonKey, credentialsStream, authScope); + } + + @Override + public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception { + tester.jwtTokenCreds(serviceAccountJson); + } + + @Override + public void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope) + throws Exception { + tester.oauth2AuthToken(jsonKey, credentialsStream, authScope); + } + + @Override + public void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope) + throws Exception { + tester.perRpcCreds(jsonKey, credentialsStream, oauthScope); + } + + @Override + public void customMetadata() throws Exception { + tester.customMetadata(); + } + + @Override + public void statusCodeAndMessage() throws Exception { + tester.statusCodeAndMessage(); + } + + @Override + public void unimplementedMethod() { + tester.unimplementedMethod(); + } + + @Override + public void unimplementedService() { + tester.unimplementedService(); + } + + @Override + public void cancelAfterBegin() throws Exception { + tester.cancelAfterBegin(); + } + + @Override + public void cancelAfterFirstResponse() throws Exception { + tester.cancelAfterFirstResponse(); + } + + @Override + public void timeoutOnSleepingServer() throws Exception { + tester.timeoutOnSleepingServer(); + } + + private class UnderlyingTester extends AbstractInteropTest { + @Override + protected ManagedChannelBuilder createChannelBuilder() { + return ChannelBuilder.create(settings); + } + + @Override + protected boolean metricsExpected() { + // Server-side metrics won't be found, because server is a separate process. + return false; + } + } } diff --git a/interop-tests/src/main/java/io/grpc/testing/integration2/TestServiceClient.java b/interop-tests/src/main/java/io/grpc/testing/integration2/TestServiceClient.java index 213d794f..3b0df682 100644 --- a/interop-tests/src/main/java/io/grpc/testing/integration2/TestServiceClient.java +++ b/interop-tests/src/main/java/io/grpc/testing/integration2/TestServiceClient.java @@ -21,7 +21,6 @@ import io.grpc.internal.testing.TestUtils; import io.grpc.testing.integration.TestCases; import io.grpc.testing.integration.TestServiceGrpc; - import java.io.File; import java.io.FileInputStream; import java.nio.charset.Charset; @@ -32,190 +31,208 @@ */ public class TestServiceClient { - private static final Charset UTF_8 = Charset.forName("UTF-8"); + private static final Charset UTF_8 = Charset.forName("UTF-8"); - private String testCase = "empty_unary"; + private String testCase = "empty_unary"; - /** - * The main application allowing this client to be launched from the command line. - */ - public static void main(String[] args) throws Exception { - // Let OkHttp use Conscrypt if it is available. - TestUtils.installConscryptIfAvailable(); - Settings settings = Settings.parseArgs(args); - final TestServiceClient client = new TestServiceClient(new GrpcJavaClientTester(settings)); - client.setUp(); + /** The main application allowing this client to be launched from the command line. */ + public static void main(String[] args) throws Exception { + // Let OkHttp use Conscrypt if it is available. + TestUtils.installConscryptIfAvailable(); + Settings settings = Settings.parseArgs(args); + final TestServiceClient client = new TestServiceClient(new GrpcJavaClientTester(settings)); + client.setUp(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { + Runtime.getRuntime() + .addShutdownHook( + new Thread() { + @Override + public void run() { System.out.println("Shutting down"); try { - client.tearDown(); + client.tearDown(); } catch (Exception e) { - e.printStackTrace(); + e.printStackTrace(); } - } - }); + } + }); - try { - client.run(settings); - } finally { - client.tearDown(); - } - System.exit(0); + try { + client.run(settings); + } finally { + client.tearDown(); + } + System.exit(0); + } + + private ClientTester clientTester; + + public TestServiceClient(ClientTester clientTester) { + this.clientTester = clientTester; + } + + @VisibleForTesting + public void setUp() { + clientTester.setUp(); + } + + public synchronized void tearDown() { + try { + clientTester.tearDown(); + } catch (RuntimeException ex) { + throw ex; + } catch (Exception ex) { + throw new RuntimeException(ex); } + } + + public void run(Settings settings) { + System.out.println("Running test " + settings.getTestCase()); + try { + runTest(TestCases.fromString(settings.getTestCase()), settings); + } catch (RuntimeException ex) { + throw ex; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + System.out.println("Test completed."); + } + + private void runTest(TestCases testCase, Settings settings) throws Exception { + switch (testCase) { + case EMPTY_UNARY: + clientTester.emptyUnary(); + break; + + case CACHEABLE_UNARY: + { + clientTester.cacheableUnary(); + break; + } - private ClientTester clientTester; + case LARGE_UNARY: + clientTester.largeUnary(); + break; + + case CLIENT_COMPRESSED_UNARY: + clientTester.clientCompressedUnary(false); + break; + + case SERVER_COMPRESSED_UNARY: + clientTester.serverCompressedUnary(); + break; + + case CLIENT_STREAMING: + clientTester.clientStreaming(); + break; + + case CLIENT_COMPRESSED_STREAMING: + clientTester.clientCompressedStreaming(false); + break; + + case SERVER_STREAMING: + clientTester.serverStreaming(); + break; + + case SERVER_COMPRESSED_STREAMING: + clientTester.serverCompressedStreaming(); + break; + + case PING_PONG: + clientTester.pingPong(); + break; + + case EMPTY_STREAM: + clientTester.emptyStream(); + break; + + case COMPUTE_ENGINE_CREDS: + clientTester.computeEngineCreds( + settings.getDefaultServiceAccount(), settings.getOauthScope()); + break; + + case SERVICE_ACCOUNT_CREDS: + { + String jsonKey = + Files.asCharSource(new File(settings.getServiceAccountKeyFile()), UTF_8).read(); + FileInputStream credentialsStream = + new FileInputStream(new File(settings.getServiceAccountKeyFile())); + clientTester.serviceAccountCreds(jsonKey, credentialsStream, settings.getOauthScope()); + break; + } - public TestServiceClient(ClientTester clientTester) { - this.clientTester = clientTester; - } + case JWT_TOKEN_CREDS: + { + FileInputStream credentialsStream = + new FileInputStream(new File(settings.getServiceAccountKeyFile())); + clientTester.jwtTokenCreds(credentialsStream); + break; + } - @VisibleForTesting - public void setUp() { - clientTester.setUp(); - } + case OAUTH2_AUTH_TOKEN: + { + String jsonKey = + Files.asCharSource(new File(settings.getServiceAccountKeyFile()), UTF_8).read(); + FileInputStream credentialsStream = + new FileInputStream(new File(settings.getServiceAccountKeyFile())); + clientTester.oauth2AuthToken(jsonKey, credentialsStream, settings.getOauthScope()); + break; + } - public synchronized void tearDown() { - try { - clientTester.tearDown(); - } catch (RuntimeException ex) { - throw ex; - } catch (Exception ex) { - throw new RuntimeException(ex); + case PER_RPC_CREDS: + { + String jsonKey = + Files.asCharSource(new File(settings.getServiceAccountKeyFile()), UTF_8).read(); + FileInputStream credentialsStream = + new FileInputStream(new File(settings.getServiceAccountKeyFile())); + clientTester.perRpcCreds(jsonKey, credentialsStream, settings.getOauthScope()); + break; } - } - public void run(Settings settings) { - System.out.println("Running test " + settings.getTestCase()); - try { - runTest(TestCases.fromString(settings.getTestCase()), settings); - } catch (RuntimeException ex) { - throw ex; - } catch (Exception ex) { - throw new RuntimeException(ex); + case CUSTOM_METADATA: + { + clientTester.customMetadata(); + break; } - System.out.println("Test completed."); - } - private void runTest(TestCases testCase, Settings settings) throws Exception { - switch (testCase) { - case EMPTY_UNARY: - clientTester.emptyUnary(); - break; - - case CACHEABLE_UNARY: { - clientTester.cacheableUnary(); - break; - } - - case LARGE_UNARY: - clientTester.largeUnary(); - break; - - case CLIENT_COMPRESSED_UNARY: - clientTester.clientCompressedUnary(false); - break; - - case SERVER_COMPRESSED_UNARY: - clientTester.serverCompressedUnary(); - break; - - case CLIENT_STREAMING: - clientTester.clientStreaming(); - break; - - case CLIENT_COMPRESSED_STREAMING: - clientTester.clientCompressedStreaming(false); - break; - - case SERVER_STREAMING: - clientTester.serverStreaming(); - break; - - case SERVER_COMPRESSED_STREAMING: - clientTester.serverCompressedStreaming(); - break; - - case PING_PONG: - clientTester.pingPong(); - break; - - case EMPTY_STREAM: - clientTester.emptyStream(); - break; - - case COMPUTE_ENGINE_CREDS: - clientTester.computeEngineCreds(settings.getDefaultServiceAccount(), settings.getOauthScope()); - break; - - case SERVICE_ACCOUNT_CREDS: { - String jsonKey = Files.asCharSource(new File(settings.getServiceAccountKeyFile()), UTF_8).read(); - FileInputStream credentialsStream = new FileInputStream(new File(settings.getServiceAccountKeyFile())); - clientTester.serviceAccountCreds(jsonKey, credentialsStream, settings.getOauthScope()); - break; - } - - case JWT_TOKEN_CREDS: { - FileInputStream credentialsStream = new FileInputStream(new File(settings.getServiceAccountKeyFile())); - clientTester.jwtTokenCreds(credentialsStream); - break; - } - - case OAUTH2_AUTH_TOKEN: { - String jsonKey = Files.asCharSource(new File(settings.getServiceAccountKeyFile()), UTF_8).read(); - FileInputStream credentialsStream = new FileInputStream(new File(settings.getServiceAccountKeyFile())); - clientTester.oauth2AuthToken(jsonKey, credentialsStream, settings.getOauthScope()); - break; - } - - case PER_RPC_CREDS: { - String jsonKey = Files.asCharSource(new File(settings.getServiceAccountKeyFile()), UTF_8).read(); - FileInputStream credentialsStream = new FileInputStream(new File(settings.getServiceAccountKeyFile())); - clientTester.perRpcCreds(jsonKey, credentialsStream, settings.getOauthScope()); - break; - } - - case CUSTOM_METADATA: { - clientTester.customMetadata(); - break; - } - - case STATUS_CODE_AND_MESSAGE: { - clientTester.statusCodeAndMessage(); - break; - } - - case UNIMPLEMENTED_METHOD: { - clientTester.unimplementedMethod(); - break; - } - - case UNIMPLEMENTED_SERVICE: { - clientTester.unimplementedService(); - break; - } - - case CANCEL_AFTER_BEGIN: { - clientTester.cancelAfterBegin(); - break; - } - - case CANCEL_AFTER_FIRST_RESPONSE: { - clientTester.cancelAfterFirstResponse(); - break; - } - - case TIMEOUT_ON_SLEEPING_SERVER: { - clientTester.timeoutOnSleepingServer(); - break; - } - - default: - throw new IllegalArgumentException("Unknown test case: " + testCase); + case STATUS_CODE_AND_MESSAGE: + { + clientTester.statusCodeAndMessage(); + break; + } + + case UNIMPLEMENTED_METHOD: + { + clientTester.unimplementedMethod(); + break; } - } + case UNIMPLEMENTED_SERVICE: + { + clientTester.unimplementedService(); + break; + } + + case CANCEL_AFTER_BEGIN: + { + clientTester.cancelAfterBegin(); + break; + } + case CANCEL_AFTER_FIRST_RESPONSE: + { + clientTester.cancelAfterFirstResponse(); + break; + } + + case TIMEOUT_ON_SLEEPING_SERVER: + { + clientTester.timeoutOnSleepingServer(); + break; + } + + default: + throw new IllegalArgumentException("Unknown test case: " + testCase); + } + } } diff --git a/interop-tests/src/main/java/io/grpc/testing/integration2/TestServiceServer.java b/interop-tests/src/main/java/io/grpc/testing/integration2/TestServiceServer.java index 91e880ab..e2331c7e 100644 --- a/interop-tests/src/main/java/io/grpc/testing/integration2/TestServiceServer.java +++ b/interop-tests/src/main/java/io/grpc/testing/integration2/TestServiceServer.java @@ -26,39 +26,36 @@ import io.grpc.testing.integration.AbstractInteropTest; import io.grpc.testing.integration.TestServiceImpl; import io.netty.handler.ssl.SslContext; - import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -/** - * Server that manages startup/shutdown of a single {@code TestService}. - */ +/** Server that manages startup/shutdown of a single {@code TestService}. */ public class TestServiceServer { - /** - * The main application allowing this server to be launched from the command line. - */ + /** The main application allowing this server to be launched from the command line. */ public static void main(String[] args) throws Exception { final TestServiceServer server = new TestServiceServer(); server.parseArgs(args); if (server.useTls) { System.out.println( "\nUsing fake CA for TLS certificate. Test clients should expect host\n" - + "*.test.google.fr and our test CA. For the Java test client binary, use:\n" - + "--server_host_override=foo.test.google.fr --use_test_ca=true\n"); + + "*.test.google.fr and our test CA. For the Java test client binary, use:\n" + + "--server_host_override=foo.test.google.fr --use_test_ca=true\n"); } - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - System.out.println("Shutting down"); - server.stop(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); + Runtime.getRuntime() + .addShutdownHook( + new Thread() { + @Override + public void run() { + try { + System.out.println("Shutting down"); + server.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); server.start(); System.out.println("Server started on port " + server.port); server.blockUntilShutdown(); @@ -111,10 +108,11 @@ public void parseArgs(String[] args) { TestServiceServer s = new TestServiceServer(); System.out.println( "Usage: [ARGS...]" - + "\n" - + "\n --port=PORT Port to connect to. Default " + s.port - + "\n --use_tls=true|false Whether to use TLS. Default " + s.useTls - ); + + "\n" + + "\n --port=PORT Port to connect to. Default " + + s.port + + "\n --use_tls=true|false Whether to use TLS. Default " + + s.useTls); System.exit(1); } } @@ -124,16 +122,20 @@ public void start() throws Exception { executor = Executors.newSingleThreadScheduledExecutor(); SslContext sslContext = null; if (useTls) { - sslContext = GrpcSslContexts.forServer( - TestUtils.loadCert("server1.pem"), TestUtils.loadCert("server1.key")).build(); + sslContext = + GrpcSslContexts.forServer( + TestUtils.loadCert("server1.pem"), TestUtils.loadCert("server1.key")) + .build(); } - server = NettyServerBuilder.forPort(port) - .sslContext(sslContext) - .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) - .addService(ServerInterceptors.intercept( - new TestServiceImpl(executor), - TestServiceImpl.interceptors())) - .build().start(); + server = + NettyServerBuilder.forPort(port) + .sslContext(sslContext) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) + .addService( + ServerInterceptors.intercept( + new TestServiceImpl(executor), TestServiceImpl.interceptors())) + .build() + .start(); } @VisibleForTesting @@ -142,7 +144,7 @@ public void stop() throws Exception { if (!server.awaitTermination(5, TimeUnit.SECONDS)) { System.err.println("Timed out waiting for server shutdown"); } - System.out.println("Server stopped"); + System.out.println("Server stopped"); MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS); } @@ -151,9 +153,7 @@ int getPort() { return server.getPort(); } - /** - * Await termination on the main thread since the grpc library uses daemon threads. - */ + /** Await termination on the main thread since the grpc library uses daemon threads. */ private void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); diff --git a/interop-tests/src/main/java/org/apache/pekko/grpc/interop/PekkoGrpcClientJava.java b/interop-tests/src/main/java/org/apache/pekko/grpc/interop/PekkoGrpcClientJava.java index 816d6ac7..61d06d1a 100644 --- a/interop-tests/src/main/java/org/apache/pekko/grpc/interop/PekkoGrpcClientJava.java +++ b/interop-tests/src/main/java/org/apache/pekko/grpc/interop/PekkoGrpcClientJava.java @@ -13,17 +13,16 @@ package org.apache.pekko.grpc.interop; -import org.apache.pekko.actor.ActorSystem; import io.grpc.internal.testing.TestUtils; import io.grpc.testing.integration2.ClientTester; -import io.grpc.testing.integration2.TestServiceClient; import io.grpc.testing.integration2.Settings; +import io.grpc.testing.integration2.TestServiceClient; +import java.util.concurrent.TimeUnit; +import org.apache.pekko.actor.ActorSystem; import scala.Function2; import scala.concurrent.Await; import scala.concurrent.duration.Duration; -import java.util.concurrent.TimeUnit; - public class PekkoGrpcClientJava extends GrpcClient { private final Function2 clientTesterFactory; @@ -38,18 +37,17 @@ public void run(String[] args) { final ActorSystem sys = ActorSystem.create("PekkoGrpcClientJava"); - final TestServiceClient client = new TestServiceClient(clientTesterFactory.apply(settings, sys)); + final TestServiceClient client = + new TestServiceClient(clientTesterFactory.apply(settings, sys)); client.setUp(); try { client.run(settings); - } - finally { + } finally { client.tearDown(); try { Await.result(sys.terminate(), Duration.apply(5, TimeUnit.SECONDS)); - } - catch (Exception ex) { + } catch (Exception ex) { throw new RuntimeException(ex); } } diff --git a/interop-tests/src/main/java/org/apache/pekko/grpc/interop/PekkoGrpcServerJava.java b/interop-tests/src/main/java/org/apache/pekko/grpc/interop/PekkoGrpcServerJava.java index 795dfe22..9db117cd 100644 --- a/interop-tests/src/main/java/org/apache/pekko/grpc/interop/PekkoGrpcServerJava.java +++ b/interop-tests/src/main/java/org/apache/pekko/grpc/interop/PekkoGrpcServerJava.java @@ -13,26 +13,9 @@ package org.apache.pekko.grpc.interop; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.http.javadsl.Http; -import org.apache.pekko.http.javadsl.HttpsConnectionContext; -import org.apache.pekko.http.javadsl.ServerBinding; -import org.apache.pekko.http.javadsl.model.HttpRequest; -import org.apache.pekko.http.javadsl.model.HttpResponse; -import org.apache.pekko.http.javadsl.model.StatusCodes; -import org.apache.pekko.http.javadsl.settings.ServerSettings; -import org.apache.pekko.japi.function.Function; -import org.apache.pekko.stream.Materializer; -import org.apache.pekko.stream.SystemMaterializer; -import org.apache.pekko.util.ByteString; import com.typesafe.config.ConfigFactory; import io.grpc.internal.testing.TestUtils; import io.grpc.testing.integration.TestService; -import scala.Function2; -import scala.Tuple2; - -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; import java.io.FileInputStream; import java.io.InputStream; import java.nio.file.Files; @@ -48,47 +31,74 @@ import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.HttpsConnectionContext; +import org.apache.pekko.http.javadsl.ServerBinding; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.apache.pekko.http.javadsl.model.StatusCodes; +import org.apache.pekko.http.javadsl.settings.ServerSettings; +import org.apache.pekko.japi.function.Function; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.SystemMaterializer; +import org.apache.pekko.util.ByteString; +import scala.Function2; +import scala.Tuple2; -/** - * Glue code to start a gRPC server based on the pekko-grpc Java API to test against - */ +/** Glue code to start a gRPC server based on the pekko-grpc Java API to test against */ public class PekkoGrpcServerJava extends GrpcServer> { - private final Function2>> handlerFactory; + private final Function2< + Materializer, ActorSystem, Function>> + handlerFactory; - public PekkoGrpcServerJava(Function2>> handlerFactory) { + public PekkoGrpcServerJava( + Function2>> + handlerFactory) { this.handlerFactory = handlerFactory; } public Tuple2 start(String[] args) throws Exception { - ActorSystem sys = ActorSystem.create( - "pekko-grpc-server-java", - ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on")); + ActorSystem sys = + ActorSystem.create( + "pekko-grpc-server-java", + ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on")); Materializer mat = SystemMaterializer.get(sys).materializer(); - Function> testService = handlerFactory.apply(mat, sys); - - Function> handler = req -> { - Iterator segmentIterator = req.getUri().pathSegments().iterator(); - if (segmentIterator.hasNext()) { - if (segmentIterator.next().equals(TestService.name)) { - return testService.apply(req); - } else { - return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.NOT_FOUND)); - } - } else { - return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.NOT_FOUND)); - } - }; + Function> testService = + handlerFactory.apply(mat, sys); + + Function> handler = + req -> { + Iterator segmentIterator = req.getUri().pathSegments().iterator(); + if (segmentIterator.hasNext()) { + if (segmentIterator.next().equals(TestService.name)) { + return testService.apply(req); + } else { + return CompletableFuture.completedFuture( + HttpResponse.create().withStatus(StatusCodes.NOT_FOUND)); + } + } else { + return CompletableFuture.completedFuture( + HttpResponse.create().withStatus(StatusCodes.NOT_FOUND)); + } + }; ServerSettings serverSettings = ServerSettings.create(sys); CompletionStage binding; if (Arrays.asList(args).contains("--use_tls=false")) { - binding = Http.get(sys).newServerAt("127.0.0.1", 0) + binding = + Http.get(sys) + .newServerAt("127.0.0.1", 0) .withMaterializer(mat) .withSettings(serverSettings) .bind(handler); } else { - binding = Http.get(sys).newServerAt("127.0.0.1", 0) + binding = + Http.get(sys) + .newServerAt("127.0.0.1", 0) .withMaterializer(mat) .withSettings(serverSettings) .enableHttps(serverHttpContext()) @@ -111,10 +121,13 @@ public void stop(Tuple2 binding) throws Exception { } private HttpsConnectionContext serverHttpContext() throws Exception { - String keyEncoded = new String(Files.readAllBytes(Paths.get(TestUtils.loadCert("server1.key").getAbsolutePath())), "UTF-8") - .replace("-----BEGIN PRIVATE KEY-----\n", "") - .replace("-----END PRIVATE KEY-----\n", "") - .replace("\n", ""); + String keyEncoded = + new String( + Files.readAllBytes(Paths.get(TestUtils.loadCert("server1.key").getAbsolutePath())), + "UTF-8") + .replace("-----BEGIN PRIVATE KEY-----\n", "") + .replace("-----END PRIVATE KEY-----\n", "") + .replace("\n", ""); byte[] decodedKey = ByteString.fromString(keyEncoded).decodeBase64().toArray(); @@ -129,7 +142,7 @@ private HttpsConnectionContext serverHttpContext() throws Exception { KeyStore ks = KeyStore.getInstance("PKCS12"); ks.load(null); - ks.setKeyEntry("private", privateKey, new char[]{}, new Certificate[] { cer }); + ks.setKeyEntry("private", privateKey, new char[] {}, new Certificate[] {cer}); KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509"); keyManagerFactory.init(ks, null); @@ -139,5 +152,4 @@ private HttpsConnectionContext serverHttpContext() throws Exception { return HttpsConnectionContext.httpsServer(context); } - } diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java index a5324dd9..f7118b36 100644 --- a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java @@ -13,57 +13,55 @@ package example.myapp.helloworld.grpc; -import org.apache.pekko.NotUsed; -import org.apache.pekko.stream.javadsl.Source; import com.google.protobuf.any.Any; import com.google.rpc.Code; -import com.google.rpc.error_details.LocalizedMessage; import com.google.rpc.Status; +import com.google.rpc.error_details.LocalizedMessage; import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; - import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import org.apache.pekko.NotUsed; +import org.apache.pekko.stream.javadsl.Source; public class RichErrorImpl implements GreeterService { - // #rich_error_model_unary - private com.google.protobuf.Any toJavaProto(com.google.protobuf.any.Any scalaPbSource) { - com.google.protobuf.Any.Builder javaPbOut = com.google.protobuf.Any.newBuilder(); - javaPbOut.setTypeUrl(scalaPbSource.typeUrl()); - javaPbOut.setValue(scalaPbSource.value()); - return javaPbOut.build(); - } + // #rich_error_model_unary + private com.google.protobuf.Any toJavaProto(com.google.protobuf.any.Any scalaPbSource) { + com.google.protobuf.Any.Builder javaPbOut = com.google.protobuf.Any.newBuilder(); + javaPbOut.setTypeUrl(scalaPbSource.typeUrl()); + javaPbOut.setValue(scalaPbSource.value()); + return javaPbOut.build(); + } - @Override - public CompletionStage sayHello(HelloRequest in) { - Status status = Status.newBuilder() - .setCode(Code.INVALID_ARGUMENT_VALUE) - .setMessage("What is wrong?") - .addDetails(toJavaProto(Any.pack( - LocalizedMessage.of("EN", "The password!") - ))) - .build(); - StatusRuntimeException statusRuntimeException = StatusProto.toStatusRuntimeException(status); + @Override + public CompletionStage sayHello(HelloRequest in) { + Status status = + Status.newBuilder() + .setCode(Code.INVALID_ARGUMENT_VALUE) + .setMessage("What is wrong?") + .addDetails(toJavaProto(Any.pack(LocalizedMessage.of("EN", "The password!")))) + .build(); + StatusRuntimeException statusRuntimeException = StatusProto.toStatusRuntimeException(status); - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(statusRuntimeException); - return future; - } - // #rich_error_model_unary + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(statusRuntimeException); + return future; + } + // #rich_error_model_unary - @Override - public CompletionStage itKeepsTalking(Source in) { - return null; - } + @Override + public CompletionStage itKeepsTalking(Source in) { + return null; + } - @Override - public Source itKeepsReplying(HelloRequest in) { - return null; - } + @Override + public Source itKeepsReplying(HelloRequest in) { + return null; + } - @Override - public Source streamHellos(Source in) { - return null; - } + @Override + public Source streamHellos(Source in) { + return null; + } } diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelTest.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelTest.java index db859ba8..5cc75e70 100644 --- a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelTest.java +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelTest.java @@ -13,81 +13,87 @@ package example.myapp.helloworld.grpc; +import static org.junit.Assert.assertEquals; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.protobuf.StatusProto; +import java.util.concurrent.CompletionStage; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.grpc.GrpcClientSettings; import org.apache.pekko.http.javadsl.Http; import org.apache.pekko.http.javadsl.ServerBinding; import org.apache.pekko.http.javadsl.model.HttpRequest; import org.apache.pekko.http.javadsl.model.HttpResponse; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.grpc.protobuf.StatusProto; import org.junit.Assert; import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; -import java.util.concurrent.CompletionStage; - -import static org.junit.Assert.assertEquals; - - public class RichErrorModelTest extends JUnitSuite { - private com.google.protobuf.any.Any fromJavaProto(com.google.protobuf.Any javaPbSource) { - return com.google.protobuf.any.Any.of(javaPbSource.getTypeUrl(), javaPbSource.getValue()); - } - - private CompletionStage run(ActorSystem sys) throws Exception { - - GreeterService impl = new RichErrorImpl(); - - org.apache.pekko.japi.function.Function> service = GreeterServiceHandlerFactory.create(impl, sys); - - return Http - .get(sys) - .newServerAt("127.0.0.1", 8090) - .bind(service); - } - - @Test - @SuppressWarnings("unchecked") - public void testManualApproach() throws Exception { - Config conf = ConfigFactory.load(); - ActorSystem sys = ActorSystem.create("HelloWorld", conf); - run(sys); - - GrpcClientSettings settings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8090, sys).withTls(false); - - GreeterServiceClient client = null; - try { - client = GreeterServiceClient.create(settings, sys); - - // #client_request - HelloRequest request = HelloRequest.newBuilder().setName("Alice").build(); - CompletionStage response = client.sayHello(request); - StatusRuntimeException statusEx = response.toCompletableFuture().handle((res, ex) -> { - return (StatusRuntimeException) ex; - }).get(); - - com.google.rpc.Status status = StatusProto.fromStatusAndTrailers(statusEx.getStatus(), statusEx.getTrailers()); - - assertEquals("type.googleapis.com/google.rpc.LocalizedMessage", status.getDetails(0).getTypeUrl()); - - com.google.rpc.error_details.LocalizedMessage details = fromJavaProto(status.getDetails(0)).unpack(com.google.rpc.error_details.LocalizedMessage.messageCompanion()); - - assertEquals(Status.INVALID_ARGUMENT.getCode().value(), status.getCode()); - assertEquals("What is wrong?", status.getMessage()); - assertEquals("The password!", details.message()); - assertEquals("EN", details.locale()); - // #client_request - - } catch (Exception e) { - Assert.fail("Got unexpected error " + e.getMessage()); - } finally { - if (client != null) client.close(); - sys.terminate(); - } + private com.google.protobuf.any.Any fromJavaProto(com.google.protobuf.Any javaPbSource) { + return com.google.protobuf.any.Any.of(javaPbSource.getTypeUrl(), javaPbSource.getValue()); + } + + private CompletionStage run(ActorSystem sys) throws Exception { + + GreeterService impl = new RichErrorImpl(); + + org.apache.pekko.japi.function.Function> service = + GreeterServiceHandlerFactory.create(impl, sys); + + return Http.get(sys).newServerAt("127.0.0.1", 8090).bind(service); + } + + @Test + @SuppressWarnings("unchecked") + public void testManualApproach() throws Exception { + Config conf = ConfigFactory.load(); + ActorSystem sys = ActorSystem.create("HelloWorld", conf); + run(sys); + + GrpcClientSettings settings = + GrpcClientSettings.connectToServiceAt("127.0.0.1", 8090, sys).withTls(false); + + GreeterServiceClient client = null; + try { + client = GreeterServiceClient.create(settings, sys); + + // #client_request + HelloRequest request = HelloRequest.newBuilder().setName("Alice").build(); + CompletionStage response = client.sayHello(request); + StatusRuntimeException statusEx = + response + .toCompletableFuture() + .handle( + (res, ex) -> { + return (StatusRuntimeException) ex; + }) + .get(); + + com.google.rpc.Status status = + StatusProto.fromStatusAndTrailers(statusEx.getStatus(), statusEx.getTrailers()); + + assertEquals( + "type.googleapis.com/google.rpc.LocalizedMessage", status.getDetails(0).getTypeUrl()); + + com.google.rpc.error_details.LocalizedMessage details = + fromJavaProto(status.getDetails(0)) + .unpack(com.google.rpc.error_details.LocalizedMessage.messageCompanion()); + + assertEquals(Status.INVALID_ARGUMENT.getCode().value(), status.getCode()); + assertEquals("What is wrong?", status.getMessage()); + assertEquals("The password!", details.message()); + assertEquals("EN", details.locale()); + // #client_request + + } catch (Exception e) { + Assert.fail("Got unexpected error " + e.getMessage()); + } finally { + if (client != null) client.close(); + sys.terminate(); } + } } diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java index a5ef34b6..6b277eda 100644 --- a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java @@ -13,51 +13,47 @@ package example.myapp.helloworld.grpc; -import org.apache.pekko.NotUsed; -import org.apache.pekko.grpc.GrpcServiceException; -import org.apache.pekko.stream.javadsl.Source; import com.google.rpc.Code; import com.google.rpc.error_details.LocalizedMessage; -import scala.collection.JavaConverters; -import scalapb.GeneratedMessage; - import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import org.apache.pekko.NotUsed; +import org.apache.pekko.grpc.GrpcServiceException; +import org.apache.pekko.stream.javadsl.Source; +import scala.collection.JavaConverters; public class RichErrorNativeImpl implements GreeterService { - // #rich_error_model_unary - @Override - public CompletionStage sayHello(HelloRequest in) { - - ArrayList ar = new ArrayList<>(); - ar.add(LocalizedMessage.of("EN", "The password!")); - - GrpcServiceException exception = GrpcServiceException.apply( - Code.INVALID_ARGUMENT, - "What is wrong?", - JavaConverters.asScalaBuffer(ar).toSeq() - ); - - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(exception); - return future; - } - // #rich_error_model_unary - - @Override - public CompletionStage itKeepsTalking(Source in) { - return null; - } - - @Override - public Source itKeepsReplying(HelloRequest in) { - return null; - } - - @Override - public Source streamHellos(Source in) { - return null; - } + // #rich_error_model_unary + @Override + public CompletionStage sayHello(HelloRequest in) { + + ArrayList ar = new ArrayList<>(); + ar.add(LocalizedMessage.of("EN", "The password!")); + + GrpcServiceException exception = + GrpcServiceException.apply( + Code.INVALID_ARGUMENT, "What is wrong?", JavaConverters.asScalaBuffer(ar).toSeq()); + + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(exception); + return future; + } + // #rich_error_model_unary + + @Override + public CompletionStage itKeepsTalking(Source in) { + return null; + } + + @Override + public Source itKeepsReplying(HelloRequest in) { + return null; + } + + @Override + public Source streamHellos(Source in) { + return null; + } } diff --git a/interop-tests/src/test/java/org/apache/pekko/grpc/interop/PekkoGrpcJavaClientTester.java b/interop-tests/src/test/java/org/apache/pekko/grpc/interop/PekkoGrpcJavaClientTester.java index f27b9804..83f773c3 100644 --- a/interop-tests/src/test/java/org/apache/pekko/grpc/interop/PekkoGrpcJavaClientTester.java +++ b/interop-tests/src/test/java/org/apache/pekko/grpc/interop/PekkoGrpcJavaClientTester.java @@ -13,18 +13,9 @@ package org.apache.pekko.grpc.interop; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.grpc.GrpcClientSettings; -import org.apache.pekko.grpc.GrpcResponseMetadata; -import org.apache.pekko.grpc.GrpcSingleResponse; -import org.apache.pekko.grpc.SSLContextUtils; -import org.apache.pekko.grpc.javadsl.Metadata; -import org.apache.pekko.japi.Pair; -import org.apache.pekko.stream.Materializer; -import org.apache.pekko.stream.SystemMaterializer; -import org.apache.pekko.stream.javadsl.Keep; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + import com.google.protobuf.ByteString; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -34,23 +25,33 @@ import io.grpc.testing.integration.UnimplementedServiceClient; import io.grpc.testing.integration2.ClientTester; import io.grpc.testing.integration2.Settings; -import org.junit.Assert; -import scala.concurrent.ExecutionContext; - import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.grpc.GrpcClientSettings; +import org.apache.pekko.grpc.GrpcResponseMetadata; +import org.apache.pekko.grpc.GrpcSingleResponse; +import org.apache.pekko.grpc.SSLContextUtils; +import org.apache.pekko.grpc.javadsl.Metadata; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.SystemMaterializer; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.junit.Assert; +import scala.concurrent.ExecutionContext; /** - * ClientTester implementation that uses the generated pekko-grpc Java client to exercise a server under test. + * ClientTester implementation that uses the generated pekko-grpc Java client to exercise a server + * under test. * - * Essentially porting the client code from [[io.grpc.testing.integration.AbstractInteropTest]] against our Scala API's + *

Essentially porting the client code from [[io.grpc.testing.integration.AbstractInteropTest]] + * against our Scala API's */ public class PekkoGrpcJavaClientTester implements ClientTester { private final Settings settings; @@ -74,25 +75,29 @@ public PekkoGrpcJavaClientTester(Settings settings, ActorSystem sys) { public void setUp() { final GrpcClientSettings grpcSettings = GrpcClientSettings.connectToServiceAt(settings.serverHost(), settings.serverPort(), as) - .withOverrideAuthority(settings.serverHostOverride()) - .withTls(settings.useTls()) - .withTrustManager(SSLContextUtils.trustManagerFromResource("/certs/ca.pem")); + .withOverrideAuthority(settings.serverHostOverride()) + .withTls(settings.useTls()) + .withTrustManager(SSLContextUtils.trustManagerFromResource("/certs/ca.pem")); client = TestServiceClient.create(grpcSettings, as); clientUnimplementedService = UnimplementedServiceClient.create(grpcSettings, as); } @Override public void tearDown() throws Exception { - if (client != null) client.close().toCompletableFuture().get(AWAIT_TIME_SECONDS, TimeUnit.SECONDS); - if (clientUnimplementedService != null) clientUnimplementedService.close().toCompletableFuture().get(AWAIT_TIME_SECONDS, TimeUnit.SECONDS); + if (client != null) + client.close().toCompletableFuture().get(AWAIT_TIME_SECONDS, TimeUnit.SECONDS); + if (clientUnimplementedService != null) + clientUnimplementedService + .close() + .toCompletableFuture() + .get(AWAIT_TIME_SECONDS, TimeUnit.SECONDS); } @Override public void emptyUnary() throws Exception { Assert.assertEquals( - EmptyProtos.Empty.newBuilder().build(), - client.emptyCall(EmptyProtos.Empty.newBuilder().build()).toCompletableFuture().get() - ); + EmptyProtos.Empty.newBuilder().build(), + client.emptyCall(EmptyProtos.Empty.newBuilder().build()).toCompletableFuture().get()); } @Override @@ -102,14 +107,18 @@ public void cacheableUnary() { @Override public void largeUnary() throws Exception { - final Messages.SimpleRequest request = Messages.SimpleRequest.newBuilder() - .setResponseSize(314159) - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) - .build(); + final Messages.SimpleRequest request = + Messages.SimpleRequest.newBuilder() + .setResponseSize(314159) + .setPayload( + Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) + .build(); - final Messages.SimpleResponse expectedResponse = Messages.SimpleResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159]))) - .build(); + final Messages.SimpleResponse expectedResponse = + Messages.SimpleResponse.newBuilder() + .setPayload( + Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159]))) + .build(); final Messages.SimpleResponse response = client.unaryCall(request).toCompletableFuture().get(); assertEquals(expectedResponse, response); @@ -128,20 +137,28 @@ public void serverCompressedUnary() throws Exception { @Override public void clientStreaming() throws Exception { final List requests = new ArrayList<>(); - requests.add(Messages.StreamingInputCallRequest.newBuilder().setPayload( - Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182]))).build()); - requests.add(Messages.StreamingInputCallRequest.newBuilder().setPayload( - Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[8]))).build()); - requests.add(Messages.StreamingInputCallRequest.newBuilder().setPayload( - Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1828]))).build()); - requests.add(Messages.StreamingInputCallRequest.newBuilder().setPayload( - Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904]))).build()); + requests.add( + Messages.StreamingInputCallRequest.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182]))) + .build()); + requests.add( + Messages.StreamingInputCallRequest.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[8]))) + .build()); + requests.add( + Messages.StreamingInputCallRequest.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1828]))) + .build()); + requests.add( + Messages.StreamingInputCallRequest.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904]))) + .build()); final Messages.StreamingInputCallResponse expectedResponse = - Messages.StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(74922).build(); + Messages.StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(74922).build(); final Messages.StreamingInputCallResponse response = - client.streamingInputCall(Source.from(requests)).toCompletableFuture().get(); + client.streamingInputCall(Source.from(requests)).toCompletableFuture().get(); assertEquals(expectedResponse, response); } @@ -154,28 +171,38 @@ public void clientCompressedStreaming(boolean probe) throws Exception { @Override public void serverStreaming() throws Exception { final Messages.StreamingOutputCallRequest request = - Messages.StreamingOutputCallRequest.newBuilder() - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(31415)) - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(9)) - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(2653)) - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(58979)) - .build(); + Messages.StreamingOutputCallRequest.newBuilder() + .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(31415)) + .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(9)) + .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(2653)) + .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(58979)) + .build(); final List expectedResponse = new ArrayList<>(); - expectedResponse.add(Messages.StreamingOutputCallResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415]))) - .build()); - expectedResponse.add(Messages.StreamingOutputCallResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[9]))) - .build()); - expectedResponse.add(Messages.StreamingOutputCallResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[2653]))) - .build()); - expectedResponse.add(Messages.StreamingOutputCallResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[58979]))) - .build()); - - final List response = client.streamingOutputCall(request).toMat(Sink.seq(), Keep.right()).run(mat).toCompletableFuture().get(); + expectedResponse.add( + Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415]))) + .build()); + expectedResponse.add( + Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[9]))) + .build()); + expectedResponse.add( + Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[2653]))) + .build()); + expectedResponse.add( + Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[58979]))) + .build()); + + final List response = + client + .streamingOutputCall(request) + .toMat(Sink.seq(), Keep.right()) + .run(mat) + .toCompletableFuture() + .get(); assertEquals(expectedResponse.size(), response.size()); for (int i = 0; i < expectedResponse.size(); i++) { @@ -186,20 +213,34 @@ public void serverStreaming() throws Exception { @Override public void serverCompressedStreaming() throws Exception { final Messages.StreamingOutputCallRequest request = - Messages.StreamingOutputCallRequest.newBuilder() - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(31415).setCompressed(Messages.BoolValue.newBuilder().setValue(true))) - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(92653).setCompressed(Messages.BoolValue.newBuilder().setValue(true))) - .build(); + Messages.StreamingOutputCallRequest.newBuilder() + .addResponseParameters( + Messages.ResponseParameters.newBuilder() + .setSize(31415) + .setCompressed(Messages.BoolValue.newBuilder().setValue(true))) + .addResponseParameters( + Messages.ResponseParameters.newBuilder() + .setSize(92653) + .setCompressed(Messages.BoolValue.newBuilder().setValue(true))) + .build(); final List expectedResponse = new ArrayList<>(); - expectedResponse.add(Messages.StreamingOutputCallResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415]))) - .build()); - expectedResponse.add(Messages.StreamingOutputCallResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[92653]))) - .build()); + expectedResponse.add( + Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415]))) + .build()); + expectedResponse.add( + Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[92653]))) + .build()); - final List response = client.streamingOutputCall(request).toMat(Sink.seq(), Keep.right()).run(mat).toCompletableFuture().get(); + final List response = + client + .streamingOutputCall(request) + .toMat(Sink.seq(), Keep.right()) + .run(mat) + .toCompletableFuture() + .get(); assertEquals(expectedResponse.size(), response.size()); for (int i = 0; i < expectedResponse.size(); i++) { @@ -210,38 +251,52 @@ public void serverCompressedStreaming() throws Exception { @Override public void pingPong() throws Exception { final List requests = new ArrayList<>(); - requests.add(Messages.StreamingOutputCallRequest.newBuilder() - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(31415)) - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182]))) - .build()); - requests.add(Messages.StreamingOutputCallRequest.newBuilder() - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(9)) - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[8]))) - .build()); - requests.add(Messages.StreamingOutputCallRequest.newBuilder() - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(2653)) - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1828]))) - .build()); - requests.add(Messages.StreamingOutputCallRequest.newBuilder() - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(58979)) - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904]))) - .build()); + requests.add( + Messages.StreamingOutputCallRequest.newBuilder() + .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(31415)) + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182]))) + .build()); + requests.add( + Messages.StreamingOutputCallRequest.newBuilder() + .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(9)) + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[8]))) + .build()); + requests.add( + Messages.StreamingOutputCallRequest.newBuilder() + .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(2653)) + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1828]))) + .build()); + requests.add( + Messages.StreamingOutputCallRequest.newBuilder() + .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(58979)) + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904]))) + .build()); final List expectedResponse = new ArrayList<>(); - expectedResponse.add(Messages.StreamingOutputCallResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415]))) - .build()); - expectedResponse.add(Messages.StreamingOutputCallResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[9]))) - .build()); - expectedResponse.add(Messages.StreamingOutputCallResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[2653]))) - .build()); - expectedResponse.add(Messages.StreamingOutputCallResponse.newBuilder() - .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[58979]))) - .build()); - - final List response = client.fullDuplexCall(Source.from(requests)).toMat(Sink.seq(), Keep.right()).run(mat).toCompletableFuture().get(); + expectedResponse.add( + Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415]))) + .build()); + expectedResponse.add( + Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[9]))) + .build()); + expectedResponse.add( + Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[2653]))) + .build()); + expectedResponse.add( + Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom(new byte[58979]))) + .build()); + + final List response = + client + .fullDuplexCall(Source.from(requests)) + .toMat(Sink.seq(), Keep.right()) + .run(mat) + .toCompletableFuture() + .get(); assertEquals(expectedResponse.size(), response.size()); for (int i = 0; i < expectedResponse.size(); i++) { @@ -252,8 +307,12 @@ public void pingPong() throws Exception { @Override public void emptyStream() throws Exception { final List response = - client.fullDuplexCall(Source.empty()).toMat(Sink.seq(), Keep.right()) - .run(mat).toCompletableFuture().get(); + client + .fullDuplexCall(Source.empty()) + .toMat(Sink.seq(), Keep.right()) + .run(mat) + .toCompletableFuture() + .get(); assertEquals(0, response.size()); } @@ -263,7 +322,8 @@ public void computeEngineCreds(String serviceAccount, String oauthScope) throws } @Override - public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope) throws Exception { + public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope) + throws Exception { throw new UnsupportedOperationException("Not implemented!"); } @@ -273,55 +333,71 @@ public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception { } @Override - public void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope) throws Exception { + public void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope) + throws Exception { throw new UnsupportedOperationException("Not implemented!"); } @Override - public void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope) throws Exception { + public void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope) + throws Exception { throw new UnsupportedOperationException("Not implemented!"); } @Override public void customMetadata() throws Exception { // unary call - org.apache.pekko.util.ByteString binaryValue = org.apache.pekko.util.ByteString.fromInts(0xababab); - CompletionStage> unaryResponseCs = client.unaryCall() - .addHeader("x-grpc-test-echo-initial", "test_initial_metadata_value") - .addHeader("x-grpc-test-echo-trailing-bin", binaryValue) - .invokeWithMetadata(Messages.SimpleRequest.newBuilder() - .setResponseSize(314159) - .setPayload(Messages.Payload.newBuilder() - .setBody(ByteString.copyFrom(new byte[271828])) - .build()) - .build() - ); - - GrpcSingleResponse unaryResponse = unaryResponseCs.toCompletableFuture().get(); - Optional unaryInitialMetadata = unaryResponse.getHeaders().getText("x-grpc-test-echo-initial"); + org.apache.pekko.util.ByteString binaryValue = + org.apache.pekko.util.ByteString.fromInts(0xababab); + CompletionStage> unaryResponseCs = + client + .unaryCall() + .addHeader("x-grpc-test-echo-initial", "test_initial_metadata_value") + .addHeader("x-grpc-test-echo-trailing-bin", binaryValue) + .invokeWithMetadata( + Messages.SimpleRequest.newBuilder() + .setResponseSize(314159) + .setPayload( + Messages.Payload.newBuilder() + .setBody(ByteString.copyFrom(new byte[271828])) + .build()) + .build()); + + GrpcSingleResponse unaryResponse = + unaryResponseCs.toCompletableFuture().get(); + Optional unaryInitialMetadata = + unaryResponse.getHeaders().getText("x-grpc-test-echo-initial"); assertEquals("test_initial_metadata_value", unaryInitialMetadata.get()); Metadata unaryTrailers = unaryResponse.getTrailers().toCompletableFuture().get(); - assertEquals( - binaryValue, - unaryTrailers.getBinary("x-grpc-test-echo-trailing-bin").get()); + assertEquals(binaryValue, unaryTrailers.getBinary("x-grpc-test-echo-trailing-bin").get()); // full duplex - Source> fullDuplexSource = - client.fullDuplexCall() - .addHeader("x-grpc-test-echo-initial", "test_initial_metadata_value") - .addHeader("x-grpc-test-echo-trailing-bin", org.apache.pekko.util.ByteString.fromInts(0xababab)) - .invokeWithMetadata(Source.single(Messages.StreamingOutputCallRequest.newBuilder() - .addResponseParameters(Messages.ResponseParameters.newBuilder().setSize(314159).build()) - .setPayload(Messages.Payload.newBuilder() - .setBody(ByteString.copyFrom(new byte[271828])) - .build()) - .build() - )); - - Pair, CompletionStage> fullDuplexResult = - fullDuplexSource.toMat(Sink.head(), Keep.both()).run(mat); - - Messages.StreamingOutputCallResponse response = fullDuplexResult.second().toCompletableFuture().get(); + Source> + fullDuplexSource = + client + .fullDuplexCall() + .addHeader("x-grpc-test-echo-initial", "test_initial_metadata_value") + .addHeader( + "x-grpc-test-echo-trailing-bin", + org.apache.pekko.util.ByteString.fromInts(0xababab)) + .invokeWithMetadata( + Source.single( + Messages.StreamingOutputCallRequest.newBuilder() + .addResponseParameters( + Messages.ResponseParameters.newBuilder().setSize(314159).build()) + .setPayload( + Messages.Payload.newBuilder() + .setBody(ByteString.copyFrom(new byte[271828])) + .build()) + .build())); + + Pair< + CompletionStage, + CompletionStage> + fullDuplexResult = fullDuplexSource.toMat(Sink.head(), Keep.both()).run(mat); + + Messages.StreamingOutputCallResponse response = + fullDuplexResult.second().toCompletableFuture().get(); GrpcResponseMetadata fullDuplexMetadata = fullDuplexResult.first().toCompletableFuture().get(); assertEquals( @@ -339,60 +415,77 @@ public void customMetadata() throws Exception { public void statusCodeAndMessage() throws Exception { // assert unary final String errorMessage = "test status message"; - final Messages.EchoStatus echoStatus = Messages.EchoStatus.newBuilder() - .setCode(Status.UNKNOWN.getCode().value()) - .setMessage(errorMessage) - .build(); - final Messages.SimpleRequest request = Messages.SimpleRequest.newBuilder() - .setResponseStatus(echoStatus) - .build(); + final Messages.EchoStatus echoStatus = + Messages.EchoStatus.newBuilder() + .setCode(Status.UNKNOWN.getCode().value()) + .setMessage(errorMessage) + .build(); + final Messages.SimpleRequest request = + Messages.SimpleRequest.newBuilder().setResponseStatus(echoStatus).build(); final CompletionStage response = client.unaryCall(request); - response.toCompletableFuture().handle((res, ex) -> { - if (!(ex instanceof StatusRuntimeException)) { - ex.printStackTrace(); - fail("Expected [StatusRuntimeException] but got " + (ex == null ? "null" : ex.getClass().toString())); - } - - final StatusRuntimeException e = (StatusRuntimeException)ex; - assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode()); - assertEquals(errorMessage, e.getStatus().getDescription()); - - return null; - }).get(); + response + .toCompletableFuture() + .handle( + (res, ex) -> { + if (!(ex instanceof StatusRuntimeException)) { + ex.printStackTrace(); + fail( + "Expected [StatusRuntimeException] but got " + + (ex == null ? "null" : ex.getClass().toString())); + } + + final StatusRuntimeException e = (StatusRuntimeException) ex; + assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode()); + assertEquals(errorMessage, e.getStatus().getDescription()); + + return null; + }) + .get(); // assert streaming final Messages.StreamingOutputCallRequest streamingRequest = - Messages.StreamingOutputCallRequest.newBuilder().setResponseStatus(echoStatus).build(); + Messages.StreamingOutputCallRequest.newBuilder().setResponseStatus(echoStatus).build(); final CompletionStage streamingResponse = - client.fullDuplexCall(Source.single(streamingRequest)).runWith(Sink.head(), mat); - streamingResponse.toCompletableFuture().handle((res, ex) -> { - if (!(ex instanceof StatusRuntimeException)) - fail("Expected [StatusRuntimeException] but got " + (ex == null ? "null" : ex.getClass().toString())); - - final StatusRuntimeException e = (StatusRuntimeException)ex; - assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode()); - assertEquals(errorMessage, e.getStatus().getDescription()); - - return null; - }).get(); + client.fullDuplexCall(Source.single(streamingRequest)).runWith(Sink.head(), mat); + streamingResponse + .toCompletableFuture() + .handle( + (res, ex) -> { + if (!(ex instanceof StatusRuntimeException)) + fail( + "Expected [StatusRuntimeException] but got " + + (ex == null ? "null" : ex.getClass().toString())); + + final StatusRuntimeException e = (StatusRuntimeException) ex; + assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode()); + assertEquals(errorMessage, e.getStatus().getDescription()); + + return null; + }) + .get(); } @Override public void unimplementedMethod() { try { - client.unimplementedCall(EmptyProtos.Empty.newBuilder().build()).toCompletableFuture() - .handle((res, ex) -> { - if (!(ex instanceof StatusRuntimeException)) - fail("Expected [StatusRuntimeException] but got " + (ex == null ? "null" : ex.getClass().toString())); - - final StatusRuntimeException e = (StatusRuntimeException) ex; - assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode()); - - return null; - }).get(); - } - catch (Exception ex) { + client + .unimplementedCall(EmptyProtos.Empty.newBuilder().build()) + .toCompletableFuture() + .handle( + (res, ex) -> { + if (!(ex instanceof StatusRuntimeException)) + fail( + "Expected [StatusRuntimeException] but got " + + (ex == null ? "null" : ex.getClass().toString())); + + final StatusRuntimeException e = (StatusRuntimeException) ex; + assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode()); + + return null; + }) + .get(); + } catch (Exception ex) { throw new RuntimeException(ex); } } @@ -400,18 +493,23 @@ public void unimplementedMethod() { @Override public void unimplementedService() { try { - clientUnimplementedService.unimplementedCall(EmptyProtos.Empty.newBuilder().build()).toCompletableFuture() - .handle((res, ex) -> { - if (!(ex instanceof StatusRuntimeException)) - fail("Expected [StatusRuntimeException] but got " + (ex == null ? "null" : ex.getClass().toString())); - - final StatusRuntimeException e = (StatusRuntimeException) ex; - assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode()); - - return null; - }).get(); - } - catch (Exception ex) { + clientUnimplementedService + .unimplementedCall(EmptyProtos.Empty.newBuilder().build()) + .toCompletableFuture() + .handle( + (res, ex) -> { + if (!(ex instanceof StatusRuntimeException)) + fail( + "Expected [StatusRuntimeException] but got " + + (ex == null ? "null" : ex.getClass().toString())); + + final StatusRuntimeException e = (StatusRuntimeException) ex; + assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode()); + + return null; + }) + .get(); + } catch (Exception ex) { throw new RuntimeException(ex); } } diff --git a/plugin-tester-java/src/main/java/example/myapp/echo/EchoServiceImpl.java b/plugin-tester-java/src/main/java/example/myapp/echo/EchoServiceImpl.java index 4e82da2c..90f50df4 100644 --- a/plugin-tester-java/src/main/java/example/myapp/echo/EchoServiceImpl.java +++ b/plugin-tester-java/src/main/java/example/myapp/echo/EchoServiceImpl.java @@ -13,11 +13,10 @@ package example.myapp.echo; +import example.myapp.echo.grpc.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import example.myapp.echo.grpc.*; - public class EchoServiceImpl implements EchoService { @Override @@ -25,4 +24,3 @@ public CompletionStage echo(EchoMessage in) { return CompletableFuture.completedFuture(in); } } - diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/AuthenticatedGreeterServer.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/AuthenticatedGreeterServer.java index 11fbdbaa..352fea85 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/AuthenticatedGreeterServer.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/AuthenticatedGreeterServer.java @@ -13,39 +13,39 @@ package example.myapp.helloworld; -import java.util.concurrent.CompletionStage; +import static org.apache.pekko.http.javadsl.server.Directives.*; -import org.apache.pekko.http.javadsl.model.StatusCodes; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; - +import example.myapp.helloworld.grpc.GreeterService; +import example.myapp.helloworld.grpc.GreeterServiceHandlerFactory; +import java.util.concurrent.CompletionStage; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.http.javadsl.Http; import org.apache.pekko.http.javadsl.ServerBinding; import org.apache.pekko.http.javadsl.model.HttpRequest; import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.apache.pekko.http.javadsl.model.StatusCodes; import org.apache.pekko.http.javadsl.server.Route; import org.apache.pekko.japi.function.Function; -import org.apache.pekko.stream.SystemMaterializer; import org.apache.pekko.stream.Materializer; - -import example.myapp.helloworld.grpc.GreeterService; -import example.myapp.helloworld.grpc.GreeterServiceHandlerFactory; - -import static org.apache.pekko.http.javadsl.server.Directives.*; +import org.apache.pekko.stream.SystemMaterializer; class AuthenticatedGreeterServer { public static void main(String[] args) throws Exception { // important to enable HTTP/2 in ActorSystem's config - Config conf = ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on") + Config conf = + ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on") .withFallback(ConfigFactory.defaultApplication()); // ActorSystem Boot ActorSystem sys = ActorSystem.create("HelloWorld", conf); - run(sys).thenAccept(binding -> { - System.out.println("gRPC server bound to: " + binding.localAddress()); - }); + run(sys) + .thenAccept( + binding -> { + System.out.println("gRPC server bound to: " + binding.localAddress()); + }); // ActorSystem threads will keep the app alive until `system.terminate()` is called } @@ -53,45 +53,39 @@ public static void main(String[] args) throws Exception { public static CompletionStage run(ActorSystem sys) throws Exception { Materializer mat = SystemMaterializer.get(sys).materializer(); - //#http-route + // #http-route // A Route to authenticate with - Route authentication = path("login", () -> - get(() -> - complete("Psst, please use token XYZ!") - ) - ); - //#http-route + Route authentication = path("login", () -> get(() -> complete("Psst, please use token XYZ!"))); + // #http-route - //#grpc-route + // #grpc-route // Instantiate implementation GreeterService impl = new GreeterServiceImpl(mat); - Function> handler = GreeterServiceHandlerFactory.create(impl, sys); + Function> handler = + GreeterServiceHandlerFactory.create(impl, sys); // As a Route Route handlerRoute = handle(handler); - //#grpc-route + // #grpc-route - //#grpc-protected + // #grpc-protected // Protect the handler route Route protectedHandler = - headerValueByName("token", token -> { - if ("XYZ".equals(token)) { - return handlerRoute; - } else { - return complete(StatusCodes.UNAUTHORIZED); - } - }); - //#grpc-protected - - //#combined - Route finalRoute = concat( - authentication, - protectedHandler - ); - - return Http.get(sys) - .newServerAt("127.0.0.1", 8090) - .bind(finalRoute); - //#combined + headerValueByName( + "token", + token -> { + if ("XYZ".equals(token)) { + return handlerRoute; + } else { + return complete(StatusCodes.UNAUTHORIZED); + } + }); + // #grpc-protected + + // #combined + Route finalRoute = concat(authentication, protectedHandler); + + return Http.get(sys).newServerAt("127.0.0.1", 8090).bind(finalRoute); + // #combined } } diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterClient.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterClient.java index b82d3acd..dc865dd8 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterClient.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterClient.java @@ -11,27 +11,24 @@ * Copyright (C) 2018-2021 Lightbend Inc. */ -//#full-client +// #full-client package example.myapp.helloworld; +import example.myapp.helloworld.grpc.*; +import io.grpc.StatusRuntimeException; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.time.Duration; - -import io.grpc.StatusRuntimeException; - import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.stream.SystemMaterializer; +import org.apache.pekko.grpc.GrpcClientSettings; import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.SystemMaterializer; import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.grpc.GrpcClientSettings; - -import example.myapp.helloworld.grpc.*; class GreeterClient { public static void main(String[] args) throws Exception { @@ -43,7 +40,8 @@ public static void main(String[] args) throws Exception { Materializer materializer = SystemMaterializer.get(system).materializer(); // Configure the client by code: - GrpcClientSettings settings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8090, system).withTls(false); + GrpcClientSettings settings = + GrpcClientSettings.connectToServiceAt("127.0.0.1", 8090, system).withTls(false); // Or via application.conf: // GrpcClientSettings settings = GrpcClientSettings.fromConfig(GreeterService.name, system); @@ -57,16 +55,14 @@ public static void main(String[] args) throws Exception { streamingReply(client, materializer); streamingRequestReply(client, materializer); - } catch (StatusRuntimeException e) { System.out.println("Status: " + e.getStatus()); - } catch (Exception e) { + } catch (Exception e) { System.out.println(e); } finally { if (client != null) client.close(); system.terminate(); } - } private static void singleRequestReply(GreeterService client) throws Exception { @@ -76,41 +72,43 @@ private static void singleRequestReply(GreeterService client) throws Exception { } private static void streamingRequest(GreeterService client) throws Exception { - List requests = Arrays.asList("Alice", "Bob", "Peter") - .stream().map(name -> HelloRequest.newBuilder().setName(name).build()) - .collect(Collectors.toList()); + List requests = + Arrays.asList("Alice", "Bob", "Peter").stream() + .map(name -> HelloRequest.newBuilder().setName(name).build()) + .collect(Collectors.toList()); CompletionStage reply = client.itKeepsTalking(Source.from(requests)); - System.out.println("got single reply for streaming requests: " + - reply.toCompletableFuture().get(5, TimeUnit.SECONDS)); + System.out.println( + "got single reply for streaming requests: " + + reply.toCompletableFuture().get(5, TimeUnit.SECONDS)); } private static void streamingReply(GreeterService client, Materializer mat) throws Exception { HelloRequest request = HelloRequest.newBuilder().setName("Alice").build(); Source responseStream = client.itKeepsReplying(request); CompletionStage done = - responseStream.runForeach(reply -> - System.out.println("got streaming reply: " + reply.getMessage()), mat); + responseStream.runForeach( + reply -> System.out.println("got streaming reply: " + reply.getMessage()), mat); done.toCompletableFuture().get(60, TimeUnit.SECONDS); } - private static void streamingRequestReply(GreeterService client, Materializer mat) throws Exception { + private static void streamingRequestReply(GreeterService client, Materializer mat) + throws Exception { Duration interval = Duration.ofSeconds(1); - Source requestStream = Source - .tick(interval, interval, "tick") - .zipWithIndex() - .map(pair -> pair.second()) - .map(i -> HelloRequest.newBuilder().setName("Alice-" + i).build()) - .take(10) - .mapMaterializedValue(m -> NotUsed.getInstance()); + Source requestStream = + Source.tick(interval, interval, "tick") + .zipWithIndex() + .map(pair -> pair.second()) + .map(i -> HelloRequest.newBuilder().setName("Alice-" + i).build()) + .take(10) + .mapMaterializedValue(m -> NotUsed.getInstance()); Source responseStream = client.streamHellos(requestStream); CompletionStage done = - responseStream.runForeach(reply -> - System.out.println("got streaming reply: " + reply.getMessage()), mat); + responseStream.runForeach( + reply -> System.out.println("got streaming reply: " + reply.getMessage()), mat); done.toCompletableFuture().get(60, TimeUnit.SECONDS); } - } -//#full-client +// #full-client diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServer.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServer.java index bfe3bcfb..dffa16cd 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServer.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServer.java @@ -11,34 +11,35 @@ * Copyright (C) 2018-2021 Lightbend Inc. */ -//#full-server +// #full-server package example.myapp.helloworld; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.http.javadsl.ConnectHttp; -import org.apache.pekko.http.javadsl.Http; -import org.apache.pekko.http.javadsl.ServerBinding; -import org.apache.pekko.stream.SystemMaterializer; -import org.apache.pekko.stream.Materializer; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import example.myapp.helloworld.grpc.GreeterService; import example.myapp.helloworld.grpc.GreeterServiceHandlerFactory; - import java.util.concurrent.CompletionStage; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.ServerBinding; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.SystemMaterializer; class GreeterServer { public static void main(String[] args) throws Exception { // important to enable HTTP/2 in ActorSystem's config - Config conf = ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on") + Config conf = + ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on") .withFallback(ConfigFactory.defaultApplication()); // ActorSystem Boot ActorSystem sys = ActorSystem.create("HelloWorld", conf); - run(sys).thenAccept(binding -> { - System.out.println("gRPC server bound to: " + binding.localAddress()); - }); + run(sys) + .thenAccept( + binding -> { + System.out.println("gRPC server bound to: " + binding.localAddress()); + }); // ActorSystem threads will keep the app alive until `system.terminate()` is called } @@ -49,10 +50,9 @@ public static CompletionStage run(ActorSystem sys) throws Excepti // Instantiate implementation GreeterService impl = new GreeterServiceImpl(mat); - return Http - .get(sys) - .newServerAt("127.0.0.1", 8090) - .bind(GreeterServiceHandlerFactory.create(impl, sys)); + return Http.get(sys) + .newServerAt("127.0.0.1", 8090) + .bind(GreeterServiceHandlerFactory.create(impl, sys)); } } -//#full-server +// #full-server diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServiceImpl.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServiceImpl.java index 002c3fb0..b8fab657 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServiceImpl.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServiceImpl.java @@ -11,36 +11,35 @@ * Copyright (C) 2018-2021 Lightbend Inc. */ -//#full-service-impl +// #full-service-impl package example.myapp.helloworld; +import com.google.protobuf.Timestamp; +import example.myapp.helloworld.grpc.*; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; - import org.apache.pekko.NotUsed; import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; -import com.google.protobuf.Timestamp; -import example.myapp.helloworld.grpc.*; - public class GreeterServiceImpl implements GreeterService { private final Materializer mat; public GreeterServiceImpl(Materializer mat) { - this.mat = mat; + this.mat = mat; } @Override public CompletionStage sayHello(HelloRequest in) { System.out.println("sayHello to " + in.getName()); - HelloReply reply = HelloReply.newBuilder() - .setMessage("Hello, " + in.getName()) - .setTimestamp(Timestamp.newBuilder().setSeconds(1234567890).setNanos(12345).build()) - .build(); + HelloReply reply = + HelloReply.newBuilder() + .setMessage("Hello, " + in.getName()) + .setTimestamp(Timestamp.newBuilder().setSeconds(1234567890).setNanos(12345).build()) + .build(); return CompletableFuture.completedFuture(reply); } @@ -48,28 +47,34 @@ public CompletionStage sayHello(HelloRequest in) { public CompletionStage itKeepsTalking(Source in) { System.out.println("sayHello to in stream..."); return in.runWith(Sink.seq(), mat) - .thenApply(elements -> { - String elementsStr = elements.stream().map(elem -> elem.getName()) - .collect(Collectors.toList()).toString(); - return HelloReply.newBuilder().setMessage("Hello, " + elementsStr).build(); - }); + .thenApply( + elements -> { + String elementsStr = + elements.stream() + .map(elem -> elem.getName()) + .collect(Collectors.toList()) + .toString(); + return HelloReply.newBuilder().setMessage("Hello, " + elementsStr).build(); + }); } @Override public Source itKeepsReplying(HelloRequest in) { System.out.println("sayHello to " + in.getName() + " with stream of chars"); - List characters = ("Hello, " + in.getName()) - .chars().mapToObj(c -> (char) c).collect(Collectors.toList()); + List characters = + ("Hello, " + in.getName()).chars().mapToObj(c -> (char) c).collect(Collectors.toList()); return Source.from(characters) - .map(character -> { - return HelloReply.newBuilder().setMessage(String.valueOf(character)).build(); - }); + .map( + character -> { + return HelloReply.newBuilder().setMessage(String.valueOf(character)).build(); + }); } @Override public Source streamHellos(Source in) { System.out.println("sayHello to stream..."); - return in.map(request -> HelloReply.newBuilder().setMessage("Hello, " + request.getName()).build()); + return in.map( + request -> HelloReply.newBuilder().setMessage("Hello, " + request.getName()).build()); } } -//#full-service-impl +// #full-service-impl diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServicePowerApiImpl.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServicePowerApiImpl.java index be6792e5..d358de63 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServicePowerApiImpl.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServicePowerApiImpl.java @@ -11,28 +11,27 @@ * Copyright (C) 2018-2021 Lightbend Inc. */ -//#full-service-impl +// #full-service-impl package example.myapp.helloworld; -import org.apache.pekko.NotUsed; -import org.apache.pekko.grpc.javadsl.Metadata; -import org.apache.pekko.stream.Materializer; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; import example.myapp.helloworld.grpc.GreeterServicePowerApi; import example.myapp.helloworld.grpc.HelloReply; import example.myapp.helloworld.grpc.HelloRequest; - import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; +import org.apache.pekko.NotUsed; +import org.apache.pekko.grpc.javadsl.Metadata; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; public class GreeterServicePowerApiImpl implements GreeterServicePowerApi { private final Materializer mat; public GreeterServicePowerApiImpl(Materializer mat) { - this.mat = mat; + this.mat = mat; } @Override @@ -44,32 +43,43 @@ public CompletionStage sayHello(HelloRequest in, Metadata metadata) } @Override - public CompletionStage itKeepsTalking(Source in, Metadata metadata) { + public CompletionStage itKeepsTalking( + Source in, Metadata metadata) { System.out.println("sayHello to in stream..."); return in.runWith(Sink.seq(), mat) - .thenApply(elements -> { - String elementsStr = elements.stream().map(elem -> authTaggedName(elem, metadata)) - .collect(Collectors.toList()).toString(); - return HelloReply.newBuilder().setMessage("Hello, " + elementsStr).build(); - }); + .thenApply( + elements -> { + String elementsStr = + elements.stream() + .map(elem -> authTaggedName(elem, metadata)) + .collect(Collectors.toList()) + .toString(); + return HelloReply.newBuilder().setMessage("Hello, " + elementsStr).build(); + }); } @Override public Source itKeepsReplying(HelloRequest in, Metadata metadata) { String greetee = authTaggedName(in, metadata); System.out.println("sayHello to " + greetee + " with stream of chars"); - List characters = ("Hello, " + greetee) - .chars().mapToObj(c -> (char) c).collect(Collectors.toList()); + List characters = + ("Hello, " + greetee).chars().mapToObj(c -> (char) c).collect(Collectors.toList()); return Source.from(characters) - .map(character -> { - return HelloReply.newBuilder().setMessage(String.valueOf(character)).build(); - }); + .map( + character -> { + return HelloReply.newBuilder().setMessage(String.valueOf(character)).build(); + }); } @Override - public Source streamHellos(Source in, Metadata metadata) { + public Source streamHellos( + Source in, Metadata metadata) { System.out.println("sayHello to stream..."); - return in.map(request -> HelloReply.newBuilder().setMessage("Hello, " + authTaggedName(request, metadata)).build()); + return in.map( + request -> + HelloReply.newBuilder() + .setMessage("Hello, " + authTaggedName(request, metadata)) + .build()); } // Bare-bones just for GRPC metadata demonstration purposes @@ -79,7 +89,8 @@ private boolean isAuthenticated(Metadata metadata) { private String authTaggedName(HelloRequest in, Metadata metadata) { boolean authenticated = isAuthenticated(metadata); - return String.format("%s (%sauthenticated)", in.getName(), isAuthenticated(metadata) ? "" : "not "); + return String.format( + "%s (%sauthenticated)", in.getName(), isAuthenticated(metadata) ? "" : "not "); } } -//#full-service-impl +// #full-service-impl diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java index f599e0ba..2702d3a4 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java @@ -13,24 +13,21 @@ package example.myapp.helloworld; +import example.myapp.helloworld.grpc.*; +import io.grpc.StatusRuntimeException; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.time.Duration; - -import io.grpc.StatusRuntimeException; - import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.stream.SystemMaterializer; +import org.apache.pekko.grpc.GrpcClientSettings; import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.SystemMaterializer; import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.grpc.GrpcClientSettings; - -import example.myapp.helloworld.grpc.*; class LiftedGreeterClient { public static void main(String[] args) throws Exception { @@ -51,69 +48,65 @@ public static void main(String[] args) throws Exception { streamingReply(client, materializer); streamingRequestReply(client, materializer); - } catch (StatusRuntimeException e) { System.out.println("Status: " + e.getStatus()); - } catch (Exception e) { + } catch (Exception e) { System.out.println(e); } finally { if (client != null) client.close(); system.terminate(); } - } - //#with-metadata + // #with-metadata private static void singleRequestReply(GreeterServiceClient client) throws Exception { HelloRequest request = HelloRequest.newBuilder().setName("Alice").build(); - CompletionStage reply = client.sayHello() - .addHeader("key", "value") - .invoke(request); + CompletionStage reply = client.sayHello().addHeader("key", "value").invoke(request); System.out.println("got single reply: " + reply.toCompletableFuture().get(5, TimeUnit.SECONDS)); } - //#with-metadata + // #with-metadata private static void streamingRequest(GreeterServiceClient client) throws Exception { - List requests = Arrays.asList("Alice", "Bob", "Peter") - .stream().map(name -> HelloRequest.newBuilder().setName(name).build()) - .collect(Collectors.toList()); - CompletionStage reply = client.itKeepsTalking() - .addHeader("key", "value") - .invoke(Source.from(requests)); - System.out.println("got single reply for streaming requests: " + - reply.toCompletableFuture().get(5, TimeUnit.SECONDS)); + List requests = + Arrays.asList("Alice", "Bob", "Peter").stream() + .map(name -> HelloRequest.newBuilder().setName(name).build()) + .collect(Collectors.toList()); + CompletionStage reply = + client.itKeepsTalking().addHeader("key", "value").invoke(Source.from(requests)); + System.out.println( + "got single reply for streaming requests: " + + reply.toCompletableFuture().get(5, TimeUnit.SECONDS)); } - private static void streamingReply(GreeterServiceClient client, Materializer mat) throws Exception { + private static void streamingReply(GreeterServiceClient client, Materializer mat) + throws Exception { HelloRequest request = HelloRequest.newBuilder().setName("Alice").build(); - Source responseStream = client.itKeepsReplying() - .addHeader("key", "value") - .invoke(request); + Source responseStream = + client.itKeepsReplying().addHeader("key", "value").invoke(request); CompletionStage done = - responseStream.runForeach(reply -> - System.out.println("got streaming reply: " + reply.getMessage()), mat); + responseStream.runForeach( + reply -> System.out.println("got streaming reply: " + reply.getMessage()), mat); done.toCompletableFuture().get(60, TimeUnit.SECONDS); } - private static void streamingRequestReply(GreeterServiceClient client, Materializer mat) throws Exception { + private static void streamingRequestReply(GreeterServiceClient client, Materializer mat) + throws Exception { Duration interval = Duration.ofSeconds(1); - Source requestStream = Source - .tick(interval, interval, "tick") - .zipWithIndex() - .map(pair -> pair.second()) - .map(i -> HelloRequest.newBuilder().setName("Alice-" + i).build()) - .take(10) - .mapMaterializedValue(m -> NotUsed.getInstance()); - - Source responseStream = client.streamHellos() - .addHeader("key", "value") - .invoke(requestStream); + Source requestStream = + Source.tick(interval, interval, "tick") + .zipWithIndex() + .map(pair -> pair.second()) + .map(i -> HelloRequest.newBuilder().setName("Alice-" + i).build()) + .take(10) + .mapMaterializedValue(m -> NotUsed.getInstance()); + + Source responseStream = + client.streamHellos().addHeader("key", "value").invoke(requestStream); CompletionStage done = - responseStream.runForeach(reply -> - System.out.println("got streaming reply: " + reply.getMessage()), mat); + responseStream.runForeach( + reply -> System.out.println("got streaming reply: " + reply.getMessage()), mat); done.toCompletableFuture().get(60, TimeUnit.SECONDS); } - } diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java index 5f975f07..8ccf02de 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java @@ -13,6 +13,18 @@ package example.myapp.helloworld; +import static org.apache.pekko.http.javadsl.server.Directives.*; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import example.myapp.helloworld.grpc.GreeterService; +import example.myapp.helloworld.grpc.GreeterServiceHandlerFactory; +import example.myapp.helloworld.grpc.HelloReply; +import example.myapp.helloworld.grpc.HelloRequest; +import io.grpc.Status; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.BiFunction; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.event.Logging; import org.apache.pekko.grpc.Trailers; @@ -25,41 +37,32 @@ import org.apache.pekko.japi.Function; import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.SystemMaterializer; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import example.myapp.helloworld.grpc.GreeterService; -import example.myapp.helloworld.grpc.GreeterServiceHandlerFactory; -import example.myapp.helloworld.grpc.HelloReply; -import example.myapp.helloworld.grpc.HelloRequest; -import io.grpc.Status; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.BiFunction; - -import static org.apache.pekko.http.javadsl.server.Directives.*; public class LoggingErrorHandlingGreeterServer { public static void main(String[] args) throws Exception { // important to enable HTTP/2 in ActorSystem's config - Config conf = ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on") - .withFallback(ConfigFactory.defaultApplication()); + Config conf = + ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on") + .withFallback(ConfigFactory.defaultApplication()); // ActorSystem Boot ActorSystem sys = ActorSystem.create("HelloWorld", conf); - run(sys).thenAccept(binding -> { - System.out.println("gRPC server bound to: " + binding.localAddress()); - }); + run(sys) + .thenAccept( + binding -> { + System.out.println("gRPC server bound to: " + binding.localAddress()); + }); // ActorSystem threads will keep the app alive until `system.terminate()` is called } - //#implementation + // #implementation private static class Impl extends GreeterServiceImpl { public Impl(Materializer mat) { super(mat); } + @Override public CompletionStage sayHello(HelloRequest in) { if (Character.isLowerCase(in.getName().charAt(0))) { @@ -67,67 +70,84 @@ public CompletionStage sayHello(HelloRequest in) { reply.completeExceptionally(new IllegalArgumentException("Name must be capitalized")); return reply; } else { - HelloReply reply = HelloReply.newBuilder() - .setMessage("Hello, " + in.getName()) - .build(); + HelloReply reply = HelloReply.newBuilder().setMessage("Hello, " + in.getName()).build(); return CompletableFuture.completedFuture(reply); } } } - //#implementation + // #implementation - //#method + // #method private static Route loggingErrorHandlingGrpcRoute( - Function buildImpl, - Function> errorHandler, - BiFunction>, org.apache.pekko.japi.function.Function>> buildHandler - ) { - return logRequest("loggingErrorHandlingGrpcRoute", Logging.InfoLevel(), () -> extractRequestContext(ctx -> { - Function> loggingErrorHandler = (actorSystem) -> (throwable) -> { - Function function = errorHandler.apply(actorSystem); - Trailers trailers = function.apply(throwable); - if (trailers != null) { - ctx.getLog().error(throwable, "Grpc failure handled and mapped to " + trailers); - return trailers; + Function buildImpl, + Function> errorHandler, + BiFunction< + ServiceImpl, + Function>, + org.apache.pekko.japi.function.Function>> + buildHandler) { + return logRequest( + "loggingErrorHandlingGrpcRoute", + Logging.InfoLevel(), + () -> + extractRequestContext( + ctx -> { + Function> loggingErrorHandler = + (actorSystem) -> + (throwable) -> { + Function function = + errorHandler.apply(actorSystem); + Trailers trailers = function.apply(throwable); + if (trailers != null) { + ctx.getLog() + .error( + throwable, "Grpc failure handled and mapped to " + trailers); + return trailers; + } else { + Trailers internal = new Trailers(Status.INTERNAL); + ctx.getLog() + .error( + throwable, + "Grpc failure UNHANDLED and mapped to " + internal); + return internal; + } + }; + try { + ServiceImpl impl = buildImpl.apply(ctx); + org.apache.pekko.japi.function.Function< + HttpRequest, CompletionStage> + handler = buildHandler.apply(impl, loggingErrorHandler); + return handle(handler); + } catch (Exception e) { + return failWith(e); + } + })); + } + // #method + + // #custom-error-mapping + private static final Function customErrorMapping = + (throwable) -> { + if (throwable instanceof IllegalArgumentException) { + return new Trailers(Status.INVALID_ARGUMENT); } else { - Trailers internal = new Trailers(Status.INTERNAL); - ctx.getLog().error(throwable, "Grpc failure UNHANDLED and mapped to " + internal); - return internal; + return null; } }; - try { - ServiceImpl impl = buildImpl.apply(ctx); - org.apache.pekko.japi.function.Function> handler = buildHandler.apply(impl, loggingErrorHandler); - return handle(handler); - } catch (Exception e) { - return failWith(e); - } - })); - } - //#method - - //#custom-error-mapping - private final static Function customErrorMapping = (throwable) -> { - if (throwable instanceof IllegalArgumentException) { - return new Trailers(Status.INVALID_ARGUMENT); - } else { - return null; - } - }; - //#custom-error-mapping + // #custom-error-mapping public static CompletionStage run(ActorSystem sys) throws Exception { Materializer mat = SystemMaterializer.get(sys).materializer(); - //#combined - Route route = loggingErrorHandlingGrpcRoute( - (rc) -> new Impl(rc.getMaterializer()), - (actorSystem) -> customErrorMapping, - (impl, eHandler) -> GreeterServiceHandlerFactory.partial(impl, GreeterService.name, mat, eHandler, sys) - ); - return Http.get(sys) - .newServerAt("127.0.0.1", 8082) - .bind(route); - //#combined + // #combined + Route route = + loggingErrorHandlingGrpcRoute( + (rc) -> new Impl(rc.getMaterializer()), + (actorSystem) -> customErrorMapping, + (impl, eHandler) -> + GreeterServiceHandlerFactory.partial( + impl, GreeterService.name, mat, eHandler, sys)); + return Http.get(sys).newServerAt("127.0.0.1", 8082).bind(route); + // #combined } } diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/PowerGreeterServer.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/PowerGreeterServer.java index ebd8c51b..a588f5fe 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/PowerGreeterServer.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/PowerGreeterServer.java @@ -11,48 +11,48 @@ * Copyright (C) 2018-2021 Lightbend Inc. */ -//#full-server +// #full-server package example.myapp.helloworld; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.http.javadsl.ConnectHttp; -import org.apache.pekko.http.javadsl.Http; -import org.apache.pekko.http.javadsl.ServerBinding; -import org.apache.pekko.stream.SystemMaterializer; -import org.apache.pekko.stream.Materializer; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import example.myapp.helloworld.grpc.GreeterServicePowerApi; import example.myapp.helloworld.grpc.GreeterServicePowerApiHandlerFactory; - import java.util.concurrent.CompletionStage; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.ServerBinding; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.SystemMaterializer; class PowerGreeterServer { public static void main(String[] args) throws Exception { - // important to enable HTTP/2 in ActorSystem's config - Config conf = ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on") - .withFallback(ConfigFactory.defaultApplication()); + // important to enable HTTP/2 in ActorSystem's config + Config conf = + ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on") + .withFallback(ConfigFactory.defaultApplication()); - // ActorSystem Boot - ActorSystem sys = ActorSystem.create("HelloWorld", conf); + // ActorSystem Boot + ActorSystem sys = ActorSystem.create("HelloWorld", conf); - run(sys).thenAccept(binding -> { - System.out.println("gRPC server bound to: " + binding.localAddress()); - }); + run(sys) + .thenAccept( + binding -> { + System.out.println("gRPC server bound to: " + binding.localAddress()); + }); // ActorSystem threads will keep the app alive until `system.terminate()` is called } public static CompletionStage run(ActorSystem sys) throws Exception { - Materializer mat = SystemMaterializer.get(sys).materializer(); + Materializer mat = SystemMaterializer.get(sys).materializer(); - // Instantiate implementation - GreeterServicePowerApi impl = new GreeterServicePowerApiImpl(mat); + // Instantiate implementation + GreeterServicePowerApi impl = new GreeterServicePowerApiImpl(mat); - return Http - .get(sys) + return Http.get(sys) .newServerAt("127.0.0.1", 8091) - .bind(GreeterServicePowerApiHandlerFactory.create(impl,sys)); + .bind(GreeterServicePowerApiHandlerFactory.create(impl, sys)); } } -//#full-server +// #full-server diff --git a/plugin-tester-java/src/main/java/example/myapp/statefulhelloworld/GreeterActor.java b/plugin-tester-java/src/main/java/example/myapp/statefulhelloworld/GreeterActor.java index 11274b29..0121b610 100644 --- a/plugin-tester-java/src/main/java/example/myapp/statefulhelloworld/GreeterActor.java +++ b/plugin-tester-java/src/main/java/example/myapp/statefulhelloworld/GreeterActor.java @@ -21,15 +21,19 @@ public class GreeterActor extends AbstractActor { public static class ChangeGreeting { public final String newGreeting; + public ChangeGreeting(String newGreeting) { this.newGreeting = newGreeting; } } + public static class GetGreeting {} + public static GetGreeting GET_GREETING = new GetGreeting(); public static class Greeting { public final String greeting; + public Greeting(String greeting) { this.greeting = greeting; } diff --git a/plugin-tester-java/src/main/java/example/myapp/statefulhelloworld/GreeterServiceImpl.java b/plugin-tester-java/src/main/java/example/myapp/statefulhelloworld/GreeterServiceImpl.java index c941910d..ed8568f8 100644 --- a/plugin-tester-java/src/main/java/example/myapp/statefulhelloworld/GreeterServiceImpl.java +++ b/plugin-tester-java/src/main/java/example/myapp/statefulhelloworld/GreeterServiceImpl.java @@ -13,15 +13,14 @@ package example.myapp.statefulhelloworld; -import example.myapp.statefulhelloworld.grpc.*; - -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.ActorRef; import static org.apache.pekko.pattern.Patterns.ask; +import example.myapp.statefulhelloworld.grpc.*; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; // #stateful-service public final class GreeterServiceImpl implements GreeterService { @@ -36,17 +35,16 @@ public GreeterServiceImpl(ActorSystem system) { public CompletionStage sayHello(HelloRequest in) { return ask(greeterActor, GreeterActor.GET_GREETING, Duration.ofSeconds(5)) - .thenApply(message -> - HelloReply.newBuilder() - .setMessage(((GreeterActor.Greeting) message).greeting) - .build() - ); + .thenApply( + message -> + HelloReply.newBuilder() + .setMessage(((GreeterActor.Greeting) message).greeting) + .build()); } public CompletionStage changeGreeting(ChangeRequest in) { greeterActor.tell(new GreeterActor.ChangeGreeting(in.getNewGreeting()), ActorRef.noSender()); return CompletableFuture.completedFuture(ChangeResponse.newBuilder().build()); } - } // #stateful-service diff --git a/project/plugins.sbt b/project/plugins.sbt index 2d45d425..bbae590e 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -14,6 +14,7 @@ val sbtProtocV = "1.0.7" buildInfoKeys := Seq[BuildInfoKey]("sbtProtocVersion" -> sbtProtocV) addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") +addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.8.0") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0") addSbtPlugin("com.thesamet" % "sbt-protoc" % sbtProtocV) addSbtPlugin("org.playframework.twirl" % "sbt-twirl" % "2.0.3") diff --git a/runtime/src/main/java/org/apache/pekko/grpc/PekkoGrpcGenerated.java b/runtime/src/main/java/org/apache/pekko/grpc/PekkoGrpcGenerated.java index da1a272b..f175f191 100644 --- a/runtime/src/main/java/org/apache/pekko/grpc/PekkoGrpcGenerated.java +++ b/runtime/src/main/java/org/apache/pekko/grpc/PekkoGrpcGenerated.java @@ -17,4 +17,4 @@ import java.lang.annotation.RetentionPolicy; @Retention(RetentionPolicy.CLASS) -public @interface PekkoGrpcGenerated { } +public @interface PekkoGrpcGenerated {} diff --git a/runtime/src/test/java/jdocs/org/apache/pekko/grpc/client/GrpcClientSettingsCompileOnly.java b/runtime/src/test/java/jdocs/org/apache/pekko/grpc/client/GrpcClientSettingsCompileOnly.java index 376dcd21..40805740 100644 --- a/runtime/src/test/java/jdocs/org/apache/pekko/grpc/client/GrpcClientSettingsCompileOnly.java +++ b/runtime/src/test/java/jdocs/org/apache/pekko/grpc/client/GrpcClientSettingsCompileOnly.java @@ -13,43 +13,38 @@ package jdocs.org.apache.pekko.grpc.client; +import java.time.Duration; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.discovery.Discovery; import org.apache.pekko.discovery.ServiceDiscovery; import org.apache.pekko.grpc.GrpcClientSettings; -import scala.Some; - -import java.time.Duration; public class GrpcClientSettingsCompileOnly { - public static void sd() { - - ActorSystem actorSystem = ActorSystem.create(); - //#simple - GrpcClientSettings.connectToServiceAt("localhost", 443, actorSystem); - //#simple - - //#simple-programmatic - GrpcClientSettings.connectToServiceAt("localhost", 443, actorSystem) - .withDeadline(Duration.ofSeconds(1)) - .withTls(false); - //#simple-programmatic - - ServiceDiscovery serviceDiscovery = Discovery.get(actorSystem).discovery(); - - //#provide-sd - // An ActorSystem's default service discovery mechanism - GrpcClientSettings - .usingServiceDiscovery("my-service", actorSystem) - .withServicePortName("https"); // (optional) refine the lookup operation to only https ports - //#provide-sd - - //#sd-settings - // an ActorSystem is required for service discovery - GrpcClientSettings.fromConfig( - "project.WithConfigServiceDiscovery", actorSystem - ); - //#sd-settings - - } + public static void sd() { + + ActorSystem actorSystem = ActorSystem.create(); + // #simple + GrpcClientSettings.connectToServiceAt("localhost", 443, actorSystem); + // #simple + + // #simple-programmatic + GrpcClientSettings.connectToServiceAt("localhost", 443, actorSystem) + .withDeadline(Duration.ofSeconds(1)) + .withTls(false); + // #simple-programmatic + + ServiceDiscovery serviceDiscovery = Discovery.get(actorSystem).discovery(); + + // #provide-sd + // An ActorSystem's default service discovery mechanism + GrpcClientSettings.usingServiceDiscovery("my-service", actorSystem) + .withServicePortName("https"); // (optional) refine the lookup operation to only https ports + // #provide-sd + + // #sd-settings + // an ActorSystem is required for service discovery + GrpcClientSettings.fromConfig("project.WithConfigServiceDiscovery", actorSystem); + // #sd-settings + + } } diff --git a/runtime/src/test/java/org/apache/pekko/grpc/javadsl/ConcatOrNotFoundTest.java b/runtime/src/test/java/org/apache/pekko/grpc/javadsl/ConcatOrNotFoundTest.java index 6f24a834..e3598ba2 100644 --- a/runtime/src/test/java/org/apache/pekko/grpc/javadsl/ConcatOrNotFoundTest.java +++ b/runtime/src/test/java/org/apache/pekko/grpc/javadsl/ConcatOrNotFoundTest.java @@ -13,37 +13,37 @@ package org.apache.pekko.grpc.javadsl; +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import org.apache.pekko.grpc.internal.GrpcProtocolNative; import org.apache.pekko.http.javadsl.model.*; import org.apache.pekko.util.ByteString; import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; - public class ConcatOrNotFoundTest extends JUnitSuite { private final HttpRequest emptyGrpcRequest = - HttpRequest.create().withEntity(GrpcProtocolNative.contentType(), ByteString.emptyByteString()); + HttpRequest.create() + .withEntity(GrpcProtocolNative.contentType(), ByteString.emptyByteString()); - private final CompletionStage notFound = CompletableFuture.completedFuture( - HttpResponse.create().withStatus(StatusCodes.NOT_FOUND)); + private final CompletionStage notFound = + CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.NOT_FOUND)); private CompletionStage response(int code) { - return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.custom( - code, "reason-" + code, "message-" + code))); + return CompletableFuture.completedFuture( + HttpResponse.create() + .withStatus(StatusCodes.custom(code, "reason-" + code, "message-" + code))); } @Test @SuppressWarnings("unchecked") // unchecked generic array creation public void testSingleMatching() throws Exception { - CompletionStage response = ServiceHandler.concatOrNotFound( - req -> response(231) - ).apply(emptyGrpcRequest); + CompletionStage response = + ServiceHandler.concatOrNotFound(req -> response(231)).apply(emptyGrpcRequest); assertEquals(231, response.toCompletableFuture().get(3, TimeUnit.SECONDS).status().intValue()); } @@ -51,10 +51,9 @@ public void testSingleMatching() throws Exception { @Test @SuppressWarnings("unchecked") public void testFirstMatching() throws Exception { - CompletionStage response = ServiceHandler.concatOrNotFound( - req -> response(231), - req -> notFound - ).apply(emptyGrpcRequest); + CompletionStage response = + ServiceHandler.concatOrNotFound(req -> response(231), req -> notFound) + .apply(emptyGrpcRequest); assertEquals(231, response.toCompletableFuture().get(3, TimeUnit.SECONDS).status().intValue()); } @@ -62,11 +61,9 @@ public void testFirstMatching() throws Exception { @Test @SuppressWarnings("unchecked") public void testMiddleMatching() throws Exception { - CompletionStage response = ServiceHandler.concatOrNotFound( - req -> notFound, - req -> response(232), - req -> notFound - ).apply(emptyGrpcRequest); + CompletionStage response = + ServiceHandler.concatOrNotFound(req -> notFound, req -> response(232), req -> notFound) + .apply(emptyGrpcRequest); assertEquals(232, response.toCompletableFuture().get(3, TimeUnit.SECONDS).status().intValue()); } @@ -74,11 +71,9 @@ public void testMiddleMatching() throws Exception { @Test @SuppressWarnings("unchecked") public void testLastMatching() throws Exception { - CompletionStage response = ServiceHandler.concatOrNotFound( - req -> notFound, - req -> notFound, - req -> response(233) - ).apply(emptyGrpcRequest); + CompletionStage response = + ServiceHandler.concatOrNotFound(req -> notFound, req -> notFound, req -> response(233)) + .apply(emptyGrpcRequest); assertEquals(233, response.toCompletableFuture().get(3, TimeUnit.SECONDS).status().intValue()); } @@ -86,11 +81,9 @@ public void testLastMatching() throws Exception { @Test @SuppressWarnings("unchecked") public void testNoneMatching() throws Exception { - CompletionStage response = ServiceHandler.concatOrNotFound( - req -> notFound, - req -> notFound, - req -> notFound - ).apply(emptyGrpcRequest); + CompletionStage response = + ServiceHandler.concatOrNotFound(req -> notFound, req -> notFound, req -> notFound) + .apply(emptyGrpcRequest); assertEquals(404, response.toCompletableFuture().get(3, TimeUnit.SECONDS).status().intValue()); } @@ -98,7 +91,8 @@ public void testNoneMatching() throws Exception { @Test @SuppressWarnings("unchecked") public void testEmpty() throws Exception { - CompletionStage response = ServiceHandler.concatOrNotFound().apply(emptyGrpcRequest); + CompletionStage response = + ServiceHandler.concatOrNotFound().apply(emptyGrpcRequest); assertEquals(404, response.toCompletableFuture().get(3, TimeUnit.SECONDS).status().intValue()); } @@ -109,11 +103,9 @@ public void testCompletedLater() throws Exception { CompletableFuture laterNotFound = new CompletableFuture<>(); CompletableFuture laterResponse = new CompletableFuture<>(); - CompletionStage response = ServiceHandler.concatOrNotFound( - req -> laterNotFound, - req -> laterResponse, - req -> notFound - ).apply(emptyGrpcRequest); + CompletionStage response = + ServiceHandler.concatOrNotFound(req -> laterNotFound, req -> laterResponse, req -> notFound) + .apply(emptyGrpcRequest); Thread.sleep(100); laterNotFound.complete(notFound.toCompletableFuture().get()); @@ -123,5 +115,4 @@ public void testCompletedLater() throws Exception { assertEquals(232, response.toCompletableFuture().get(3, TimeUnit.SECONDS).status().intValue()); } - }