Skip to content

Commit

Permalink
fix #975 Add HttpClient.doAfterResponseSuccess that will be triggered…
Browse files Browse the repository at this point in the history
… only when the response is fully received.

Revise javadoc for HttpClient.doOn* methods
Deprecate HttpClient.doAfterResponse
  • Loading branch information
violetagg committed Jan 31, 2020
1 parent 978934f commit adbfb17
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 16 deletions.
53 changes: 39 additions & 14 deletions src/main/java/reactor/netty/http/client/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ public final RequestSender delete() {
* {@link #doOnRequest(BiConsumer)} or {@link RequestSender#send(BiFunction)} might
* not be visible if the error results from a connection failure.
*
* @param doOnRequest a consumer observing connected events
* @param doOnRequest a consumer observing request failures
* @param doOnResponse a consumer observing response failures
*
* @return a new {@link HttpClient}
Expand All @@ -508,15 +508,16 @@ public final HttpClient doOnError(BiConsumer<? super HttpClientRequest, ? super


/**
* Setup a callback called when {@link HttpClientRequest} is about to be sent.
* Setup a callback called when {@link HttpClientRequest} is about to be sent
* and {@link HttpClientState#CONFIGURED} has been emitted.
*
* @param doOnRequest a consumer observing connected events
* @param doOnRequest a callback called when {@link HttpClientRequest} is about to be sent
*
* @return a new {@link HttpClient}
*/
public final HttpClient doOnRequest(BiConsumer<? super HttpClientRequest, ? super Connection> doOnRequest) {
Objects.requireNonNull(doOnRequest, "doOnRequest");
return new HttpClientDoOn(this, doOnRequest, null, null, null);
return new HttpClientDoOn(this, doOnRequest, null, null, null, null);
}

/**
Expand All @@ -525,7 +526,7 @@ public final HttpClient doOnRequest(BiConsumer<? super HttpClientRequest, ? supe
* {@link #doOnRequest(BiConsumer)} or {@link RequestSender#send(BiFunction)} might
* not be visible if the error results from a connection failure.
*
* @param doOnRequest a consumer observing connected events
* @param doOnRequest a consumer observing request failures
*
* @return a new {@link HttpClient}
*/
Expand All @@ -536,32 +537,33 @@ public final HttpClient doOnRequestError(BiConsumer<? super HttpClientRequest, ?

/**
* Setup a callback called when {@link HttpClientRequest} has been sent
* and {@link HttpClientState#REQUEST_SENT} has been emitted.
*
* @param doAfterRequest a consumer observing connected events
* @param doAfterRequest a callback called when {@link HttpClientRequest} has been sent
*
* @return a new {@link HttpClient}
*/
public final HttpClient doAfterRequest(BiConsumer<? super HttpClientRequest, ? super Connection> doAfterRequest) {
Objects.requireNonNull(doAfterRequest, "doAfterRequest");
return new HttpClientDoOn(this, null, doAfterRequest, null, null);
return new HttpClientDoOn(this, null, doAfterRequest, null, null, null);
}

/**
* Setup a callback called after {@link HttpClientResponse} headers have been
* received
* received and {@link HttpClientState#RESPONSE_RECEIVED} has been emitted.
*
* @param doOnResponse a consumer observing connected events
* @param doOnResponse a callback called after {@link HttpClientResponse} headers have been received
*
* @return a new {@link HttpClient}
*/
public final HttpClient doOnResponse(BiConsumer<? super HttpClientResponse, ? super Connection> doOnResponse) {
Objects.requireNonNull(doOnResponse, "doOnResponse");
return new HttpClientDoOn(this, null, null, doOnResponse, null);
return new HttpClientDoOn(this, null, null, doOnResponse, null, null);
}

/**
* Setup a callback called when {@link HttpClientResponse} has not been fully
* received.
* received, {@link HttpClientState#RESPONSE_INCOMPLETE} has been emitted.
*
* @param doOnResponse a consumer observing response failures
*
Expand All @@ -573,15 +575,38 @@ public final HttpClient doOnResponseError(BiConsumer<? super HttpClientResponse,
}

/**
* Setup a callback called after {@link HttpClientResponse} has been fully received.
* Setup a callback called after
* {@link HttpClientState#RESPONSE_RECEIVED} has been emitted and the connection is
* returned to the pool or closed. The callback is invoked in both cases when the
* response is successful/there are errors.
*
* @param doAfterResponse a consumer observing disconnected events
* @param doAfterResponse a callback called after
* {@link HttpClientState#RESPONSE_RECEIVED} has been emitted and the connection is
* returned to the pool or closed. The callback is invoked in both cases when the
* response is successful/there are errors.
*
* @return a new {@link HttpClient}
* @deprecated as of 0.9.5. Consider using {@link #doAfterResponseSuccess(BiConsumer)} or
* {@link #doOnResponseError(BiConsumer)}
*/
@Deprecated
public final HttpClient doAfterResponse(BiConsumer<? super HttpClientResponse, ? super Connection> doAfterResponse) {
Objects.requireNonNull(doAfterResponse, "doAfterResponse");
return new HttpClientDoOn(this, null, null, null, doAfterResponse);
return new HttpClientDoOn(this, null, null, null, doAfterResponse, null);
}

/**
* Setup a callback called after {@link HttpClientResponse} has been fully received
* and {@link HttpClientState#RESPONSE_COMPLETED} has been emitted.
*
* @param doAfterResponseSuccess a callback called after {@link HttpClientResponse} has been fully received
* and {@link HttpClientState#RESPONSE_COMPLETED} has been emitted.
*
* @return a new {@link HttpClient}
*/
public final HttpClient doAfterResponseSuccess(BiConsumer<? super HttpClientResponse, ? super Connection> doAfterResponseSuccess) {
Objects.requireNonNull(doAfterResponseSuccess, "doAfterResponseSuccess");
return new HttpClientDoOn(this, null, null, null, null, doAfterResponseSuccess);
}

/**
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/reactor/netty/http/client/HttpClientDoOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,21 @@ final class HttpClientDoOn extends HttpClientOperator implements ConnectionObser
final BiConsumer<? super HttpClientRequest, ? super Connection> afterRequest;
final BiConsumer<? super HttpClientResponse, ? super Connection> onResponse;
final BiConsumer<? super HttpClientResponse, ? super Connection> afterResponse;
final BiConsumer<? super HttpClientResponse, ? super Connection> afterResponseSuccess;


HttpClientDoOn(HttpClient client,
@Nullable BiConsumer<? super HttpClientRequest, ? super Connection> onRequest,
@Nullable BiConsumer<? super HttpClientRequest, ? super Connection> afterRequest,
@Nullable BiConsumer<? super HttpClientResponse, ? super Connection> onResponse,
@Nullable BiConsumer<? super HttpClientResponse, ? super Connection> afterResponse) {
@Nullable BiConsumer<? super HttpClientResponse, ? super Connection> afterResponse,
@Nullable BiConsumer<? super HttpClientResponse, ? super Connection> afterResponseSuccess) {
super(client);
this.onRequest = onRequest;
this.afterRequest = afterRequest;
this.onResponse = onResponse;
this.afterResponse = afterResponse;
this.afterResponseSuccess = afterResponseSuccess;
}

@Override
Expand All @@ -70,6 +74,10 @@ public void onStateChange(Connection connection, State newState) {
}
return;
}
if (afterResponseSuccess != null && newState == HttpClientState.RESPONSE_COMPLETED) {
afterResponseSuccess.accept(connection.as(HttpClientOperations.class), connection);
return;
}
if (afterRequest != null && newState == HttpClientState.REQUEST_SENT) {
afterRequest.accept(connection.as(HttpClientOperations.class), connection);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (markSentBody()) {
markPersistent(false);
}
listener().onStateChange(this, HttpClientState.RESPONSE_COMPLETED);
terminate();
return;
}
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/reactor/netty/http/client/HttpClientState.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public String toString() {
},
/**
* The request has been sent but the response has not been fully received and the
* connection has prematurely closed
* connection has been prematurely closed
*/
RESPONSE_INCOMPLETE() {
@Override
Expand All @@ -51,5 +51,14 @@ public String toString() {
public String toString() {
return "[response_received]";
}
},
/**
* The response fully received
*/
RESPONSE_COMPLETED() {
@Override
public String toString() {
return "[response_completed]";
}
}
}
41 changes: 41 additions & 0 deletions src/test/java/reactor/netty/http/client/HttpClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -81,6 +82,7 @@
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.ByteBufMono;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.FutureMono;
import reactor.netty.SocketUtils;
Expand Down Expand Up @@ -641,6 +643,7 @@ public void chunkedSendFile() throws URISyntaxException {
}

@Test
@SuppressWarnings("deprecation")
public void test() {
disposableServer =
HttpServer.create()
Expand Down Expand Up @@ -992,6 +995,7 @@ public void testClientContext() throws Exception {
doTestClientContext(HttpClient.create(ConnectionProvider.newConnection()));
}

@SuppressWarnings("deprecation")
private void doTestClientContext(HttpClient client) throws Exception {
CountDownLatch latch = new CountDownLatch(4);

Expand Down Expand Up @@ -1937,4 +1941,41 @@ public void testResourceUrlSetInResponse() {
.expectComplete()
.verify(Duration.ofSeconds(30));
}

@Test
@SuppressWarnings("deprecation")
public void testIssue975() throws Exception {
disposableServer =
HttpServer.create()
.port(0)
.route(routes ->
routes.get("/dispose",
(req, res) -> res.sendString(
Flux.range(0, 10_000)
.map(i -> {
if (i == 1_000) {
res.withConnection(Connection::disposeNow);
}
return "a";
}))))
.bindNow();

AtomicBoolean doAfterResponse = new AtomicBoolean();
AtomicBoolean doAfterResponseSuccess = new AtomicBoolean();
AtomicBoolean doOnResponseError = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);
HttpClient.create()
.doAfterResponse((resp, conn) -> doAfterResponse.set(true))
.doAfterResponseSuccess((resp, conn) -> doAfterResponseSuccess.set(true))
.doOnResponseError((resp, exc) -> doOnResponseError.set(true))
.get()
.uri("http://localhost:" + disposableServer.port() + "/dispose")
.responseSingle((resp, bytes) -> bytes.asString())
.subscribe(null, t -> latch.countDown());

assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(doAfterResponse.get()).isTrue();
assertThat(doAfterResponseSuccess.get()).isFalse();
assertThat(doOnResponseError.get()).isTrue();
}
}

0 comments on commit adbfb17

Please sign in to comment.