From 93dea80c27d8040240d6cf1c3a10d3a26891e6a1 Mon Sep 17 00:00:00 2001 From: Arjun Ashok Date: Wed, 14 Aug 2024 14:29:36 -0700 Subject: [PATCH 1/2] Fixes updates to traffic shaping options with shared servers This fixes the update-path for traffic shaping options to check for the existence of the traffic shaping handler only for the actual server. --- .../io/vertx/core/net/impl/TCPServerBase.java | 18 ++++--- .../core/http/HttpBandwidthLimitingTest.java | 51 ++++++++++++++++++- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/vertx/core/net/impl/TCPServerBase.java b/src/main/java/io/vertx/core/net/impl/TCPServerBase.java index 08cac0c2773..787c6f8f32f 100644 --- a/src/main/java/io/vertx/core/net/impl/TCPServerBase.java +++ b/src/main/java/io/vertx/core/net/impl/TCPServerBase.java @@ -153,14 +153,16 @@ public void updateTrafficShapingOptions(TrafficShapingOptions options) { if (options == null) { throw new IllegalArgumentException("Invalid null value passed for traffic shaping options update"); } - if (trafficShapingHandler == null) { - throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " + - "to use traffic shaping during startup"); - } TCPServerBase server = actualServer; + // Update the traffic shaping options only for the actual/main server if (server != null && server != this) { - server.updateTrafficShapingOptions(options); + server.updateTrafficShapingOptions(options); } else { + if (trafficShapingHandler == null) { + throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " + + "to use traffic shaping during startup"); + } + long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats()); trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis); @@ -292,8 +294,10 @@ private synchronized Future listen(SocketAddress localAddress, ContextI } else { // Server already exists with that host/port - we will use that actualServer = main; - metrics = main.metrics; - childHandler = childHandler(listenContext, localAddress, main.trafficShapingHandler); + metrics = actualServer.metrics; + // Ensure the workers inherit the actual server's traffic-shaping handler + trafficShapingHandler = actualServer.trafficShapingHandler; + childHandler = childHandler(listenContext, localAddress, actualServer.trafficShapingHandler); worker = ch -> childHandler.accept(ch, actualServer.sslChannelProvider.result().sslChannelProvider()); actualServer.servers.add(this); actualServer.channelBalancer.addWorker(eventLoop, worker); diff --git a/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java b/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java index f8f9fb21331..639a3c6a725 100644 --- a/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java +++ b/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java @@ -13,9 +13,10 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -204,6 +205,54 @@ public void start(Promise startPromise) { Assert.assertTrue(elapsedMillis > expectedTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT)); // because there are simultaneous 2 requests } + @Test + public void testDynamicOutboundRateUpdateSharedServers() throws IOException, InterruptedException + { + int numEventLoops = 5; // We start a shared TCP server with 2 event-loops + List servers = new ArrayList<>(); + Future listenLatch = vertx.deployVerticle(() -> new AbstractVerticle() { + @Override + public void start(Promise startPromise) { + HttpServer testServer = serverFactory.apply(vertx); + servers.add(testServer); + testServer.requestHandler(HANDLERS.getFile(sampleF)) + .listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST).mapEmpty().onComplete(startPromise); + } + }, new DeploymentOptions().setInstances(numEventLoops)); + + HttpClient testClient = clientFactory.apply(vertx); + CountDownLatch waitForResponse = new CountDownLatch(2); + AtomicLong startTime = new AtomicLong(); + AtomicLong totalReceivedLength = new AtomicLong(); + long expectedLength = Files.size(Paths.get(sampleF.getAbsolutePath())); + listenLatch.onComplete(onSuccess(v -> { + startTime.set(System.nanoTime()); + for (int i = 0; i < 2; i++) { + testClient.request(HttpMethod.GET, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/get-file") + .compose(req -> req.send() + .andThen(onSuccess(resp -> assertEquals(200, resp.statusCode()))) + .compose(HttpClientResponse::body)) + .onComplete(onSuccess(body -> { + long receivedBytes = body.getBytes().length; + totalReceivedLength.addAndGet(receivedBytes); + Assert.assertEquals(expectedLength, receivedBytes); + waitForResponse.countDown(); + })); + } + })); + awaitLatch(waitForResponse); + TrafficShapingOptions updatedTrafficOptions = new TrafficShapingOptions() + .setInboundGlobalBandwidth(INBOUND_LIMIT) // unchanged + .setOutboundGlobalBandwidth(OUTBOUND_LIMIT); + + for (int i = 0; i < numEventLoops; i++) { + servers.forEach(s -> s.updateTrafficShapingOptions(updatedTrafficOptions)); + } + + long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get()); + Assert.assertTrue(elapsedMillis > expectedTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT)); + } + @Test public void testDynamicOutboundRateUpdate() throws Exception { Buffer expectedBuffer = TestUtils.randomBuffer(TEST_CONTENT_SIZE); From 77d4bbe9739b91a41fb2e9abf88283be58a290de Mon Sep 17 00:00:00 2001 From: Arjun Ashok Date: Thu, 15 Aug 2024 16:22:33 -0700 Subject: [PATCH 2/2] Update to prevent redundant updates to traffic-shaping options when they do not change This is especially relevant for shared servers, where we now 1) update the actual-server's options, and 2) only perform the update when the traffic options change. --- .../vertx/core/net/TrafficShapingOptions.java | 24 +++++++++++++++++++ .../io/vertx/core/net/impl/TCPServerBase.java | 23 +++++++++++------- .../core/http/HttpBandwidthLimitingTest.java | 2 +- 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/vertx/core/net/TrafficShapingOptions.java b/src/main/java/io/vertx/core/net/TrafficShapingOptions.java index 1e1f33f2e0c..e978ad27dcd 100644 --- a/src/main/java/io/vertx/core/net/TrafficShapingOptions.java +++ b/src/main/java/io/vertx/core/net/TrafficShapingOptions.java @@ -222,4 +222,28 @@ public long getCheckIntervalForStats() { public TimeUnit getCheckIntervalForStatsTimeUnit() { return checkIntervalForStatsTimeUnit; } + + @Override + public boolean equals(Object obj) { + TrafficShapingOptions that = (TrafficShapingOptions) obj; + return inboundGlobalBandwidth == that.inboundGlobalBandwidth && + outboundGlobalBandwidth == that.outboundGlobalBandwidth && + peakOutboundGlobalBandwidth == that.peakOutboundGlobalBandwidth && + maxDelayToWait == that.maxDelayToWait && + maxDelayToWaitTimeUnit == that.maxDelayToWaitTimeUnit && + checkIntervalForStats == that.checkIntervalForStats && + checkIntervalForStatsTimeUnit == that.checkIntervalForStatsTimeUnit; + } + + @Override + public int hashCode() { + return Objects.hash(inboundGlobalBandwidth, + outboundGlobalBandwidth, + peakOutboundGlobalBandwidth, + maxDelayToWait, + maxDelayToWaitTimeUnit, + checkIntervalForStats, + checkIntervalForStatsTimeUnit); + } + } diff --git a/src/main/java/io/vertx/core/net/impl/TCPServerBase.java b/src/main/java/io/vertx/core/net/impl/TCPServerBase.java index 787c6f8f32f..9f98235769e 100644 --- a/src/main/java/io/vertx/core/net/impl/TCPServerBase.java +++ b/src/main/java/io/vertx/core/net/impl/TCPServerBase.java @@ -156,22 +156,29 @@ public void updateTrafficShapingOptions(TrafficShapingOptions options) { TCPServerBase server = actualServer; // Update the traffic shaping options only for the actual/main server if (server != null && server != this) { - server.updateTrafficShapingOptions(options); + server.updateTrafficShapingOptions(options); } else { if (trafficShapingHandler == null) { throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " + "to use traffic shaping during startup"); } - long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats()); - trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis); + // Compare with existing traffic-shaping options to ensure they are updated only when they differ. + if(!options.equals(server.options.getTrafficShapingOptions())) { + server.options.setTrafficShapingOptions(options); + long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats()); + trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis); - if (options.getPeakOutboundGlobalBandwidth() != 0) { - trafficShapingHandler.setMaxGlobalWriteSize(options.getPeakOutboundGlobalBandwidth()); + if (options.getPeakOutboundGlobalBandwidth() != 0) { + trafficShapingHandler.setMaxGlobalWriteSize(options.getPeakOutboundGlobalBandwidth()); + } + if (options.getMaxDelayToWait() != 0) { + long maxDelayToWaitInMillis = options.getMaxDelayToWaitTimeUnit().toMillis(options.getMaxDelayToWait()); + trafficShapingHandler.setMaxWriteDelay(maxDelayToWaitInMillis); + } } - if (options.getMaxDelayToWait() != 0) { - long maxDelayToWaitInMillis = options.getMaxDelayToWaitTimeUnit().toMillis(options.getMaxDelayToWait()); - trafficShapingHandler.setMaxWriteDelay(maxDelayToWaitInMillis); + else { + log.info("Not updating traffic shaping options as they have not changed"); } } } diff --git a/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java b/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java index 639a3c6a725..ba283152e77 100644 --- a/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java +++ b/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java @@ -243,7 +243,7 @@ public void start(Promise startPromise) { awaitLatch(waitForResponse); TrafficShapingOptions updatedTrafficOptions = new TrafficShapingOptions() .setInboundGlobalBandwidth(INBOUND_LIMIT) // unchanged - .setOutboundGlobalBandwidth(OUTBOUND_LIMIT); + .setOutboundGlobalBandwidth(2 * OUTBOUND_LIMIT); for (int i = 0; i < numEventLoops; i++) { servers.forEach(s -> s.updateTrafficShapingOptions(updatedTrafficOptions));