From 6c554922cc1a450c91855cb3b17e6e6d1371926b Mon Sep 17 00:00:00 2001 From: Dave Date: Fri, 20 Dec 2024 14:24:06 -0700 Subject: [PATCH] Address feedback --- .../io/servicetalk/grpc/api/GrpcRouter.java | 4 +-- .../grpc/internal/GrpcContextKeys.java | 36 +++++++++++++++++++ .../grpc/netty/DefaultGrpcServerBuilder.java | 5 ++- ...rceTrailersOnlyResponseServiceFilter.java} | 13 +++---- .../http/api/DefaultHttpResponse.java | 5 ++- .../servicetalk/http/api/HttpContextKeys.java | 3 -- 6 files changed, 51 insertions(+), 15 deletions(-) create mode 100644 servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcContextKeys.java rename servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/{GrpcTrailersOptimizationServiceFilter.java => GrpcEnforceTrailersOnlyResponseServiceFilter.java} (82%) diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java index 816423edf5..06c580fb88 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java @@ -90,8 +90,8 @@ import static io.servicetalk.grpc.api.GrpcUtils.setStatus; import static io.servicetalk.grpc.api.GrpcUtils.setStatusOk; import static io.servicetalk.grpc.api.GrpcUtils.validateContentType; +import static io.servicetalk.grpc.internal.GrpcContextKeys.TRAILERS_ONLY_RESPONSE; import static io.servicetalk.http.api.HttpApiConversions.toStreamingHttpService; -import static io.servicetalk.http.api.HttpContextKeys.HTTP_OPTIMIZE_ERROR_STREAM; import static io.servicetalk.http.api.HttpExecutionStrategies.customStrategyBuilder; import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; import static io.servicetalk.http.api.HttpExecutionStrategies.offloadAll; @@ -709,7 +709,7 @@ public void handle(final HttpServiceContext ctx, final BlockingStreamingHttpRequ methodDescriptor.httpPath(), t); HttpHeaders trailers; if (grpcResponse == null || (trailers = grpcResponse.trailers()) == null) { - response.context().put(HTTP_OPTIMIZE_ERROR_STREAM, Boolean.TRUE); + response.context().put(TRAILERS_ONLY_RESPONSE, Boolean.TRUE); setStatus(response.headers(), t, allocator); // Use HTTP response to avoid setting "OK" in trailers and allocating a serializer response.sendMetaData().close(); diff --git a/servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcContextKeys.java b/servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcContextKeys.java new file mode 100644 index 0000000000..aa0fe07628 --- /dev/null +++ b/servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcContextKeys.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.grpc.internal; + +import io.servicetalk.context.api.ContextMap; + +import static io.servicetalk.context.api.ContextMap.Key.newKey; + +/** + * All {@link ContextMap.Key}(s) defined for gRPC. + */ +public final class GrpcContextKeys { + /** + * For the blocking server this key allows the router to notify an upstream filter that it is safe to consolidate + * tailing empty data frames when set to true. + * + */ + public static final ContextMap.Key TRAILERS_ONLY_RESPONSE = + newKey("TRAILERS_ONLY_RESPONSE", Boolean.class); + + private GrpcContextKeys() { + } +} diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java index d7761616ce..502a87d9be 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java @@ -145,9 +145,6 @@ private Single doListen(final GrpcServiceFactory serviceFa private ExecutionContextInterceptorHttpServerBuilder preBuild() { final ExecutionContextInterceptorHttpServerBuilder interceptor = new ExecutionContextInterceptorHttpServerBuilder(httpServerBuilderSupplier.get()); - - interceptor.appendServiceFilter(GrpcTrailersOptimizationServiceFilter.INSTANCE); - interceptor.appendNonOffloadingServiceFilter(GrpcExceptionMapperServiceFilter.INSTANCE); directCallInitializer.initialize(interceptor); @@ -156,6 +153,8 @@ private ExecutionContextInterceptorHttpServerBuilder preBuild() { } initializer.initialize(interceptor); + interceptor.appendServiceFilter(GrpcEnforceTrailersOnlyResponseServiceFilter.INSTANCE); + return interceptor; } diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcTrailersOptimizationServiceFilter.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcEnforceTrailersOnlyResponseServiceFilter.java similarity index 82% rename from servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcTrailersOptimizationServiceFilter.java rename to servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcEnforceTrailersOnlyResponseServiceFilter.java index e1cfb4706c..35f951964f 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcTrailersOptimizationServiceFilter.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcEnforceTrailersOnlyResponseServiceFilter.java @@ -27,12 +27,13 @@ import io.servicetalk.http.api.StreamingHttpServiceFilter; import io.servicetalk.http.api.StreamingHttpServiceFilterFactory; -import static io.servicetalk.http.api.HttpContextKeys.HTTP_OPTIMIZE_ERROR_STREAM; +import static io.servicetalk.grpc.internal.GrpcContextKeys.TRAILERS_ONLY_RESPONSE; -final class GrpcTrailersOptimizationServiceFilter implements StreamingHttpServiceFilterFactory { - static final GrpcTrailersOptimizationServiceFilter INSTANCE = new GrpcTrailersOptimizationServiceFilter(); +final class GrpcEnforceTrailersOnlyResponseServiceFilter implements StreamingHttpServiceFilterFactory { + static final GrpcEnforceTrailersOnlyResponseServiceFilter INSTANCE = + new GrpcEnforceTrailersOnlyResponseServiceFilter(); - private GrpcTrailersOptimizationServiceFilter() { + private GrpcEnforceTrailersOnlyResponseServiceFilter() { } @Override @@ -42,9 +43,9 @@ public StreamingHttpServiceFilter create(StreamingHttpService service) { public Single handle(final HttpServiceContext ctx, final StreamingHttpRequest request, final StreamingHttpResponseFactory responseFactory) { - return super.handle(ctx, request, responseFactory).flatMap(response -> { + return delegate().handle(ctx, request, responseFactory).flatMap(response -> { Single mappedResponse; - if (Boolean.TRUE.equals(response.context().get(HTTP_OPTIMIZE_ERROR_STREAM))) { + if (Boolean.TRUE.equals(response.context().get(TRAILERS_ONLY_RESPONSE))) { mappedResponse = response.toResponse().map(HttpResponse::toStreamingResponse); } else { mappedResponse = Single.succeeded(response); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpResponse.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpResponse.java index bdd9718b7f..0edb3c6afc 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpResponse.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DefaultHttpResponse.java @@ -107,6 +107,9 @@ public StreamingHttpResponse toStreamingResponse() { @Nullable final Publisher payload; if (trailers != null) { + // We can not drop empty Trailers here bcz users could do type conversion intermediately, while still + // referencing the original HttpHeaders object from an aggregated type and keep using it to add trailers + // before sending the message or converting it back to an aggregated one. payload = emptyPayloadBody ? from(trailers) : from(payloadBody, trailers); } else { payload = emptyPayloadBody ? null : from(payloadBody); @@ -127,7 +130,7 @@ public BlockingStreamingHttpResponse toBlockingStreamingResponse() { public HttpHeaders trailers() { if (trailers == null) { trailers = original.payloadHolder().headersFactory().newTrailers(); - original.transform(this); + original.transform(this); // Invoke "transform" to set PayloadInfo.mayHaveTrailers() flag } return trailers; } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java index 60833bd16d..a0a74cd89e 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpContextKeys.java @@ -68,9 +68,6 @@ public final class HttpContextKeys { public static final Key HTTP_FORCE_NEW_CONNECTION = newKey("HTTP_FORCE_NEW_CONNECTION", Boolean.class); - public static final Key HTTP_OPTIMIZE_ERROR_STREAM = - newKey("HTTP_OPTIMIZE_ERROR_STREAM", Boolean.class); - private HttpContextKeys() { // No instances }