From 41472e0e8e0ddb80c2a536052a4111d26ecce5db Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Thu, 9 Jan 2025 15:33:00 +0100 Subject: [PATCH] The WebSocket client pool should not starve on connect error. Motivation: The client WebSocket pool implementation (which is not really a pool but instead a semaphore + a wait queue) does not decrement the semaphore when the connection fails to be established. Currently, no operation is done beside calling back the caller. As consequence, the wait queue fills with new requests and never satisfy them since. Changes: When the connection fails, the semaphore should be released and check the wait queue for pending requests. --- .../vertx/core/http/impl/WebSocketGroup.java | 112 ++++++++++-------- .../io/vertx/tests/http/WebSocketTest.java | 46 ++++++- 2 files changed, 110 insertions(+), 48 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketGroup.java b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketGroup.java index adf351476b2..a37dc7d6a6b 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketGroup.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketGroup.java @@ -15,6 +15,7 @@ import io.vertx.core.http.WebSocketClientOptions; import io.vertx.core.http.WebSocketConnectOptions; import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.PromiseInternal; import io.vertx.core.internal.resource.ManagedResource; import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.core.spi.metrics.PoolMetrics; @@ -47,7 +48,6 @@ private static class Waiter { private final HttpChannelConnector connector; private final Deque waiters; private int inflightConnections; - private final ClientMetrics clientMetrics; private final PoolMetrics poolMetrics; @@ -72,19 +72,7 @@ public Future requestConnection(ContextInternal ctx, WebSocketConnect return fut; } - private void onEvict() { - decRefCount(); - Waiter h; - synchronized (WebSocketGroup.this) { - if (--inflightConnections > maxPoolSize || waiters.isEmpty()) { - return; - } - h = waiters.poll(); - } - tryConnect(h.context, h.connectOptions).onComplete(h.promise); - } - - private Future tryConnect(ContextInternal ctx, WebSocketConnectOptions connectOptions) { + private void connect(ContextInternal ctx, WebSocketConnectOptions connectOptions, Promise promise) { ContextInternal eventLoopContext; if (ctx.isEventLoopContext()) { eventLoopContext = ctx; @@ -92,50 +80,80 @@ private Future tryConnect(ContextInternal ctx, WebSocketConnectOption eventLoopContext = ctx.owner().createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader()); } Future fut = connector.httpConnect(eventLoopContext); - return fut.compose(c -> { - if (!incRefCount()) { - c.close(); - return Future.failedFuture(new VertxException("Connection closed", true)); - } - long timeout = Math.max(connectOptions.getTimeout(), 0L); - if (connectOptions.getIdleTimeout() >= 0L) { - timeout = connectOptions.getIdleTimeout(); - } - Http1xClientConnection ci = (Http1xClientConnection) c; - Promise promise = ctx.promise(); - ci.toWebSocket( - ctx, - connectOptions.getURI(), - connectOptions.getHeaders(), - connectOptions.getAllowOriginHeader(), - options, - connectOptions.getVersion(), - connectOptions.getSubProtocols(), - timeout, - connectOptions.isRegisterWriteHandlers(), - options.getMaxFrameSize(), - promise); - return promise.future().andThen(ar -> { - if (ar.succeeded()) { - WebSocketImpl wsi = (WebSocketImpl) ar.result(); - wsi.evictionHandler(v -> onEvict()); - } else { - onEvict(); + fut.onComplete(ar -> { + if (ar.succeeded()) { + HttpClientConnectionInternal c = ar.result(); + if (!incRefCount()) { + c.close(); + promise.fail(new VertxException("Connection closed", true)); + return; } - }); + long timeout = Math.max(connectOptions.getTimeout(), 0L); + if (connectOptions.getIdleTimeout() >= 0L) { + timeout = connectOptions.getIdleTimeout(); + } + Http1xClientConnection ci = (Http1xClientConnection) c; + ci.toWebSocket( + ctx, + connectOptions.getURI(), + connectOptions.getHeaders(), + connectOptions.getAllowOriginHeader(), + options, + connectOptions.getVersion(), + connectOptions.getSubProtocols(), + timeout, + connectOptions.isRegisterWriteHandlers(), + options.getMaxFrameSize(), + promise); + } else { + promise.fail(ar.cause()); + } }); } - protected Future requestConnection2(ContextInternal ctx, WebSocketConnectOptions connectOptions, long timeout) { + private void release() { + Waiter waiter; + synchronized (WebSocketGroup.this) { + if (--inflightConnections > maxPoolSize || waiters.isEmpty()) { + return; + } + waiter = waiters.poll(); + } + connect(waiter.context, waiter.connectOptions, waiter.promise); + } + + private Future tryAcquire(ContextInternal ctx, WebSocketConnectOptions options) { synchronized (this) { if (inflightConnections >= maxPoolSize) { - Waiter waiter = new Waiter(ctx, connectOptions); + Waiter waiter = new Waiter(ctx, options); waiters.add(waiter); return waiter.promise.future(); } inflightConnections++; } - return tryConnect(ctx, connectOptions); + return null; + } + + protected Future requestConnection2(ContextInternal ctx, WebSocketConnectOptions connectOptions, long timeout) { + Future res = tryAcquire(ctx, connectOptions); + if (res == null) { + PromiseInternal promise = ctx.promise(); + connect(ctx, connectOptions, promise); + res = promise.future(); + } + res.andThen(ar -> { + if (ar.succeeded()) { + WebSocketImpl wsi = (WebSocketImpl) ar.result(); + wsi.evictionHandler(v -> { + decRefCount(); + release(); + }); + } else { + decRefCount(); + release(); + } + }); + return res; } @Override diff --git a/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java b/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java index 97fb3757470..523a0802eda 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java @@ -46,6 +46,7 @@ import io.vertx.core.net.NetSocket; import io.vertx.core.net.SocketAddress; import io.vertx.test.core.CheckingSender; +import io.vertx.test.core.Repeat; import io.vertx.test.core.TestUtils; import io.vertx.test.core.VertxTestBase; import io.vertx.test.http.HttpTestBase; @@ -95,7 +96,6 @@ import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTP_HOST; import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTP_HOST_AND_PORT; import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTP_PORT; -import static org.junit.Assume.assumeTrue; /** * @author Tim Fox @@ -3987,4 +3987,48 @@ public void testCustomResponseHeadersBeforeUpgrade() throws InterruptedException })); await(); } + + @Test + public void testPoolShouldNotStarveOnConnectError() throws Exception { + + server = vertx.createHttpServer(); + + CountDownLatch shutdownLatch = new CountDownLatch(1); + AtomicInteger accepted = new AtomicInteger(); + server.webSocketHandler(ws -> { + ws.shutdownHandler(v -> shutdownLatch.countDown()); + assertTrue(accepted.getAndIncrement() == 0); + }); + + server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST).toCompletionStage().toCompletableFuture().get(); + + // This test requires a server socket to respond for the first connection + // Subsequent connections need to fail (connect error) + // Hence we need a custom proxy server in front of Vert.x HTTP server + + int maxConnections = 5; + + client = vertx.createWebSocketClient(new WebSocketClientOptions() + .setMaxConnections(maxConnections) + .setConnectTimeout(4000)); + + Future wsFut = client.connect(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/").andThen(onSuccess(v -> { + })); + + // Finish handshake + wsFut.toCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); + + server.shutdown(30, TimeUnit.SECONDS); + awaitLatch(shutdownLatch); + + int num = maxConnections + 10; + CountDownLatch latch = new CountDownLatch(num); + for (int i = 0;i < num;i++) { + client.connect(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/").onComplete(ar -> { + latch.countDown(); + }); + } + + awaitLatch(latch, 10, TimeUnit.SECONDS); + } }