Skip to content

Commit

Permalink
The WebSocket client pool should not starve on connect error.
Browse files Browse the repository at this point in the history
Motivation:

The client WebSocket pool implementation (which is not really a pool but instead a semaphore + a wait queue) does not decrement the semaphore when the connection fails to be established.

Currently, no operation is done beside calling back the caller. As consequence, the wait queue fills with new requests and never satisfy them since.

Changes:

When the connection fails, the semaphore should be released and check the wait queue for pending requests.
  • Loading branch information
vietj committed Jan 10, 2025
1 parent 56a122a commit 41472e0
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 48 deletions.
112 changes: 65 additions & 47 deletions vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.vertx.core.http.WebSocketClientOptions;
import io.vertx.core.http.WebSocketConnectOptions;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.resource.ManagedResource;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.PoolMetrics;
Expand Down Expand Up @@ -47,7 +48,6 @@ private static class Waiter {
private final HttpChannelConnector connector;
private final Deque<Waiter> waiters;
private int inflightConnections;

private final ClientMetrics clientMetrics;
private final PoolMetrics poolMetrics;

Expand All @@ -72,70 +72,88 @@ public Future<WebSocket> requestConnection(ContextInternal ctx, WebSocketConnect
return fut;
}

private void onEvict() {
decRefCount();
Waiter h;
synchronized (WebSocketGroup.this) {
if (--inflightConnections > maxPoolSize || waiters.isEmpty()) {
return;
}
h = waiters.poll();
}
tryConnect(h.context, h.connectOptions).onComplete(h.promise);
}

private Future<WebSocket> tryConnect(ContextInternal ctx, WebSocketConnectOptions connectOptions) {
private void connect(ContextInternal ctx, WebSocketConnectOptions connectOptions, Promise<WebSocket> promise) {
ContextInternal eventLoopContext;
if (ctx.isEventLoopContext()) {
eventLoopContext = ctx;
} else {
eventLoopContext = ctx.owner().createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader());
}
Future<HttpClientConnectionInternal> fut = connector.httpConnect(eventLoopContext);
return fut.compose(c -> {
if (!incRefCount()) {
c.close();
return Future.failedFuture(new VertxException("Connection closed", true));
}
long timeout = Math.max(connectOptions.getTimeout(), 0L);
if (connectOptions.getIdleTimeout() >= 0L) {
timeout = connectOptions.getIdleTimeout();
}
Http1xClientConnection ci = (Http1xClientConnection) c;
Promise<WebSocket> promise = ctx.promise();
ci.toWebSocket(
ctx,
connectOptions.getURI(),
connectOptions.getHeaders(),
connectOptions.getAllowOriginHeader(),
options,
connectOptions.getVersion(),
connectOptions.getSubProtocols(),
timeout,
connectOptions.isRegisterWriteHandlers(),
options.getMaxFrameSize(),
promise);
return promise.future().andThen(ar -> {
if (ar.succeeded()) {
WebSocketImpl wsi = (WebSocketImpl) ar.result();
wsi.evictionHandler(v -> onEvict());
} else {
onEvict();
fut.onComplete(ar -> {
if (ar.succeeded()) {
HttpClientConnectionInternal c = ar.result();
if (!incRefCount()) {
c.close();
promise.fail(new VertxException("Connection closed", true));
return;
}
});
long timeout = Math.max(connectOptions.getTimeout(), 0L);
if (connectOptions.getIdleTimeout() >= 0L) {
timeout = connectOptions.getIdleTimeout();
}
Http1xClientConnection ci = (Http1xClientConnection) c;
ci.toWebSocket(
ctx,
connectOptions.getURI(),
connectOptions.getHeaders(),
connectOptions.getAllowOriginHeader(),
options,
connectOptions.getVersion(),
connectOptions.getSubProtocols(),
timeout,
connectOptions.isRegisterWriteHandlers(),
options.getMaxFrameSize(),
promise);
} else {
promise.fail(ar.cause());
}
});
}

protected Future<WebSocket> requestConnection2(ContextInternal ctx, WebSocketConnectOptions connectOptions, long timeout) {
private void release() {
Waiter waiter;
synchronized (WebSocketGroup.this) {
if (--inflightConnections > maxPoolSize || waiters.isEmpty()) {
return;
}
waiter = waiters.poll();
}
connect(waiter.context, waiter.connectOptions, waiter.promise);
}

private Future<WebSocket> tryAcquire(ContextInternal ctx, WebSocketConnectOptions options) {
synchronized (this) {
if (inflightConnections >= maxPoolSize) {
Waiter waiter = new Waiter(ctx, connectOptions);
Waiter waiter = new Waiter(ctx, options);
waiters.add(waiter);
return waiter.promise.future();
}
inflightConnections++;
}
return tryConnect(ctx, connectOptions);
return null;
}

protected Future<WebSocket> requestConnection2(ContextInternal ctx, WebSocketConnectOptions connectOptions, long timeout) {
Future<WebSocket> res = tryAcquire(ctx, connectOptions);
if (res == null) {
PromiseInternal<WebSocket> promise = ctx.promise();
connect(ctx, connectOptions, promise);
res = promise.future();
}
res.andThen(ar -> {
if (ar.succeeded()) {
WebSocketImpl wsi = (WebSocketImpl) ar.result();
wsi.evictionHandler(v -> {
decRefCount();
release();
});
} else {
decRefCount();
release();
}
});
return res;
}

@Override
Expand Down
46 changes: 45 additions & 1 deletion vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.test.core.CheckingSender;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.http.HttpTestBase;
Expand Down Expand Up @@ -95,7 +96,6 @@
import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTP_HOST;
import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTP_HOST_AND_PORT;
import static io.vertx.test.http.HttpTestBase.DEFAULT_HTTP_PORT;
import static org.junit.Assume.assumeTrue;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
Expand Down Expand Up @@ -3987,4 +3987,48 @@ public void testCustomResponseHeadersBeforeUpgrade() throws InterruptedException
}));
await();
}

@Test
public void testPoolShouldNotStarveOnConnectError() throws Exception {

server = vertx.createHttpServer();

CountDownLatch shutdownLatch = new CountDownLatch(1);
AtomicInteger accepted = new AtomicInteger();
server.webSocketHandler(ws -> {
ws.shutdownHandler(v -> shutdownLatch.countDown());
assertTrue(accepted.getAndIncrement() == 0);
});

server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST).toCompletionStage().toCompletableFuture().get();

// This test requires a server socket to respond for the first connection
// Subsequent connections need to fail (connect error)
// Hence we need a custom proxy server in front of Vert.x HTTP server

int maxConnections = 5;

client = vertx.createWebSocketClient(new WebSocketClientOptions()
.setMaxConnections(maxConnections)
.setConnectTimeout(4000));

Future<WebSocket> wsFut = client.connect(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/").andThen(onSuccess(v -> {
}));

// Finish handshake
wsFut.toCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS);

server.shutdown(30, TimeUnit.SECONDS);
awaitLatch(shutdownLatch);

int num = maxConnections + 10;
CountDownLatch latch = new CountDownLatch(num);
for (int i = 0;i < num;i++) {
client.connect(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/").onComplete(ar -> {
latch.countDown();
});
}

awaitLatch(latch, 10, TimeUnit.SECONDS);
}
}

0 comments on commit 41472e0

Please sign in to comment.