Skip to content

Commit

Permalink
Clean up deprecated requester/client/filter API
Browse files Browse the repository at this point in the history
Motivation:

apple#1956 removed requester/client methods that take
`HttpExecutionStrategy`. All deprecated API can be removed now.

Modifications:

- Remove deprecated `request` method from a requester;
- Remove deprecated `reserveConnection` method from a client;
- Remove `NewToDeprecated` utility;
- Migrate all existing filters and tests to the new filter API;
- Pass an `HttpExecutionStrategy` through request context;

Result:

No deprecated API on requester/client/filter API.
  • Loading branch information
idelpivnitskiy committed Nov 18, 2021
1 parent b471187 commit 8effca0
Show file tree
Hide file tree
Showing 96 changed files with 464 additions and 1,505 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
Expand All @@ -44,9 +43,8 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie
return new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return delegate.request(strategy, request).flatMap(response -> {
return delegate.request(request).flatMap(response -> {
if (!OK.equals(response.status())) {
return failed(new BadResponseStatusException("Bad response status from " + backendName + ": " +
response.status()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.servicetalk.http.api.BlockingStreamingHttpResponse;
import io.servicetalk.http.api.HttpClient;
import io.servicetalk.http.api.HttpRequest;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponse;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpRequest;
Expand All @@ -51,6 +52,7 @@
import static io.servicetalk.grpc.api.GrpcUtils.toGrpcException;
import static io.servicetalk.grpc.api.GrpcUtils.validateResponseAndGetPayload;
import static io.servicetalk.grpc.internal.DeadlineUtils.GRPC_DEADLINE_CONTEXT_KEY;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static java.util.Objects.requireNonNull;

final class DefaultGrpcClientCallFactory implements GrpcClientCallFactory {
Expand Down Expand Up @@ -102,9 +104,8 @@ public <Req, Resp> ClientCall<Req, Resp> newCall(final MethodDescriptor<Req, Res
HttpRequest httpRequest = client.post(UNKNOWN_PATH.equals(mdPath) ? metadata.path() : mdPath);
initRequest(httpRequest, requestContentType, serializer.messageEncoding(), acceptedEncoding, timeout);
httpRequest.payloadBody(serializer.serialize(request, client.executionContext().bufferAllocator()));
@Nullable
final GrpcExecutionStrategy strategy = metadata.strategy();
return (strategy == null ? client.request(httpRequest) : client.request(strategy, httpRequest))
assignStrategy(httpRequest, metadata);
return client.request(httpRequest)
.map(response -> validateResponseAndGetPayload(response, responseContentType,
client.executionContext().bufferAllocator(), readGrpcMessageEncodingRaw(response.headers(),
deserializerIdentity, deserializers, GrpcDeserializer::messageEncoding)))
Expand Down Expand Up @@ -147,10 +148,8 @@ public <Req, Resp> StreamingClientCall<Req, Resp> newStreamingCall(
initRequest(httpRequest, requestContentType, serializer.messageEncoding(), acceptedEncoding, timeout);
httpRequest.payloadBody(serializer.serialize(request,
streamingHttpClient.executionContext().bufferAllocator()));
@Nullable
final GrpcExecutionStrategy strategy = metadata.strategy();
return (strategy == null ? streamingHttpClient.request(httpRequest) :
streamingHttpClient.request(strategy, httpRequest))
assignStrategy(httpRequest, metadata);
return streamingHttpClient.request(httpRequest)
.flatMapPublisher(response -> validateResponseAndGetPayload(response, responseContentType,
streamingHttpClient.executionContext().bufferAllocator(), readGrpcMessageEncodingRaw(
response.headers(), deserializerIdentity, deserializers,
Expand Down Expand Up @@ -235,10 +234,8 @@ public <Req, Resp> BlockingClientCall<Req, Resp> newBlockingCall(
initRequest(httpRequest, requestContentType, serializer.messageEncoding(), acceptedEncoding, timeout);
httpRequest.payloadBody(serializer.serialize(request, client.executionContext().bufferAllocator()));
try {
@Nullable
final GrpcExecutionStrategy strategy = metadata.strategy();
final HttpResponse response = strategy == null ? client.request(httpRequest) :
client.request(strategy, httpRequest);
assignStrategy(httpRequest, metadata);
final HttpResponse response = client.request(httpRequest);
return validateResponseAndGetPayload(response, responseContentType,
client.executionContext().bufferAllocator(), readGrpcMessageEncodingRaw(response.headers(),
deserializerIdentity, deserializers, GrpcDeserializer::messageEncoding));
Expand Down Expand Up @@ -285,10 +282,8 @@ public <Req, Resp> BlockingStreamingClientCall<Req, Resp> newBlockingStreamingCa
httpRequest.payloadBody(serializer.serialize(request,
streamingHttpClient.executionContext().bufferAllocator()));
try {
@Nullable
final GrpcExecutionStrategy strategy = metadata.strategy();
final BlockingStreamingHttpResponse response = strategy == null ? client.request(httpRequest) :
client.request(strategy, httpRequest);
assignStrategy(httpRequest, metadata);
final BlockingStreamingHttpResponse response = client.request(httpRequest);
return validateResponseAndGetPayload(response.toStreamingResponse(), responseContentType,
client.executionContext().bufferAllocator(), readGrpcMessageEncodingRaw(
response.headers(), deserializerIdentity, deserializers,
Expand Down Expand Up @@ -379,7 +374,8 @@ public Completable onClose() {
* @param metaDataTimeout the timeout specified in client metadata or null for no timeout
* @return The timeout {@link Duration}, potentially negative or null if no timeout.
*/
private @Nullable Duration timeoutForRequest(@Nullable Duration metaDataTimeout) {
@Nullable
private Duration timeoutForRequest(@Nullable Duration metaDataTimeout) {
Long deadline = AsyncContext.get(GRPC_DEADLINE_CONTEXT_KEY);
@Nullable
Duration contextTimeout = null != deadline ? Duration.ofNanos(deadline - System.nanoTime()) : null;
Expand Down Expand Up @@ -447,4 +443,12 @@ private static <Req> GrpcStreamingSerializer<Req> streamingSerializer(
private static <Resp> GrpcDeserializer<Resp> deserializer(MethodDescriptor<?, Resp> methodDescriptor) {
return new GrpcDeserializer<>(methodDescriptor.responseDescriptor().serializerDescriptor().serializer());
}

private static void assignStrategy(HttpRequestMetaData requestMetaData, GrpcClientMetadata grpcMetadata) {
@Nullable
final GrpcExecutionStrategy strategy = grpcMetadata.strategy();
if (strategy != null) {
requestMetaData.context().put(HTTP_EXECUTION_STRATEGY_KEY, strategy);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,34 +117,30 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie

@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return CatchAllHttpClientFilter.request(delegate, strategy, request);
return CatchAllHttpClientFilter.request(delegate, request);
}

@Override
public Single<? extends FilterableReservedStreamingHttpConnection> reserveConnection(
final HttpExecutionStrategy strategy, final HttpRequestMetaData metaData) {
final HttpRequestMetaData metaData) {

return delegate().reserveConnection(strategy, metaData)
return delegate().reserveConnection(metaData)
.map(r -> new ReservedStreamingHttpConnectionFilter(r) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return CatchAllHttpClientFilter.request(delegate, strategy, request);
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return CatchAllHttpClientFilter.request(delegate(), request);
}
});
}
};
}

private static Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
final Single<StreamingHttpResponse> resp;
try {
resp = delegate.request(strategy, request);
resp = delegate.request(request);
} catch (Throwable t) {
return failed(toGrpcException(t));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterClient;
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterService;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
Expand Down Expand Up @@ -686,7 +685,6 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie
return new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
if (throwEx) {
return throwException(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.servicetalk.grpc.netty.TesterProto.Tester.ServiceFactory;
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterClient;
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterService;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequester;
Expand Down Expand Up @@ -90,13 +89,12 @@ private void setUp(boolean streamingService, boolean streamingClient) throws Exc
.initializeHttp(builder -> builder.appendClientFilter(origin -> new StreamingHttpClientFilter(origin) {
@Override
protected Single<StreamingHttpResponse> request(StreamingHttpRequester delegate,
HttpExecutionStrategy strategy,
StreamingHttpRequest request) {
// Change path to send the request to the route API that expects only a single request item
// and generates requested number of response items:
return defer(() -> {
request.requestTarget(BlockingTestResponseStreamRpc.PATH);
return delegate.request(strategy, request).subscribeShareContext();
return delegate.request(request).subscribeShareContext();
});
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterClient;
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterService;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpClientFilter;
Expand Down Expand Up @@ -223,9 +222,8 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie
return new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return super.request(delegate, strategy, request)
return super.request(delegate, request)
.map(response -> {
assertGrpcStatusInHeaders(response, errors);
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,10 @@
*/
package io.servicetalk.http.api;

import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;

/**
* The equivalent of {@link HttpClient} but with synchronous/blocking APIs instead of asynchronous APIs.
*/
public interface BlockingHttpClient extends BlockingHttpRequester {
/**
* Send a {@code request}.
*
* @param request the request to send.
* @return The response.
* @throws Exception if an exception occurs during the request processing.
*/
@Override // FIXME: 0.42 - remove, this method is defined in BlockingHttpRequester
HttpResponse request(HttpRequest request) throws Exception;

/**
* Reserve a {@link BlockingHttpConnection} based on provided {@link HttpRequestMetaData}.
*
Expand All @@ -42,26 +30,6 @@ public interface BlockingHttpClient extends BlockingHttpRequester {
*/
ReservedBlockingHttpConnection reserveConnection(HttpRequestMetaData metaData) throws Exception;

/**
* Reserve a {@link BlockingHttpConnection} based on provided {@link HttpRequestMetaData}.
*
* @param strategy {@link HttpExecutionStrategy} to use.
* @param metaData Allows the underlying layers to know what {@link BlockingHttpConnection}s are valid to
* reserve for future {@link HttpRequest requests} with the same {@link HttpRequestMetaData}.
* For example this may provide some insight into shard or other info.
* @return a {@link ReservedBlockingHttpConnection}.
* @throws Exception if an exception occurs during the reservation process.
* @deprecated Use {@link #reserveConnection(HttpRequestMetaData)}. If an {@link HttpExecutionStrategy} needs to be
* altered, provide a value for {@link HttpContextKeys#HTTP_EXECUTION_STRATEGY_KEY} in the
* {@link HttpRequestMetaData#context() request context}.
*/
@Deprecated
default ReservedBlockingHttpConnection reserveConnection(HttpExecutionStrategy strategy,
HttpRequestMetaData metaData) throws Exception {
metaData.context().put(HTTP_EXECUTION_STRATEGY_KEY, strategy);
return reserveConnection(metaData);
}

/**
* Convert this {@link BlockingHttpClient} to the {@link StreamingHttpClient} API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,6 @@
* The equivalent of {@link HttpConnection} but with synchronous/blocking APIs instead of asynchronous APIs.
*/
public interface BlockingHttpConnection extends BlockingHttpRequester {
/**
* Send a {@code request}.
*
* @param request the request to send.
* @return The response.
* @throws Exception if an exception occurs during the request processing.
*/
@Override // FIXME: 0.42 - remove, this method is defined in BlockingHttpRequester
HttpResponse request(HttpRequest request) throws Exception;

/**
* Get the {@link HttpConnectionContext}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import io.servicetalk.concurrent.GracefulAutoCloseable;

import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;

/**
* The equivalent of {@link HttpRequester} with synchronous/blocking APIs instead of asynchronous APIs.
*/
Expand All @@ -29,24 +27,7 @@ public interface BlockingHttpRequester extends HttpRequestFactory, GracefulAutoC
* @param request the request to send.
* @return The response.
*/
default HttpResponse request(HttpRequest request) throws Exception {
// FIXME: 0.42 - remove default impl
throw new UnsupportedOperationException("Method request(HttpRequest) is not supported by " +
getClass().getName());
}

/**
* Send a {@code request} using the passed {@link HttpExecutionStrategy strategy}.
*
* @param strategy {@link HttpExecutionStrategy} to use.
* @param request the request to send.
* @return The response.
* @throws Exception if an exception occurs during the request processing.
*/
default HttpResponse request(HttpExecutionStrategy strategy, HttpRequest request) throws Exception {
request.context().put(HTTP_EXECUTION_STRATEGY_KEY, strategy);
return request(request);
}
HttpResponse request(HttpRequest request) throws Exception;

/**
* Get the {@link HttpExecutionContext} used during construction of this object.
Expand Down
Loading

0 comments on commit 8effca0

Please sign in to comment.