diff --git a/src/main/java/reactor/netty/http/server/HttpServer.java b/src/main/java/reactor/netty/http/server/HttpServer.java index 3fd2915ede..e7447bc038 100644 --- a/src/main/java/reactor/netty/http/server/HttpServer.java +++ b/src/main/java/reactor/netty/http/server/HttpServer.java @@ -39,6 +39,7 @@ import reactor.netty.NettyPipeline; import reactor.netty.channel.BootstrapHandlers; import reactor.netty.http.HttpProtocol; +import reactor.netty.http.websocket.WebSocketSpec; import reactor.netty.tcp.SslProvider; import reactor.netty.tcp.TcpServer; import reactor.util.Logger; @@ -173,6 +174,10 @@ public final void bindUntilJavaShutdown(Duration timeout, * Specifies whether GZip response compression/websocket compression * extension is enabled if the client request * presents accept encoding/websocket extensions headers. + *

+ * Note: Using this method for enabling websocket compression is strongly discouraged. + * As of 0.9.5, use {@link WebSocketSpec#builder()} for providing websocket configuration. + *

* * @param compressionEnabled if true GZip response compression/websocket compression * extension is enabled if the client request presents diff --git a/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java b/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java index 226baa98d2..8e083bfba8 100644 --- a/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java +++ b/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java @@ -96,8 +96,8 @@ final class WebsocketServerOperations extends HttpServerOperations request.headers() .set(replaced.nettyRequest.headers()); - if (channel().pipeline() - .get(NettyPipeline.CompressionHandler) != null) { + if (webSocketSpec.compress() || + channel().pipeline().get(NettyPipeline.CompressionHandler) != null) { removeHandler(NettyPipeline.CompressionHandler); WebSocketServerCompressionHandler wsServerCompressionHandler = diff --git a/src/main/java/reactor/netty/http/websocket/WebSocketSpec.java b/src/main/java/reactor/netty/http/websocket/WebSocketSpec.java index b6e08ccbb1..68ac34b424 100644 --- a/src/main/java/reactor/netty/http/websocket/WebSocketSpec.java +++ b/src/main/java/reactor/netty/http/websocket/WebSocketSpec.java @@ -22,17 +22,40 @@ * Wrapper for websocket configuration * * @author Dmitrii Borin + * @author Violeta Georgieva * @since 0.9.5 */ public interface WebSocketSpec { + /** + * Returns the configured sub protocols. + * + * @return returns the configured sub protocols. + */ @Nullable String protocols(); + /** + * Returns the configured maximum allowable frame payload length. + * + * @return returns the configured maximum allowable frame payload length. + */ int maxFramePayloadLength(); + /** + * Returns whether to proxy websocket PING frames or respond to them. + * + * @return returns whether to proxy websocket PING frames or respond to them. + */ boolean handlePing(); + /** + * Returns whether the websocket compression extension is enabled. + * + * @return returns whether the websocket compression extension is enabled. + */ + boolean compress(); + /** * Create builder with default properties:
* protocols = null @@ -50,7 +73,8 @@ static Builder builder() { final class Builder { String protocols; int maxFramePayloadLength = 65536; - boolean handlePing = false; + boolean handlePing; + boolean compress; private Builder() { } @@ -91,6 +115,20 @@ public final Builder handlePing(boolean handlePing) { return this; } + /** + * Sets flag whether the websocket compression extension is enabled + * if the client request presents websocket extensions headers. + * By default compression is disabled. + * + * @param compress whether the websocket compression extension is enabled + * if the client request presents websocket extensions headers. + * @return {@literal this} + */ + public final Builder compress(boolean compress) { + this.compress = compress; + return this; + } + /** * Builds new {@link WebSocketSpec} * diff --git a/src/main/java/reactor/netty/http/websocket/WebsocketSpecImpl.java b/src/main/java/reactor/netty/http/websocket/WebsocketSpecImpl.java index b77ee70362..f73b89a4c4 100644 --- a/src/main/java/reactor/netty/http/websocket/WebsocketSpecImpl.java +++ b/src/main/java/reactor/netty/http/websocket/WebsocketSpecImpl.java @@ -20,17 +20,20 @@ * Configurer implementation for {@link WebSocketSpec} * * @author Dmitrii Borin + * @author Violeta Georgieva * @since 0.9.5 */ final class WebsocketSpecImpl implements WebSocketSpec { private final String protocols; private final int maxFramePayloadLength; private final boolean proxyPing; + private final boolean compress; WebsocketSpecImpl(WebSocketSpec.Builder builder) { this.protocols = builder.protocols; this.maxFramePayloadLength = builder.maxFramePayloadLength; this.proxyPing = builder.handlePing; + this.compress = builder.compress; } @Override @@ -47,4 +50,9 @@ public final int maxFramePayloadLength() { public final boolean handlePing() { return proxyPing; } + + @Override + public boolean compress() { + return compress; + } } diff --git a/src/test/java/reactor/netty/http/client/WebsocketTest.java b/src/test/java/reactor/netty/http/client/WebsocketTest.java index dc0cfcf6c9..98519c32c5 100644 --- a/src/test/java/reactor/netty/http/client/WebsocketTest.java +++ b/src/test/java/reactor/netty/http/client/WebsocketTest.java @@ -1379,4 +1379,66 @@ public void testIssue967() { .expectNextCount(10) .verifyComplete(); } + + @Test + public void testIssue970() { + doTestIssue970(true); + doTestIssue970(false); + } + + private void doTestIssue970(boolean compress) { + httpServer = + HttpServer.create() + .port(0) + .handle((req, res) -> + res.sendWebsocket( + (in, out) -> out.sendString(Mono.just("test")), + WebSocketSpec.builder().compress(compress).build())) + .wiretap(true) + .bindNow(); + + AtomicBoolean clientHandler = new AtomicBoolean(); + HttpClient client = + HttpClient.create() + .addressSupplier(httpServer::address) + .wiretap(true); + + String perMessageDeflateEncoder = "io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateEncoder"; + BiFunction>> receiver = + (in, out) -> { + in.withConnection(conn -> + clientHandler.set(conn.channel() + .pipeline() + .get(perMessageDeflateEncoder) != null) + ); + + String header = in.headers() + .get(HttpHeaderNames.SEC_WEBSOCKET_EXTENSIONS); + return in.receive() + .aggregate() + .asString() + .zipWith(Mono.just(header == null ? "null" : header)); + }; + + Predicate> predicate = t -> "test".equals(t.getT1()) && "null".equals(t.getT2()); + StepVerifier.create(client.websocket() + .uri("/") + .handle(receiver)) + .expectNextMatches(predicate) + .expectComplete() + .verify(Duration.ofSeconds(30)); + assertThat(clientHandler.get()).isFalse(); + + if (compress) { + predicate = t -> "test".equals(t.getT1()) && !"null".equals(t.getT2()); + } + StepVerifier.create(client.compress(true) + .websocket() + .uri("/") + .handle(receiver)) + .expectNextMatches(predicate) + .expectComplete() + .verify(Duration.ofSeconds(30)); + assertThat(clientHandler.get()).isEqualTo(compress); + } }