Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
mgodave committed Dec 20, 2024
1 parent 7f700d4 commit 6c55492
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@
import static io.servicetalk.grpc.api.GrpcUtils.setStatus;
import static io.servicetalk.grpc.api.GrpcUtils.setStatusOk;
import static io.servicetalk.grpc.api.GrpcUtils.validateContentType;
import static io.servicetalk.grpc.internal.GrpcContextKeys.TRAILERS_ONLY_RESPONSE;
import static io.servicetalk.http.api.HttpApiConversions.toStreamingHttpService;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_OPTIMIZE_ERROR_STREAM;
import static io.servicetalk.http.api.HttpExecutionStrategies.customStrategyBuilder;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadAll;
Expand Down Expand Up @@ -709,7 +709,7 @@ public void handle(final HttpServiceContext ctx, final BlockingStreamingHttpRequ
methodDescriptor.httpPath(), t);
HttpHeaders trailers;
if (grpcResponse == null || (trailers = grpcResponse.trailers()) == null) {
response.context().put(HTTP_OPTIMIZE_ERROR_STREAM, Boolean.TRUE);
response.context().put(TRAILERS_ONLY_RESPONSE, Boolean.TRUE);
setStatus(response.headers(), t, allocator);
// Use HTTP response to avoid setting "OK" in trailers and allocating a serializer
response.sendMetaData().close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.grpc.internal;

import io.servicetalk.context.api.ContextMap;

import static io.servicetalk.context.api.ContextMap.Key.newKey;

/**
* All {@link ContextMap.Key}(s) defined for gRPC.
*/
public final class GrpcContextKeys {
/**
* For the blocking server this key allows the router to notify an upstream filter that it is safe to consolidate
* tailing empty data frames when set to true.
*
*/
public static final ContextMap.Key<Boolean> TRAILERS_ONLY_RESPONSE =
newKey("TRAILERS_ONLY_RESPONSE", Boolean.class);

private GrpcContextKeys() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ private Single<GrpcServerContext> doListen(final GrpcServiceFactory<?> serviceFa
private ExecutionContextInterceptorHttpServerBuilder preBuild() {
final ExecutionContextInterceptorHttpServerBuilder interceptor =
new ExecutionContextInterceptorHttpServerBuilder(httpServerBuilderSupplier.get());

interceptor.appendServiceFilter(GrpcTrailersOptimizationServiceFilter.INSTANCE);

interceptor.appendNonOffloadingServiceFilter(GrpcExceptionMapperServiceFilter.INSTANCE);

directCallInitializer.initialize(interceptor);
Expand All @@ -156,6 +153,8 @@ private ExecutionContextInterceptorHttpServerBuilder preBuild() {
}
initializer.initialize(interceptor);

interceptor.appendServiceFilter(GrpcEnforceTrailersOnlyResponseServiceFilter.INSTANCE);

return interceptor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;

import static io.servicetalk.http.api.HttpContextKeys.HTTP_OPTIMIZE_ERROR_STREAM;
import static io.servicetalk.grpc.internal.GrpcContextKeys.TRAILERS_ONLY_RESPONSE;

final class GrpcTrailersOptimizationServiceFilter implements StreamingHttpServiceFilterFactory {
static final GrpcTrailersOptimizationServiceFilter INSTANCE = new GrpcTrailersOptimizationServiceFilter();
final class GrpcEnforceTrailersOnlyResponseServiceFilter implements StreamingHttpServiceFilterFactory {
static final GrpcEnforceTrailersOnlyResponseServiceFilter INSTANCE =
new GrpcEnforceTrailersOnlyResponseServiceFilter();

private GrpcTrailersOptimizationServiceFilter() {
private GrpcEnforceTrailersOnlyResponseServiceFilter() {
}

@Override
Expand All @@ -42,9 +43,9 @@ public StreamingHttpServiceFilter create(StreamingHttpService service) {
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
return super.handle(ctx, request, responseFactory).flatMap(response -> {
return delegate().handle(ctx, request, responseFactory).flatMap(response -> {
Single<StreamingHttpResponse> mappedResponse;
if (Boolean.TRUE.equals(response.context().get(HTTP_OPTIMIZE_ERROR_STREAM))) {
if (Boolean.TRUE.equals(response.context().get(TRAILERS_ONLY_RESPONSE))) {
mappedResponse = response.toResponse().map(HttpResponse::toStreamingResponse);
} else {
mappedResponse = Single.succeeded(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public StreamingHttpResponse toStreamingResponse() {
@Nullable
final Publisher<Object> payload;
if (trailers != null) {
// We can not drop empty Trailers here bcz users could do type conversion intermediately, while still
// referencing the original HttpHeaders object from an aggregated type and keep using it to add trailers
// before sending the message or converting it back to an aggregated one.
payload = emptyPayloadBody ? from(trailers) : from(payloadBody, trailers);
} else {
payload = emptyPayloadBody ? null : from(payloadBody);
Expand All @@ -127,7 +130,7 @@ public BlockingStreamingHttpResponse toBlockingStreamingResponse() {
public HttpHeaders trailers() {
if (trailers == null) {
trailers = original.payloadHolder().headersFactory().newTrailers();
original.transform(this);
original.transform(this); // Invoke "transform" to set PayloadInfo.mayHaveTrailers() flag
}
return trailers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ public final class HttpContextKeys {
public static final Key<Boolean> HTTP_FORCE_NEW_CONNECTION =
newKey("HTTP_FORCE_NEW_CONNECTION", Boolean.class);

public static final Key<Boolean> HTTP_OPTIMIZE_ERROR_STREAM =
newKey("HTTP_OPTIMIZE_ERROR_STREAM", Boolean.class);

private HttpContextKeys() {
// No instances
}
Expand Down

0 comments on commit 6c55492

Please sign in to comment.