diff --git a/servicetalk-examples/grpc/protoc-options/build.gradle b/servicetalk-examples/grpc/protoc-options/build.gradle index 5262e9e320..21b6d35c42 100644 --- a/servicetalk-examples/grpc/protoc-options/build.gradle +++ b/servicetalk-examples/grpc/protoc-options/build.gradle @@ -76,6 +76,9 @@ protobuf { option 'typeNameSuffix=St' // Option to tell the compiler to exclude all Deprecated fields, types, and methods from the output option 'skipDeprecated=true' + // Option to generate default throwing service definitions on the service interfaces. This will allow + // teams to evolve their codebases and not break dependent libraries. + option 'defaultServiceMethods=true' } } } diff --git a/servicetalk-examples/grpc/protoc-options/src/main/java/io/servicetalk/examples/grpc/protocoptions/BlockingProtocOptionsServer.java b/servicetalk-examples/grpc/protoc-options/src/main/java/io/servicetalk/examples/grpc/protocoptions/BlockingProtocOptionsServer.java index b0dbe12dec..7dd0754e43 100644 --- a/servicetalk-examples/grpc/protoc-options/src/main/java/io/servicetalk/examples/grpc/protocoptions/BlockingProtocOptionsServer.java +++ b/servicetalk-examples/grpc/protoc-options/src/main/java/io/servicetalk/examples/grpc/protocoptions/BlockingProtocOptionsServer.java @@ -15,16 +15,22 @@ */ package io.servicetalk.examples.grpc.protocoptions; +import io.servicetalk.grpc.api.GrpcServiceContext; import io.servicetalk.grpc.netty.GrpcServers; import io.grpc.examples.helloworld.GreeterSt.BlockingGreeterService; import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; public final class BlockingProtocOptionsServer { public static void main(String[] args) throws Exception { GrpcServers.forPort(8080) - .listenAndAwait((BlockingGreeterService) (ctx, request) -> - HelloReply.newBuilder().setMessage("Hello " + request.getName()).build()) + .listenAndAwait(new BlockingGreeterService() { + @Override + public HelloReply sayHello(GrpcServiceContext ctx, HelloRequest request) { + return HelloReply.newBuilder().setMessage("Hello " + request.getName()).build(); + } + }) .awaitShutdown(); } } diff --git a/servicetalk-grpc-netty/build.gradle b/servicetalk-grpc-netty/build.gradle index e763a14b93..f8b2bd8c79 100644 --- a/servicetalk-grpc-netty/build.gradle +++ b/servicetalk-grpc-netty/build.gradle @@ -110,6 +110,8 @@ protobuf { grpc {} servicetalk_grpc { outputSubDir = "java" + // this will eventually become the default behavior + option "defaultServiceMethods=true" } } } diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientResolvesOnNewConnectionTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientResolvesOnNewConnectionTest.java index 4b7766f008..c58a2b11c8 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientResolvesOnNewConnectionTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcClientResolvesOnNewConnectionTest.java @@ -19,7 +19,9 @@ import io.servicetalk.client.api.ServiceDiscoverer; import io.servicetalk.client.api.ServiceDiscovererEvent; import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; import io.servicetalk.grpc.api.GrpcServerContext; +import io.servicetalk.grpc.api.GrpcServiceContext; import io.servicetalk.transport.api.HostAndPort; import io.grpc.examples.helloworld.Greeter.BlockingGreeterClient; @@ -52,8 +54,13 @@ void forAddress() throws Exception { String greetingPrefix = "Hello "; String name = "foo"; try (GrpcServerContext serverContext = GrpcServers.forAddress(localAddress(0)) - .listenAndAwait((GreeterService) (ctx, request) -> - succeeded(HelloReply.newBuilder().setMessage(greetingPrefix + request.getName()).build())); + .listenAndAwait(new GreeterService() { + @Override + public Single sayHello(GrpcServiceContext ctx, HelloRequest request) { + return succeeded(HelloReply.newBuilder() + .setMessage(greetingPrefix + request.getName()).build()); + } + }); // Use "localhost" to demonstrate that the address will be resolved. BlockingGreeterClient client = GrpcClients.forAddress("localhost", serverHostAndPort(serverContext).port(), ON_NEW_CONNECTION) @@ -69,8 +76,13 @@ void withCustomSd() throws Exception { String greetingPrefix = "Hello "; String name = "foo"; try (GrpcServerContext serverContext = GrpcServers.forAddress(localAddress(0)) - .listenAndAwait((GreeterService) (ctx, request) -> - succeeded(HelloReply.newBuilder().setMessage(greetingPrefix + request.getName()).build()))) { + .listenAndAwait(new GreeterService() { + @Override + public Single sayHello(GrpcServiceContext ctx, HelloRequest request) { + return succeeded(HelloReply.newBuilder() + .setMessage(greetingPrefix + request.getName()).build()); + } + })) { // Use "localhost" to demonstrate that the address will be resolved. HostAndPort hostAndPort = HostAndPort.of("localhost", serverHostAndPort(serverContext).port()); @SuppressWarnings("unchecked") diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcOverH1Test.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcOverH1Test.java index 64958fd28f..b5989f3574 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcOverH1Test.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcOverH1Test.java @@ -15,6 +15,8 @@ */ package io.servicetalk.grpc.netty; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.grpc.api.GrpcServiceContext; import io.servicetalk.http.api.HttpProtocolConfig; import io.servicetalk.test.resources.DefaultTestCerts; import io.servicetalk.transport.api.ClientSslConfigBuilder; @@ -69,8 +71,13 @@ void tlsNegotiated(ProtocolTestMode testMode) throws Exception { .sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey).build()) .protocols(testMode.serverConfigs)) - .listenAndAwait((Greeter.GreeterService) (ctx, request) -> - succeeded(HelloReply.newBuilder().setMessage(greetingPrefix + request.getName()).build())); + .listenAndAwait(new Greeter.GreeterService() { + @Override + public Single sayHello(GrpcServiceContext ctx, HelloRequest request) { + return succeeded(HelloReply.newBuilder() + .setMessage(greetingPrefix + request.getName()).build()); + } + }); BlockingGreeterClient client = forResolvedAddress(serverContext.listenAddress()) .initializeHttp(builder -> builder .sslConfig(new ClientSslConfigBuilder(DefaultTestCerts::loadServerCAPem) diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcProxyTunnelTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcProxyTunnelTest.java index e5c282bc64..ec84b2064f 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcProxyTunnelTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcProxyTunnelTest.java @@ -19,6 +19,7 @@ import io.servicetalk.context.api.ContextMap.Key; import io.servicetalk.grpc.api.DefaultGrpcClientMetadata; import io.servicetalk.grpc.api.GrpcClientMetadata; +import io.servicetalk.grpc.api.GrpcServiceContext; import io.servicetalk.grpc.api.GrpcStatusCode; import io.servicetalk.grpc.api.GrpcStatusException; import io.servicetalk.http.api.HttpResponseStatus; @@ -87,8 +88,12 @@ class GrpcProxyTunnelTest { .initializeHttp(httpBuilder -> httpBuilder .sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey).build())) - .listenAndAwait((Greeter.BlockingGreeterService) (ctx, request) -> - HelloReply.newBuilder().setMessage(GREETING_PREFIX + request.getName()).build()); + .listenAndAwait(new Greeter.BlockingGreeterService() { + @Override + public HelloReply sayHello(GrpcServiceContext ctx, HelloRequest request) { + return HelloReply.newBuilder().setMessage(GREETING_PREFIX + request.getName()).build(); + } + }); client = GrpcClients.forAddress(serverHostAndPort(serverContext)) .initializeHttp(httpBuilder -> httpBuilder.proxyConfig(forAddress(proxyAddress)) .sslConfig(new ClientSslConfigBuilder(DefaultTestCerts::loadServerCAPem) diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java index ad03ef70a6..f435acedb2 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcServiceContextProtocolTest.java @@ -103,21 +103,21 @@ void tearDown() throws Exception { } } - @ParameterizedTest(name = "httpVersion={0) streamingService={0)") + @ParameterizedTest(name = "httpVersion={0} streamingService={1}") @MethodSource("params") void testAggregated(HttpProtocolVersion httpProtocol, boolean streamingService) throws Exception { setUp(httpProtocol, streamingService); assertResponse(client.test(newRequest())); } - @ParameterizedTest(name = "httpVersion={0) streamingService={0)") + @ParameterizedTest(name = "httpVersion={0} streamingService={1}") @MethodSource("params") void testRequestStream(HttpProtocolVersion httpProtocol, boolean streamingService) throws Exception { setUp(httpProtocol, streamingService); assertResponse(client.testRequestStream(Arrays.asList(newRequest(), newRequest()))); } - @ParameterizedTest(name = "httpVersion={0) streamingService={0)") + @ParameterizedTest(name = "httpVersion={0} streamingService={1}") @MethodSource("params") void testBiDiStream(HttpProtocolVersion httpProtocol, boolean streamingService) throws Exception { setUp(httpProtocol, streamingService); @@ -127,7 +127,7 @@ void testBiDiStream(HttpProtocolVersion httpProtocol, boolean streamingService) } } - @ParameterizedTest(name = "httpVersion={0) streamingService={0)") + @ParameterizedTest(name = "httpVersion={0} streamingService={1}") @MethodSource("params") void testResponseStream(HttpProtocolVersion httpProtocol, boolean streamingService) throws Exception { setUp(httpProtocol, streamingService); diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcTimeoutOrderTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcTimeoutOrderTest.java index 11d85e77f2..0bce6944ec 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcTimeoutOrderTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcTimeoutOrderTest.java @@ -16,6 +16,7 @@ package io.servicetalk.grpc.netty; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.grpc.api.GrpcServiceContext; import io.servicetalk.grpc.api.GrpcStatusException; import io.servicetalk.http.api.FilterableStreamingHttpClient; import io.servicetalk.http.api.HttpExecutionStrategy; @@ -87,8 +88,12 @@ void serverFilterNeverRespondsAppliesDeadline(boolean appendNonOffloading, boole builder.appendServiceFilter(NEVER_SERVER_FILTER); } }) - .listenAndAwait((GreeterService) (ctx, request) -> - succeeded(HelloReply.newBuilder().setMessage("hello " + request.getName()).build())); + .listenAndAwait(new GreeterService() { + @Override + public Single sayHello(GrpcServiceContext ctx, HelloRequest request) { + return succeeded(HelloReply.newBuilder().setMessage("hello " + request.getName()).build()); + } + }); BlockingGreeterClient client = forResolvedAddress(serverContext.listenAddress()) .defaultTimeout(clientAppliesTimeout ? DEFAULT_TIMEOUT : null, clientAppliesTimeout) .buildBlocking(new Greeter.ClientFactory())) { @@ -102,8 +107,11 @@ void serverFilterNeverRespondsAppliesDeadline(boolean appendNonOffloading, boole void clientFilterNeverRespondsAppliesDeadline(boolean builderEnableTimeout) throws Exception { try (ServerContext serverContext = forAddress(localAddress(0)) .defaultTimeout(null, false) - .listenAndAwait((GreeterService) (ctx, request) -> { - throw new IllegalStateException("client using never filter, server shouldn't read response"); + .listenAndAwait(new GreeterService() { + @Override + public Single sayHello(GrpcServiceContext ctx, HelloRequest request) { + throw new IllegalStateException("client using never filter, server shouldn't read response"); + } }); BlockingGreeterClient client = forResolvedAddress(serverContext.listenAddress()) .defaultTimeout(DEFAULT_TIMEOUT, builderEnableTimeout) diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcUdsTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcUdsTest.java index c936732b39..3d25538dce 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcUdsTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GrpcUdsTest.java @@ -15,6 +15,8 @@ */ package io.servicetalk.grpc.netty; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.grpc.api.GrpcServiceContext; import io.servicetalk.transport.api.IoExecutor; import io.servicetalk.transport.api.ServerContext; @@ -61,8 +63,13 @@ void udsRoundTrip() throws Exception { String name = "foo"; try (ServerContext serverContext = forAddress(newSocketAddress()) .initializeHttp(builder -> builder.ioExecutor(ioExecutor)) - .listenAndAwait((GreeterService) (ctx, request) -> - succeeded(HelloReply.newBuilder().setMessage(greetingPrefix + request.getName()).build())); + .listenAndAwait(new GreeterService() { + @Override + public Single sayHello(GrpcServiceContext ctx, HelloRequest request) { + return succeeded(HelloReply.newBuilder() + .setMessage(greetingPrefix + request.getName()).build()); + } + }); BlockingGreeterClient client = forResolvedAddress(serverContext.listenAddress()) .buildBlocking(new ClientFactory())) { HelloRequest request = HelloRequest.newBuilder().setName(name).build(); diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java index 7d5601d2c2..ea95d17eed 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019-2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2019-2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -90,6 +90,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.ThrowingSupplier; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; @@ -110,6 +111,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.function.Function; import java.util.stream.IntStream; import javax.annotation.Nullable; @@ -128,6 +130,7 @@ import static io.servicetalk.grpc.api.GrpcStatusCode.INTERNAL; import static io.servicetalk.grpc.api.GrpcStatusCode.INVALID_ARGUMENT; import static io.servicetalk.grpc.api.GrpcStatusCode.UNAUTHENTICATED; +import static io.servicetalk.grpc.api.GrpcStatusCode.UNIMPLEMENTED; import static io.servicetalk.grpc.api.GrpcStatusCode.UNKNOWN; import static io.servicetalk.grpc.internal.DeadlineUtils.GRPC_TIMEOUT_HEADER_KEY; import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone; @@ -211,14 +214,13 @@ private enum ErrorMode { STATUS_IN_RESPONSE } - private static final boolean[] SSL = {false, true}; - private static final boolean[] STREAMING = {false, true}; + private static final boolean[] TRUE_FALSE = {true, false}; private static final String[] COMPRESSION = {"gzip", "identity", null}; private static Collection sslStreamingAndCompressionParams() { List args = new ArrayList<>(); - for (boolean ssl : SSL) { - for (boolean streaming : STREAMING) { + for (boolean ssl : TRUE_FALSE) { + for (boolean streaming : TRUE_FALSE) { for (String compression : COMPRESSION) { args.add(Arguments.of(ssl, streaming, compression)); } @@ -229,8 +231,8 @@ private static Collection sslStreamingAndCompressionParams() { private static Collection sslAndStreamingParams() { List args = new ArrayList<>(); - for (boolean ssl : SSL) { - for (boolean streaming : STREAMING) { + for (boolean ssl : TRUE_FALSE) { + for (boolean streaming : TRUE_FALSE) { args.add(Arguments.of(ssl, streaming)); } } @@ -239,7 +241,7 @@ private static Collection sslAndStreamingParams() { private static Collection sslAndCompressionParams() { List args = new ArrayList<>(); - for (boolean ssl : SSL) { + for (boolean ssl : TRUE_FALSE) { for (String compression : COMPRESSION) { args.add(Arguments.of(ssl, compression)); } @@ -253,7 +255,7 @@ private static Collection statusMessageParams() { }; List args = new ArrayList<>(); - for (boolean streaming : STREAMING) { + for (boolean streaming : TRUE_FALSE) { for (String message : messages) { args.add(Arguments.of(streaming, message)); } @@ -261,6 +263,24 @@ private static Collection statusMessageParams() { return args; } + private static Collection clientServerParams() { + List args = new ArrayList<>(); + for (boolean isClientServiceTalk : TRUE_FALSE) { + for (boolean isServerServiceTalk : TRUE_FALSE) { + for (boolean isServerBlocking : TRUE_FALSE) { + if (!isClientServiceTalk && isServerServiceTalk && isServerBlocking) { + // TODO there appears to be a potential bug in this combination. Separate bug filed. + continue; + } + if (isServerServiceTalk || !isServerBlocking) { + args.add(Arguments.of(isClientServiceTalk, isServerServiceTalk, isServerBlocking)); + } + } + } + } + return args; + } + @ParameterizedTest @MethodSource("sslStreamingAndCompressionParams") void grpcJavaToGrpcJava(final boolean ssl, @@ -610,6 +630,50 @@ void clientH2ReturnStatus() throws Exception { } } + @ParameterizedTest(name = "{displayName} [{index}]: serviceTalkClient={0} serviceTalkServer={1} blocking={2}") + @MethodSource("clientServerParams") + void unimplementedServiceError(final boolean isServiceTalkClient, + final boolean isServiceTalkServer, + final boolean isServerBlocking) throws Throwable { + + ThrowingSupplier serverSupplier = isServiceTalkServer ? + () -> isServerBlocking ? + serviceTalkServerBlocking(false, null, new BlockingCompatService() { }) : + serviceTalkServer(ErrorMode.NONE, false, defaultStrategy(), null, null, + new Compat.CompatService() { }) : + () -> grpcJavaServer(false, null, new CompatGrpc.CompatImplBase() { }); + + Function clientSupplier = isServiceTalkClient ? + addr -> serviceTalkClient(addr, false, null, null) : + addr -> { + try { + return grpcJavaClient(addr, null, false, null); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + try (TestServerContext server = serverSupplier.get(); + CompatClient client = clientSupplier.apply(server.listenAddress())) { + final Single scalarResponse = + client.scalarCall(CompatRequest.newBuilder().setId(1).build()); + validateGrpcErrorInResponse(scalarResponse.toFuture(), false, UNIMPLEMENTED, + "Method grpc.netty.Compat/ScalarCall is unimplemented"); + final Single clientStreamingResponse = + client.clientStreamingCall(Publisher.from(CompatRequest.newBuilder().setId(1).build())); + validateGrpcErrorInResponse(clientStreamingResponse.toFuture(), false, UNIMPLEMENTED, + "Method grpc.netty.Compat/clientStreamingCall is unimplemented"); + final Publisher serverStreamingResponse = + client.serverStreamingCall(CompatRequest.newBuilder().setId(1).build()); + validateGrpcErrorInResponse(serverStreamingResponse.toFuture(), false, UNIMPLEMENTED, + "Method grpc.netty.Compat/serverStreamingCall is unimplemented"); + final Publisher bidirectionalStreamingResponse = + client.bidirectionalStreamingCall(Publisher.from(CompatRequest.newBuilder().setId(1).build())); + validateGrpcErrorInResponse(bidirectionalStreamingResponse.toFuture(), false, UNIMPLEMENTED, + "Method grpc.netty.Compat/bidirectionalStreamingCall is unimplemented"); + } + } + @ParameterizedTest @MethodSource("sslStreamingAndCompressionParams") void grpcJavaToServiceTalkBlockingError(final boolean ssl, @@ -1176,54 +1240,72 @@ public Single handle(final HttpServiceContext ctx, return serverBuilder; } + private static final class TestBlockingCompatService implements BlockingCompatService { + final ErrorMode errorMode; + @Nullable + final String statusMessage; + + private TestBlockingCompatService(ErrorMode errorMode, @Nullable String statusMessage) { + this.errorMode = errorMode; + this.statusMessage = statusMessage; + } + + @Override + public void bidirectionalStreamingCall( + final GrpcServiceContext ctx, final BlockingIterable request, + final GrpcPayloadWriter responseWriter) throws Exception { + maybeThrowFromRpc(errorMode, statusMessage); + for (CompatRequest requestItem : request) { + responseWriter.write(computeResponse(requestItem.getId())); + } + responseWriter.close(); + } + + @Override + public CompatResponse clientStreamingCall(final GrpcServiceContext ctx, + final BlockingIterable request) { + maybeThrowFromRpc(errorMode, statusMessage); + int sum = 0; + for (CompatRequest requestItem : request) { + sum += requestItem.getId(); + } + return computeResponse(sum); + } + + @Override + public CompatResponse scalarCall(final GrpcServiceContext ctx, + final CompatRequest request) { + maybeThrowFromRpc(errorMode, statusMessage); + return computeResponse(request.getId()); + } + + @Override + public void serverStreamingCall(final GrpcServiceContext ctx, final CompatRequest request, + final GrpcPayloadWriter responseWriter) + throws Exception { + maybeThrowFromRpc(errorMode, statusMessage); + for (int i = 0; i < request.getId(); i++) { + responseWriter.write(computeResponse(i)); + } + responseWriter.close(); + } + } + private static TestServerContext serviceTalkServerBlocking(final ErrorMode errorMode, final boolean ssl, @Nullable final String compression, @Nullable final String statusMessage) throws Exception { + return serviceTalkServerBlocking(ssl, compression, new TestBlockingCompatService(errorMode, statusMessage)); + } + + private static TestServerContext serviceTalkServerBlocking(final boolean ssl, + @Nullable final String compression, + final BlockingCompatService compatService + ) throws Exception { final ServerContext serverContext = serviceTalkServerBuilder(ErrorMode.NONE, ssl, null) .listenAndAwait(new ServiceFactory.Builder() .bufferDecoderGroup(serviceTalkDecompression(compression)) .bufferEncoders(serviceTalkCompressions(compression)) - .addBlockingService(new BlockingCompatService() { - @Override - public void bidirectionalStreamingCall( - final GrpcServiceContext ctx, final BlockingIterable request, - final GrpcPayloadWriter responseWriter) throws Exception { - maybeThrowFromRpc(errorMode, statusMessage); - for (CompatRequest requestItem : request) { - responseWriter.write(computeResponse(requestItem.getId())); - } - responseWriter.close(); - } - - @Override - public CompatResponse clientStreamingCall(final GrpcServiceContext ctx, - final BlockingIterable request) { - maybeThrowFromRpc(errorMode, statusMessage); - int sum = 0; - for (CompatRequest requestItem : request) { - sum += requestItem.getId(); - } - return computeResponse(sum); - } - - @Override - public CompatResponse scalarCall(final GrpcServiceContext ctx, - final CompatRequest request) { - maybeThrowFromRpc(errorMode, statusMessage); - return computeResponse(request.getId()); - } - - @Override - public void serverStreamingCall(final GrpcServiceContext ctx, final CompatRequest request, - final GrpcPayloadWriter responseWriter) - throws Exception { - maybeThrowFromRpc(errorMode, statusMessage); - for (int i = 0; i < request.getId(); i++) { - responseWriter.write(computeResponse(i)); - } - responseWriter.close(); - } - }).build()); + .addBlockingService(compatService).build()); return TestServerContext.fromServiceTalkServerContext(serverContext); } @@ -1285,6 +1367,79 @@ private static void throwGrpcStatusExceptionWithStatus(final String message) { throw GrpcStatusException.of(newStatus(message)); } + private static final class TestCompatService implements Compat.CompatService { + final Queue reqStreamError; + final ErrorMode errorMode; + @Nullable + final String statusMessage; + + private TestCompatService(Queue reqStreamError, + ErrorMode errorMode, + @Nullable String statusMessage) { + this.reqStreamError = reqStreamError; + this.errorMode = errorMode; + this.statusMessage = statusMessage; + } + + @Override + public Publisher bidirectionalStreamingCall(final GrpcServiceContext ctx, + final Publisher pub) { + reqStreamError.add(SERVER_PROCESSED_TOKEN); + maybeThrowFromRpc(errorMode, statusMessage); + return pub.map(req -> response(req.getId())).beforeFinally(errorConsumer()); + } + + @Override + public Single clientStreamingCall(final GrpcServiceContext ctx, + final Publisher pub) { + reqStreamError.add(SERVER_PROCESSED_TOKEN); + maybeThrowFromRpc(errorMode, statusMessage); + return pub.collect(() -> 0, (sum, req) -> sum + req.getId()).map(this::response) + .beforeFinally(errorConsumer()); + } + + @Override + public Single scalarCall(final GrpcServiceContext ctx, final CompatRequest req) { + maybeThrowFromRpc(errorMode, statusMessage); + return succeeded(response(req.getId())); + } + + @Override + public Publisher serverStreamingCall(final GrpcServiceContext ctx, + final CompatRequest req) { + maybeThrowFromRpc(errorMode, statusMessage); + return Publisher.fromIterable(() -> IntStream.range(0, req.getId()).iterator()).map(this::response); + } + + private CompatResponse response(final int value) { + final String message = statusMessage == null ? CUSTOM_ERROR_MESSAGE : statusMessage; + if (errorMode == ErrorMode.SIMPLE_IN_RESPONSE) { + throwGrpcStatusException(message); + } else if (errorMode == ErrorMode.STATUS_IN_RESPONSE) { + throwGrpcStatusExceptionWithStatus(message); + } + return computeResponse(value); + } + + private TerminalSignalConsumer errorConsumer() { + return new TerminalSignalConsumer() { + @Override + public void onComplete() { + } + + @Override + public void onError(final Throwable throwable) { + reqStreamError.add(throwable); + } + + @Override + public void cancel() { + reqStreamError.add(new IOException("cancelled")); + } + }; + } + } + private static TestServerContext serviceTalkServer(final ErrorMode errorMode, final boolean ssl, @Nullable final String compression, @Nullable final Duration duration) throws Exception { @@ -1302,66 +1457,14 @@ private static TestServerContext serviceTalkServer( final ErrorMode errorMode, final boolean ssl, final GrpcExecutionStrategy strategy, @Nullable final String compression, @Nullable final Duration timeout, Queue reqStreamError, @Nullable final String statusMessage) throws Exception { - final Compat.CompatService compatService = new Compat.CompatService() { - @Override - public Publisher bidirectionalStreamingCall(final GrpcServiceContext ctx, - final Publisher pub) { - reqStreamError.add(SERVER_PROCESSED_TOKEN); - maybeThrowFromRpc(errorMode, statusMessage); - return pub.map(req -> response(req.getId())).beforeFinally(errorConsumer()); - } - - @Override - public Single clientStreamingCall(final GrpcServiceContext ctx, - final Publisher pub) { - reqStreamError.add(SERVER_PROCESSED_TOKEN); - maybeThrowFromRpc(errorMode, statusMessage); - return pub.collect(() -> 0, (sum, req) -> sum + req.getId()).map(this::response) - .beforeFinally(errorConsumer()); - } - - @Override - public Single scalarCall(final GrpcServiceContext ctx, final CompatRequest req) { - maybeThrowFromRpc(errorMode, statusMessage); - return succeeded(response(req.getId())); - } - - @Override - public Publisher serverStreamingCall(final GrpcServiceContext ctx, - final CompatRequest req) { - maybeThrowFromRpc(errorMode, statusMessage); - return Publisher.fromIterable(() -> IntStream.range(0, req.getId()).iterator()).map(this::response); - } - - private CompatResponse response(final int value) { - final String message = statusMessage == null ? CUSTOM_ERROR_MESSAGE : statusMessage; - if (errorMode == ErrorMode.SIMPLE_IN_RESPONSE) { - throwGrpcStatusException(message); - } else if (errorMode == ErrorMode.STATUS_IN_RESPONSE) { - throwGrpcStatusExceptionWithStatus(message); - } - return computeResponse(value); - } - - private TerminalSignalConsumer errorConsumer() { - return new TerminalSignalConsumer() { - @Override - public void onComplete() { - } - - @Override - public void onError(final Throwable throwable) { - reqStreamError.add(throwable); - } - - @Override - public void cancel() { - reqStreamError.add(new IOException("cancelled")); - } - }; - } - }; + final Compat.CompatService compatService = new TestCompatService(reqStreamError, errorMode, statusMessage); + return serviceTalkServer(errorMode, ssl, strategy, compression, timeout, compatService); + } + private static TestServerContext serviceTalkServer( + final ErrorMode errorMode, final boolean ssl, final GrpcExecutionStrategy strategy, + @Nullable final String compression, @Nullable final Duration timeout, + final Compat.CompatService compatService) throws Exception { final ServiceFactory serviceFactory = new ServiceFactory.Builder() .bufferEncoders(serviceTalkCompressions(compression)) .bufferDecoderGroup(serviceTalkDecompression(compression)) @@ -1565,9 +1668,9 @@ public void onCompleted() { }; } - private static TestServerContext grpcJavaServer(final ErrorMode errorMode, final boolean ssl, + private static TestServerContext grpcJavaServer(final boolean ssl, @Nullable final String compression, - @Nullable final String statusMessage) throws Exception { + final CompatGrpc.CompatImplBase serviceImpl) throws Exception { final NettyServerBuilder builder = NettyServerBuilder.forAddress(localAddress(0)); if (ssl) { builder.useTransportSecurity(loadServerPem(), loadServerKey()); @@ -1595,107 +1698,124 @@ private static TestServerContext grpcJavaServer(final ErrorMode errorMode, final } final Server server = builder - .addService(new CompatGrpc.CompatImplBase() { - @Override - public void scalarCall(final CompatRequest request, - final StreamObserver responseObserver) { - try { - responseObserver.onNext(response(request.getId())); - responseObserver.onCompleted(); - } catch (final Throwable t) { - responseObserver.onError(t); - } - } + .addService(serviceImpl) + .build().start(); - @Override - public StreamObserver clientStreamingCall( - final StreamObserver responseObserver) { - return new StreamObserver() { - int sum; - - @Override - public void onNext(final CompatRequest value) { - sum += value.getId(); - } + return TestServerContext.fromGrpcJavaServer(server); + } - @Override - public void onError(final Throwable t) { - responseObserver.onError(t); - } + private static TestServerContext grpcJavaServer(final ErrorMode errorMode, final boolean ssl, + @Nullable final String compression, + @Nullable final String statusMessage) throws Exception { + return grpcJavaServer(ssl, compression, new TestCompatGrpcImpl(errorMode, statusMessage)); + } - @Override - public void onCompleted() { - try { - responseObserver.onNext(response(sum)); - responseObserver.onCompleted(); - } catch (final Throwable t) { - responseObserver.onError(t); - } - } - }; - } + private static final class TestCompatGrpcImpl extends CompatGrpc.CompatImplBase { + final ErrorMode errorMode; + @Nullable + final String statusMessage; - @Override - public void serverStreamingCall(final CompatRequest request, - final StreamObserver responseObserver) { - for (int i = 0; i < request.getId(); ++i) { - try { - responseObserver.onNext(response(i)); - } catch (final Throwable t) { - responseObserver.onError(t); - return; - } - } + private TestCompatGrpcImpl(ErrorMode errorMode, @Nullable String statusMessage) { + this.errorMode = errorMode; + this.statusMessage = statusMessage; + } + + @Override + public void scalarCall(final CompatRequest request, + final StreamObserver responseObserver) { + try { + responseObserver.onNext(response(request.getId())); + responseObserver.onCompleted(); + } catch (final Throwable t) { + responseObserver.onError(t); + } + } + + @Override + public StreamObserver clientStreamingCall( + final StreamObserver responseObserver) { + return new StreamObserver() { + int sum; + + @Override + public void onNext(final CompatRequest value) { + sum += value.getId(); + } + + @Override + public void onError(final Throwable t) { + responseObserver.onError(t); + } + + @Override + public void onCompleted() { + try { + responseObserver.onNext(response(sum)); responseObserver.onCompleted(); + } catch (final Throwable t) { + responseObserver.onError(t); } + } + }; + } - @Override - public StreamObserver bidirectionalStreamingCall( - final StreamObserver responseObserver) { - return new StreamObserver() { - private boolean errored; - - @Override - public void onNext(final CompatRequest demoRequest) { - try { - responseObserver.onNext(response(demoRequest.getId())); - } catch (final Throwable t) { - onError(t); - } - } + @Override + public void serverStreamingCall(final CompatRequest request, + final StreamObserver responseObserver) { + for (int i = 0; i < request.getId(); ++i) { + try { + responseObserver.onNext(response(i)); + } catch (final Throwable t) { + responseObserver.onError(t); + return; + } + } + responseObserver.onCompleted(); + } - @Override - public void onError(final Throwable t) { - if (errored) { - return; - } - errored = true; - responseObserver.onError(t); - } + @Override + public StreamObserver bidirectionalStreamingCall( + final StreamObserver responseObserver) { + return new StreamObserver() { + private boolean errored; - @Override - public void onCompleted() { - if (errored) { - return; - } - responseObserver.onCompleted(); - } - }; + @Override + public void onNext(final CompatRequest demoRequest) { + try { + responseObserver.onNext(response(demoRequest.getId())); + } catch (final Throwable t) { + onError(t); } + } - private CompatResponse response(final int value) throws Exception { - final String description = statusMessage == null ? CUSTOM_ERROR_MESSAGE : statusMessage; - if (errorMode == ErrorMode.SIMPLE) { - throw Status.INVALID_ARGUMENT.augmentDescription(description).asException(); - } - if (errorMode == ErrorMode.STATUS) { - throw StatusProto.toStatusException(newStatus(description)); - } - return computeResponse(value); + @Override + public void onError(final Throwable t) { + if (errored) { + return; } - }) - .build().start(); + errored = true; + responseObserver.onError(t); + } - return TestServerContext.fromGrpcJavaServer(server); + @Override + public void onCompleted() { + if (errored) { + return; + } + responseObserver.onCompleted(); + } + }; + } + + private CompatResponse response(final int value) throws Exception { + final String description = statusMessage == null ? CUSTOM_ERROR_MESSAGE : statusMessage; + if (errorMode == ErrorMode.SIMPLE) { + throw Status.INVALID_ARGUMENT.augmentDescription(description).asException(); + } + if (errorMode == ErrorMode.STATUS) { + throw StatusProto.toStatusException(newStatus(description)); + } + return computeResponse(value); + } } } diff --git a/servicetalk-grpc-protoc/README.adoc b/servicetalk-grpc-protoc/README.adoc index c811c73926..e3d6a6be71 100644 --- a/servicetalk-grpc-protoc/README.adoc +++ b/servicetalk-grpc-protoc/README.adoc @@ -111,3 +111,34 @@ And with Maven: ---- + +==== `defaultServiceMethods= (default: false)` +Generates default service interface methods to ensure implementations will continue to compile as the API is evolved. + +If you are using the +link:https://github.com/google/protobuf-gradle-plugin#configure-what-to-generate[protobuf-gradle-plugin] this is how you +can specify an option: + +[source,gradle] +---- +task.plugins { + servicetalk_grpc { + option 'defaultServiceMethods=true' + } +} +---- + +And with Maven: + +And with Maven: + +[source, xml] +---- + + servicetalk-grpc-protoc + + + defaultServiceMethods=true + + +---- diff --git a/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Generator.java b/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Generator.java index e1535d5535..5d0b15a913 100644 --- a/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Generator.java +++ b/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Generator.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2019-2022, 2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -90,6 +90,8 @@ import static io.servicetalk.grpc.protoc.Types.GrpcService; import static io.servicetalk.grpc.protoc.Types.GrpcServiceContext; import static io.servicetalk.grpc.protoc.Types.GrpcServiceFactory; +import static io.servicetalk.grpc.protoc.Types.GrpcStatus; +import static io.servicetalk.grpc.protoc.Types.GrpcStatusCode; import static io.servicetalk.grpc.protoc.Types.GrpcStatusException; import static io.servicetalk.grpc.protoc.Types.GrpcSupportedCodings; import static io.servicetalk.grpc.protoc.Types.Identity; @@ -172,6 +174,7 @@ import static java.lang.System.lineSeparator; import static java.util.EnumSet.noneOf; import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; import static java.util.stream.Stream.concat; import static javax.lang.model.element.Modifier.ABSTRACT; import static javax.lang.model.element.Modifier.DEFAULT; @@ -250,14 +253,17 @@ private State(ServiceDescriptorProto serviceProto, GenerationContext context, St private final ServiceCommentsMap serviceCommentsMap; private final boolean printJavaDocs; private final boolean skipDeprecated; + private final boolean defaultServiceMethods; Generator(final GenerationContext context, final Map messageTypesMap, - final boolean printJavaDocs, final boolean skipDeprecated, SourceCodeInfo sourceCodeInfo) { + final boolean printJavaDocs, final boolean skipDeprecated, final boolean defaultServiceMethods, + SourceCodeInfo sourceCodeInfo) { this.context = context; this.messageTypesMap = messageTypesMap; this.serviceCommentsMap = printJavaDocs ? new DefaultServiceCommentsMap(sourceCodeInfo) : NOOP_MAP; this.printJavaDocs = printJavaDocs; this.skipDeprecated = skipDeprecated; + this.defaultServiceMethods = defaultServiceMethods; } /** @@ -1639,6 +1645,41 @@ private TypeSpec newServiceInterfaceSpec(final State state, final boolean blocki .addStatement("return $L", blocking ? BLOCKING_METHOD_DESCRIPTORS : ASYNC_METHOD_DESCRIPTORS) .build()); + // generate default service methods + if (defaultServiceMethods) { + final List interfaces = state.serviceRpcInterfaces.stream() + .filter(intf -> intf.blocking == blocking) + .collect(toList()); + for (final RpcInterface rpcInterface : interfaces) { + final MethodDescriptorProto methodProto = rpcInterface.methodProto; + final ClassName inClass = messageTypesMap.get(methodProto.getInputType()); + final ClassName outClass = messageTypesMap.get(methodProto.getOutputType()); + final String methodName = sanitizeIdentifier(methodProto.getName(), true); + final String methodPath = context.methodPath(state.serviceProto, methodProto).substring(1); + final MethodSpec methodSpec = newRpcMethodSpec(inClass, outClass, methodName, + methodProto.getClientStreaming(), + methodProto.getServerStreaming(), + !blocking ? EnumSet.of(INTERFACE) : (skipDeprecated ? + EnumSet.of(INTERFACE, BLOCKING, SERVER_RESPONSE) : EnumSet.of(INTERFACE, BLOCKING)), + false, (__, spec) -> { + spec.addAnnotation(Override.class); + spec.addModifiers(DEFAULT).addParameter(GrpcServiceContext, ctx); + final String errorMessage = "\"Method " + methodPath + " is unimplemented\""; + if (!blocking) { + final ClassName returnType = methodProto.getServerStreaming() ? Publisher : Single; + spec.addStatement("return $T.failed(new $T(new $T($T.UNIMPLEMENTED, $L)))", + returnType, GrpcStatusException, GrpcStatus, GrpcStatusCode, errorMessage); + } else { + spec.addStatement("throw new $T(new $T($T.UNIMPLEMENTED, $L))", + GrpcStatusException, GrpcStatus, GrpcStatusCode, errorMessage); + } + return spec; + }); + + interfaceSpecBuilder.addMethod(methodSpec); + } + } + return interfaceSpecBuilder.build(); } diff --git a/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Main.java b/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Main.java index 12a824d423..90bf0203ee 100644 --- a/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Main.java +++ b/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Main.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2019-2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -117,6 +117,28 @@ public final class Main { * from protos. */ private static final String SKIP_DEPRECATED_CODE = "skipDeprecated"; + /** + * Supports an option to generate default service interface methods. + *

+ * Gradle: + *

+     * task.plugins {
+     *   servicetalk_grpc {
+     *     option 'defaultServiceMethods=true'
+     *   }
+     * }
+     * 
+ *

+ * Maven: + *

{@code
+     * 
+     *   
+     *     defaultServiceMethods=true
+     *   
+     * 
+     * }
+ */ + private static final String DEFAULT_SERVICE_METHODS = "defaultServiceMethods"; private Main() { // no instances } @@ -176,6 +198,8 @@ private static CodeGeneratorResponse generate(final CodeGeneratorRequest request final String typeSuffixValue = optionsMap.get(TYPE_NAME_SUFFIX_OPTION); final boolean printJavaDocs = parseBoolean(optionsMap.getOrDefault(PRINT_JAVA_DOCS_OPTION, "true")); final boolean skipDeprecated = parseBoolean(optionsMap.getOrDefault(SKIP_DEPRECATED_CODE, "false")); + // FIXME: 0.43 - consider changing default value to true + final boolean defaultServiceMethods = parseBoolean(optionsMap.getOrDefault(DEFAULT_SERVICE_METHODS, "false")); final List fileDescriptors = request.getProtoFileList().stream() .map(protoFile -> new FileDescriptor(protoFile, typeSuffixValue)).collect(toList()); @@ -188,8 +212,8 @@ private static CodeGeneratorResponse generate(final CodeGeneratorRequest request for (FileDescriptor f : fileDescriptors) { if (filesToGenerate.contains(f.protoFileName())) { - final Generator generator = new Generator( - f, messageTypesMap, printJavaDocs, skipDeprecated, f.sourceCodeInfo()); + final Generator generator = new Generator(f, messageTypesMap, printJavaDocs, skipDeprecated, + defaultServiceMethods, f.sourceCodeInfo()); List serviceDescriptorProtoList = f.protoServices(); for (int i = 0; i < serviceDescriptorProtoList.size(); ++i) { ServiceDescriptorProto serviceDescriptor = serviceDescriptorProtoList.get(i); diff --git a/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Types.java b/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Types.java index b3d6fdebef..050341b8a9 100644 --- a/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Types.java +++ b/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Types.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2019-2022, 2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,6 +67,8 @@ final class Types { static final ClassName GrpcExecutionContext = ClassName.get(grpcApiPkg, "GrpcExecutionContext"); static final ClassName GrpcExecutionStrategy = ClassName.get(grpcApiPkg, "GrpcExecutionStrategy"); static final ClassName GrpcStatusException = ClassName.get(grpcApiPkg, "GrpcStatusException"); + static final ClassName GrpcStatus = ClassName.get(grpcApiPkg, "GrpcStatus"); + static final ClassName GrpcStatusCode = ClassName.get(grpcApiPkg, "GrpcStatusCode"); static final ClassName Identity = ClassName.get(encodingApiPkg, "Identity"); static final ClassName BufferDecoderGroup = ClassName.get(encodingApiPkg, "BufferDecoderGroup"); static final ClassName EmptyBufferDecoderGroup = ClassName.get(encodingApiPkg, "EmptyBufferDecoderGroup");