Fixes updates to traffic shaping options with shared servers #5282

Open · wants to merge 2 commits into base: 4.x
18 changes: 11 additions & 7 deletions src/main/java/io/vertx/core/net/impl/TCPServerBase.java
@@ -153,14 +153,16 @@ public void updateTrafficShapingOptions(TrafficShapingOptions options) {
if (options == null) {
throw new IllegalArgumentException("Invalid null value passed for traffic shaping options update");
}
if (trafficShapingHandler == null) {
throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " +
"to use traffic shaping during startup");
}
TCPServerBase server = actualServer;
// Update the traffic shaping options only for the actual/main server
if (server != null && server != this) {
server.updateTrafficShapingOptions(options);
Contributor:
Can you update this with a return here to avoid repeated calls?
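
For illustration, a minimal sketch of the suggested early return, reusing the delegation branch from this diff (fragment of the method body only):

```java
TCPServerBase server = actualServer;
// Update the traffic shaping options only for the actual/main server
if (server != null && server != this) {
  server.updateTrafficShapingOptions(options);
  return; // stop here so the remainder of the method only runs on the actual server
}
```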

Author @arjunashok commented on Aug 15, 2024:
That is an interesting question. The repeated calls would only happen if the consumer of this API invokes the method on all of the shared servers.

The decision here comes down to the contract/interface we want to expose to clients invoking updateTrafficShapingOptions (and similar APIs that affect configuration across shared servers):

  1. Is it fair to expect clients to apply/change this setting on all the servers, while the underlying implementation only updates it when the actual server is involved? (Which is what I think you are suggesting.)

  2. Alternatively, should the contract be that this setting can be applied/changed on ANY of the servers, and under the hood Vert.x only updates it on the actual/main server? (Current implementation.)

It seems like we currently have a model like (1). My only concern is that invocations that are not on the actual/main server will silently do nothing.
If this is indeed the pattern we want to follow, I would prefer to have some signal (even just a log line) indicating that this was a worker server and therefore no update was made. That would help someone who updates the setting on a non-main server and does not see the change take effect.

I see that updateSSLOptions does something similar to (1), although if it were called for all the servers, we would update the SSL options of the actual server N times (where N = number of verticles).

(2) comes with the assumption that this is a shared/global setting that applies to all servers and can therefore be updated on ANY of them. One could argue, though, that it is ambiguous for the client which server's configuration should be updated, and applying it to all servers would result in several redundant invocations on the actual server.
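
For illustration only, here is how the two contracts look from the client side; the helper class below is hypothetical and not part of this PR:

```java
import io.vertx.core.http.HttpServer;
import io.vertx.core.net.TrafficShapingOptions;
import java.util.List;

class TrafficShapingContractSketch {

  // Pattern (1): the caller updates every shared server; only the call that reaches
  // the actual/main server takes effect, the others are (currently silent) no-ops.
  static void updateAll(List<HttpServer> sharedServers, TrafficShapingOptions newOptions) {
    sharedServers.forEach(s -> s.updateTrafficShapingOptions(newOptions));
  }

  // Pattern (2): the caller updates ANY single shared server; under the hood Vert.x
  // delegates the update to the actual/main server.
  static void updateAny(List<HttpServer> sharedServers, TrafficShapingOptions newOptions) {
    sharedServers.get(0).updateTrafficShapingOptions(newOptions);
  }
}
```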

@vietj Do you have any thoughts or comments on this?

Author @arjunashok:

In the latest commit, I made a change to only update the traffic-shaping options when they actually change. This addresses both concerns by:

  1. Updating the actual server's options regardless of which shared server the update is invoked on
  2. Only performing that update when the new options differ from the ones currently in effect (rough sketch below)
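
A rough sketch of that idea (not the exact committed code; the `appliedOptions` field below is hypothetical and only used for illustration):

```java
private TrafficShapingOptions appliedOptions; // hypothetical: last options applied on the actual server

public void updateTrafficShapingOptions(TrafficShapingOptions options) {
  if (options == null) {
    throw new IllegalArgumentException("Invalid null value passed for traffic shaping options update");
  }
  TCPServerBase server = actualServer;
  if (server != null && server != this) {
    // 1. always delegate to the actual/main server, whichever shared server was called
    server.updateTrafficShapingOptions(options);
    return;
  }
  if (trafficShapingHandler == null) {
    throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " +
      "to use traffic shaping during startup");
  }
  // 2. only reconfigure the handler when the new options differ from the ones in effect
  long checkIntervalMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats());
  boolean unchanged = appliedOptions != null
    && appliedOptions.getOutboundGlobalBandwidth() == options.getOutboundGlobalBandwidth()
    && appliedOptions.getInboundGlobalBandwidth() == options.getInboundGlobalBandwidth()
    && appliedOptions.getCheckIntervalForStatsTimeUnit().toMillis(appliedOptions.getCheckIntervalForStats()) == checkIntervalMillis;
  if (unchanged) {
    return; // same settings, avoid reconfiguring the handler repeatedly
  }
  trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalMillis);
  appliedOptions = options;
}
```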

} else {
if (trafficShapingHandler == null) {
throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " +
"to use traffic shaping during startup");
}

long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats());
trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis);

@@ -292,8 +294,10 @@ private synchronized Future<Channel> listen(SocketAddress localAddress, ContextI
} else {
// Server already exists with that host/port - we will use that
actualServer = main;
metrics = main.metrics;
childHandler = childHandler(listenContext, localAddress, main.trafficShapingHandler);
metrics = actualServer.metrics;
// Ensure the workers inherit the actual server's traffic-shaping handler
trafficShapingHandler = actualServer.trafficShapingHandler;
childHandler = childHandler(listenContext, localAddress, actualServer.trafficShapingHandler);
worker = ch -> childHandler.accept(ch, actualServer.sslChannelProvider.result().sslChannelProvider());
actualServer.servers.add(this);
actualServer.channelBalancer.addWorker(eventLoop, worker);
51 changes: 50 additions & 1 deletion src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java
@@ -13,9 +13,10 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -204,6 +205,54 @@ public void start(Promise<Void> startPromise) {
Assert.assertTrue(elapsedMillis > expectedTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT)); // because there are 2 simultaneous requests
}

@Test
public void testDynamicOutboundRateUpdateSharedServers() throws IOException, InterruptedException {
int numEventLoops = 5; // We start a shared TCP server across 5 event-loops
List<HttpServer> servers = new ArrayList<>();
Future<String> listenLatch = vertx.deployVerticle(() -> new AbstractVerticle() {
@Override
public void start(Promise<Void> startPromise) {
HttpServer testServer = serverFactory.apply(vertx);
servers.add(testServer);
testServer.requestHandler(HANDLERS.getFile(sampleF))
.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST).<Void>mapEmpty().onComplete(startPromise);
}
}, new DeploymentOptions().setInstances(numEventLoops));

HttpClient testClient = clientFactory.apply(vertx);
CountDownLatch waitForResponse = new CountDownLatch(2);
AtomicLong startTime = new AtomicLong();
AtomicLong totalReceivedLength = new AtomicLong();
long expectedLength = Files.size(Paths.get(sampleF.getAbsolutePath()));
listenLatch.onComplete(onSuccess(v -> {
startTime.set(System.nanoTime());
for (int i = 0; i < 2; i++) {
testClient.request(HttpMethod.GET, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/get-file")
.compose(req -> req.send()
.andThen(onSuccess(resp -> assertEquals(200, resp.statusCode())))
.compose(HttpClientResponse::body))
.onComplete(onSuccess(body -> {
long receivedBytes = body.getBytes().length;
totalReceivedLength.addAndGet(receivedBytes);
Assert.assertEquals(expectedLength, receivedBytes);
waitForResponse.countDown();
}));
}
}));
awaitLatch(waitForResponse);
TrafficShapingOptions updatedTrafficOptions = new TrafficShapingOptions()
.setInboundGlobalBandwidth(INBOUND_LIMIT) // unchanged
.setOutboundGlobalBandwidth(OUTBOUND_LIMIT);

for (int i = 0; i < numEventLoops; i++) {
servers.forEach(s -> s.updateTrafficShapingOptions(updatedTrafficOptions));
}

long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get());
Assert.assertTrue(elapsedMillis > expectedTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT));
}

@Test
public void testDynamicOutboundRateUpdate() throws Exception {
Buffer expectedBuffer = TestUtils.randomBuffer(TEST_CONTENT_SIZE);