Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
mgodave committed Nov 20, 2024
1 parent 7ed1fe5 commit c2d1688
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 31 deletions.
2 changes: 2 additions & 0 deletions servicetalk-grpc-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ protobuf {
grpc {}
servicetalk_grpc {
outputSubDir = "java"
// this will eventually become the default behavior
option "defaultServiceMethods=true"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.grpc.api.GrpcServerContext;
import io.servicetalk.grpc.api.GrpcServiceContext;
import io.servicetalk.transport.api.HostAndPort;

import io.grpc.examples.helloworld.Greeter.BlockingGreeterClient;
Expand Down Expand Up @@ -52,8 +54,13 @@ void forAddress() throws Exception {
String greetingPrefix = "Hello ";
String name = "foo";
try (GrpcServerContext serverContext = GrpcServers.forAddress(localAddress(0))
.listenAndAwait((GreeterService) (ctx, request) ->
succeeded(HelloReply.newBuilder().setMessage(greetingPrefix + request.getName()).build()));
.listenAndAwait(new GreeterService() {
@Override
public Single<HelloReply> sayHello(GrpcServiceContext ctx, HelloRequest request) {
return succeeded(HelloReply.newBuilder()
.setMessage(greetingPrefix + request.getName()).build());
}
});
// Use "localhost" to demonstrate that the address will be resolved.
BlockingGreeterClient client = GrpcClients.forAddress("localhost",
serverHostAndPort(serverContext).port(), ON_NEW_CONNECTION)
Expand All @@ -69,8 +76,13 @@ void withCustomSd() throws Exception {
String greetingPrefix = "Hello ";
String name = "foo";
try (GrpcServerContext serverContext = GrpcServers.forAddress(localAddress(0))
.listenAndAwait((GreeterService) (ctx, request) ->
succeeded(HelloReply.newBuilder().setMessage(greetingPrefix + request.getName()).build()))) {
.listenAndAwait(new GreeterService() {
@Override
public Single<HelloReply> sayHello(GrpcServiceContext ctx, HelloRequest request) {
return succeeded(HelloReply.newBuilder()
.setMessage(greetingPrefix + request.getName()).build());
}
})) {
// Use "localhost" to demonstrate that the address will be resolved.
HostAndPort hostAndPort = HostAndPort.of("localhost", serverHostAndPort(serverContext).port());
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.servicetalk.grpc.netty;

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.grpc.api.GrpcServiceContext;
import io.servicetalk.http.api.HttpProtocolConfig;
import io.servicetalk.test.resources.DefaultTestCerts;
import io.servicetalk.transport.api.ClientSslConfigBuilder;
Expand Down Expand Up @@ -69,8 +71,13 @@ void tlsNegotiated(ProtocolTestMode testMode) throws Exception {
.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem,
DefaultTestCerts::loadServerKey).build())
.protocols(testMode.serverConfigs))
.listenAndAwait((Greeter.GreeterService) (ctx, request) ->
succeeded(HelloReply.newBuilder().setMessage(greetingPrefix + request.getName()).build()));
.listenAndAwait(new Greeter.GreeterService() {
@Override
public Single<HelloReply> sayHello(GrpcServiceContext ctx, HelloRequest request) {
return succeeded(HelloReply.newBuilder()
.setMessage(greetingPrefix + request.getName()).build());
}
});
BlockingGreeterClient client = forResolvedAddress(serverContext.listenAddress())
.initializeHttp(builder -> builder
.sslConfig(new ClientSslConfigBuilder(DefaultTestCerts::loadServerCAPem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.context.api.ContextMap.Key;
import io.servicetalk.grpc.api.DefaultGrpcClientMetadata;
import io.servicetalk.grpc.api.GrpcClientMetadata;
import io.servicetalk.grpc.api.GrpcServiceContext;
import io.servicetalk.grpc.api.GrpcStatusCode;
import io.servicetalk.grpc.api.GrpcStatusException;
import io.servicetalk.http.api.HttpResponseStatus;
Expand Down Expand Up @@ -87,8 +88,12 @@ class GrpcProxyTunnelTest {
.initializeHttp(httpBuilder -> httpBuilder
.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem,
DefaultTestCerts::loadServerKey).build()))
.listenAndAwait((Greeter.BlockingGreeterService) (ctx, request) ->
HelloReply.newBuilder().setMessage(GREETING_PREFIX + request.getName()).build());
.listenAndAwait(new Greeter.BlockingGreeterService() {
@Override
public HelloReply sayHello(GrpcServiceContext ctx, HelloRequest request) {
return HelloReply.newBuilder().setMessage(GREETING_PREFIX + request.getName()).build();
}
});
client = GrpcClients.forAddress(serverHostAndPort(serverContext))
.initializeHttp(httpBuilder -> httpBuilder.proxyConfig(forAddress(proxyAddress))
.sslConfig(new ClientSslConfigBuilder(DefaultTestCerts::loadServerCAPem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.servicetalk.grpc.netty;

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.grpc.api.GrpcServiceContext;
import io.servicetalk.grpc.api.GrpcStatusException;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.HttpExecutionStrategy;
Expand Down Expand Up @@ -87,8 +88,12 @@ void serverFilterNeverRespondsAppliesDeadline(boolean appendNonOffloading, boole
builder.appendServiceFilter(NEVER_SERVER_FILTER);
}
})
.listenAndAwait((GreeterService) (ctx, request) ->
succeeded(HelloReply.newBuilder().setMessage("hello " + request.getName()).build()));
.listenAndAwait(new GreeterService() {
@Override
public Single<HelloReply> sayHello(GrpcServiceContext ctx, HelloRequest request) {
return succeeded(HelloReply.newBuilder().setMessage("hello " + request.getName()).build());
}
});
BlockingGreeterClient client = forResolvedAddress(serverContext.listenAddress())
.defaultTimeout(clientAppliesTimeout ? DEFAULT_TIMEOUT : null, clientAppliesTimeout)
.buildBlocking(new Greeter.ClientFactory())) {
Expand All @@ -102,8 +107,11 @@ void serverFilterNeverRespondsAppliesDeadline(boolean appendNonOffloading, boole
void clientFilterNeverRespondsAppliesDeadline(boolean builderEnableTimeout) throws Exception {
try (ServerContext serverContext = forAddress(localAddress(0))
.defaultTimeout(null, false)
.listenAndAwait((GreeterService) (ctx, request) -> {
throw new IllegalStateException("client using never filter, server shouldn't read response");
.listenAndAwait(new GreeterService() {
@Override
public Single<HelloReply> sayHello(GrpcServiceContext ctx, HelloRequest request) {
throw new IllegalStateException("client using never filter, server shouldn't read response");
}
});
BlockingGreeterClient client = forResolvedAddress(serverContext.listenAddress())
.defaultTimeout(DEFAULT_TIMEOUT, builderEnableTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.servicetalk.grpc.netty;

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.grpc.api.GrpcServiceContext;
import io.servicetalk.transport.api.IoExecutor;
import io.servicetalk.transport.api.ServerContext;

Expand Down Expand Up @@ -61,8 +63,13 @@ void udsRoundTrip() throws Exception {
String name = "foo";
try (ServerContext serverContext = forAddress(newSocketAddress())
.initializeHttp(builder -> builder.ioExecutor(ioExecutor))
.listenAndAwait((GreeterService) (ctx, request) ->
succeeded(HelloReply.newBuilder().setMessage(greetingPrefix + request.getName()).build()));
.listenAndAwait(new GreeterService() {
@Override
public Single<HelloReply> sayHello(GrpcServiceContext ctx, HelloRequest request) {
return succeeded(HelloReply.newBuilder()
.setMessage(greetingPrefix + request.getName()).build());
}
});
BlockingGreeterClient client = forResolvedAddress(serverContext.listenAddress())
.buildBlocking(new ClientFactory())) {
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import static io.servicetalk.grpc.api.GrpcStatusCode.INTERNAL;
import static io.servicetalk.grpc.api.GrpcStatusCode.INVALID_ARGUMENT;
import static io.servicetalk.grpc.api.GrpcStatusCode.UNAUTHENTICATED;
import static io.servicetalk.grpc.api.GrpcStatusCode.UNIMPLEMENTED;
import static io.servicetalk.grpc.api.GrpcStatusCode.UNKNOWN;
import static io.servicetalk.grpc.internal.DeadlineUtils.GRPC_TIMEOUT_HEADER_KEY;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
Expand Down Expand Up @@ -610,6 +611,50 @@ void clientH2ReturnStatus() throws Exception {
}
}

@Test
void grpcJavaToServiceTalkUnimplementedService() throws Exception {
try (TestServerContext server = serviceTalkServerBlocking(ErrorMode.STATUS, false, null, null);
CompatClient client = grpcJavaClient(server.listenAddress(), null, false, null)) {
final Single<CompatResponse> response =
client.unimplementedServerCall(CompatRequest.newBuilder().setId(1).build());
validateGrpcErrorInResponse(response.toFuture(), false, UNIMPLEMENTED,
"Method grpc.netty.Compat/unimplementedServerCall is unimplemented");
}
}

@Test
void grpcJavaToGrpcJavaUnimplementedService() throws Exception {
try (TestServerContext server = grpcJavaServer(ErrorMode.NONE, false, null, null);
CompatClient client = grpcJavaClient(server.listenAddress(), null, false, null)) {
final Single<CompatResponse> response =
client.unimplementedServerCall(CompatRequest.newBuilder().setId(1).build());
validateGrpcErrorInResponse(response.toFuture(), false, UNIMPLEMENTED,
"Method grpc.netty.Compat/unimplementedServerCall is unimplemented");
}
}

@Test
void serviceTalkToGrpcJavaUnimplementedService() throws Exception {
try (TestServerContext server = grpcJavaServer(ErrorMode.NONE, false, null, null);
CompatClient client = serviceTalkClient(server.listenAddress(), false, null, null)) {
final Single<CompatResponse> response =
client.unimplementedServerCall(CompatRequest.newBuilder().setId(1).build());
validateGrpcErrorInResponse(response.toFuture(), false, UNIMPLEMENTED,
"Method grpc.netty.Compat/unimplementedServerCall is unimplemented");
}
}

@Test
void serviceTalkToServiceTalkUnimplementedService() throws Exception {
try (TestServerContext server = serviceTalkServerBlocking(ErrorMode.NONE, false, null, null);
CompatClient client = serviceTalkClient(server.listenAddress(), false, null, null)) {
final Single<CompatResponse> response =
client.unimplementedServerCall(CompatRequest.newBuilder().setId(1).build());
validateGrpcErrorInResponse(response.toFuture(), false, UNIMPLEMENTED,
"Method grpc.netty.Compat/unimplementedServerCall is unimplemented");
}
}

@ParameterizedTest
@MethodSource("sslStreamingAndCompressionParams")
void grpcJavaToServiceTalkBlockingError(final boolean ssl,
Expand Down Expand Up @@ -1496,6 +1541,27 @@ public Publisher<CompatResponse> serverStreamingCall(final GrpcClientMetadata me
return serverStreamingCall(request);
}

@Override
public Single<CompatResponse> unimplementedServerCall(final CompatRequest request) {
final Processor<CompatResponse, CompatResponse> processor =
newSingleProcessor();
finalStub.unimplementedServerCall(request, adaptResponse(processor));
return fromSource(processor);
}

@Override
public Single<CompatResponse> unimplementedServerCall(final GrpcClientMetadata metadata,
final CompatRequest request) {
return unimplementedServerCall(request);
}

@Deprecated
@Override
public Single<CompatResponse> unimplementedServerCall(final Compat.UnimplementedServerCallMetadata metadata,
final CompatRequest request) {
return unimplementedServerCall(request);
}

@Override
public void close() throws Exception {
channel.shutdown().awaitTermination(DEFAULT_TIMEOUT_SECONDS, SECONDS);
Expand Down
1 change: 1 addition & 0 deletions servicetalk-grpc-netty/src/test/proto/test_compat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,5 @@ service Compat {
rpc clientStreamingCall (stream RequestContainer.CompatRequest) returns (ResponseContainer.CompatResponse) {}
rpc serverStreamingCall (RequestContainer.CompatRequest) returns (stream ResponseContainer.CompatResponse) {}
rpc bidirectionalStreamingCall (stream RequestContainer.CompatRequest) returns (stream ResponseContainer.CompatResponse) {}
rpc unimplementedServerCall (RequestContainer.CompatRequest) returns (ResponseContainer.CompatResponse) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import static io.servicetalk.grpc.protoc.Generator.NewRpcMethodFlag.SERVER_RESPONSE;
import static io.servicetalk.grpc.protoc.NoopServiceCommentsMap.NOOP_MAP;
import static io.servicetalk.grpc.protoc.StringUtils.escapeJavaDoc;
import static io.servicetalk.grpc.protoc.StringUtils.isNullOrEmpty;
import static io.servicetalk.grpc.protoc.StringUtils.sanitizeIdentifier;
import static io.servicetalk.grpc.protoc.Types.AllGrpcRoutes;
import static io.servicetalk.grpc.protoc.Types.Arrays;
Expand Down Expand Up @@ -175,6 +174,7 @@
import static java.lang.System.lineSeparator;
import static java.util.EnumSet.noneOf;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.concat;
import static javax.lang.model.element.Modifier.ABSTRACT;
import static javax.lang.model.element.Modifier.DEFAULT;
Expand Down Expand Up @@ -1647,38 +1647,47 @@ private TypeSpec newServiceInterfaceSpec(final State state, final boolean blocki

// generate default service methods
if (defaultServiceMethods) {
final List<MethodDescriptorProto> methodList = state.serviceProto.getMethodList();
for (int i = 0; i < methodList.size(); i++) {
final List<MethodDescriptorProto> methodList = state.serviceRpcInterfaces.stream()
.filter(intf -> intf.blocking == blocking)
.map(intf -> intf.methodProto)
.collect(toList());
for (int i = 0; i < methodList.size(); ++i) {
final MethodDescriptorProto methodProto = methodList.get(i);
final ClassName inClass = messageTypesMap.get(methodProto.getInputType());
final ClassName outClass = messageTypesMap.get(methodProto.getOutputType());
final String methodName = sanitizeIdentifier(methodProto.getName(), true);
final String serviceName = isNullOrEmpty(state.serviceProto.getName()) ?
"" : state.serviceProto.getName();
final String fullMethodName = serviceName + "/" + methodName;
final String methodPath = context.methodPath(state.serviceProto, methodProto).substring(1);
final int methodIndex = i;
interfaceSpecBuilder.addMethod(newRpcMethodSpec(inClass, outClass, methodName,
final MethodSpec methodSpec = newRpcMethodSpec(inClass, outClass, methodName,
methodProto.getClientStreaming(),
methodProto.getServerStreaming(),
!blocking ? EnumSet.of(INTERFACE) : EnumSet.of(INTERFACE, BLOCKING),
printJavaDocs, (__, c) -> {
final String errorMessage = "\"Method " + fullMethodName + " is unimplemented\"";
c.addModifiers(DEFAULT).addParameter(GrpcServiceContext, ctx);
printJavaDocs, (__, spec) -> {
final String errorMessage = "\"Method " + methodPath + " is unimplemented\"";
spec.addModifiers(DEFAULT).addParameter(GrpcServiceContext, ctx);
spec.addAnnotation(Override.class);
if (!blocking) {
final ClassName returnType = methodProto.getServerStreaming() ? Publisher : Single;
c.addStatement("return $T.failed(new $T(new $T($T.UNIMPLEMENTED, $L)))", returnType,
GrpcStatusException, GrpcStatus, GrpcStatusCode, errorMessage);
spec.addStatement("return $T.failed(new $T(new $T($T.UNIMPLEMENTED, $L)))",
returnType, GrpcStatusException, GrpcStatus, GrpcStatusCode, errorMessage);
} else {
c.addStatement("throw new $T(new $T($T.UNIMPLEMENTED, $L))",
spec.addStatement("throw new $T(new $T($T.UNIMPLEMENTED, $L))",
GrpcStatusException, GrpcStatus, GrpcStatusCode, errorMessage);
}
if (printJavaDocs) {
extractJavaDocComments(state, methodIndex, c);
c.addJavadoc(JAVADOC_PARAM + ctx +
extractJavaDocComments(state, methodIndex, spec);
spec.addJavadoc(JAVADOC_PARAM + ctx +
" context associated with this service and request." + lineSeparator());
}
return c;
}));
return spec;
});

final boolean isDeprecated = methodSpec.annotations.contains(
AnnotationSpec.builder(Deprecated.class).build());

if (!isDeprecated || !skipDeprecated) {
interfaceSpecBuilder.addMethod(methodSpec);
}
}
}

Expand Down

0 comments on commit c2d1688

Please sign in to comment.