Skip to content

Commit

Permalink
[mqtt-perf] Fix reconnect lost username & password (#1065)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Aug 2, 2023
1 parent 6c1a540 commit 0e749ee
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,18 @@ public void run() {
case MQTT_VERSION_5:
LOG.info("Preparing the MQTT 5 connection.");
final var connectFutures = IntStream.range(0, connections).parallel().mapToObj(index -> {
final var mqtt5AsyncClient = clientBuilder
.identifier(clientId.concat(String.valueOf(index)))
.useMqttVersion5().buildAsync();
final var connectBuilder = mqtt5AsyncClient.connectWith();
final var mqtt5ClientBuilder = clientBuilder
.useMqttVersion5()
.identifier(clientId.concat(String.valueOf(index)));
if (!Strings.isNullOrEmpty(password)) {
connectBuilder
mqtt5ClientBuilder
.simpleAuth()
.username(username)
.password(password.getBytes(StandardCharsets.UTF_8))
.applySimpleAuth();
}
return connectBuilder.send().thenApply(__ -> {
final var mqtt5AsyncClient = mqtt5ClientBuilder.buildAsync();
return mqtt5AsyncClient.connect().thenApply(__ -> {
connected.increment();
return mqtt5AsyncClient;
});
Expand Down
19 changes: 9 additions & 10 deletions mqtt-perf/src/main/java/io/streamnative/mqtt/perf/CommandSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,21 @@ public void run() {
case MQTT_VERSION_5:
LOG.info("Preparing the MQTT 5 connection.");
final var connectFutures = IntStream.range(0, connections).parallel().mapToObj(index -> {
final var mqtt5AsyncClient = clientBuilder
.identifier(clientId.concat(String.valueOf(index)))
.useMqttVersion5().buildAsync();
final var connectBuilder = mqtt5AsyncClient.connectWith();
final var mqtt5ClientBuilder = clientBuilder
.useMqttVersion5()
.identifier(clientId.concat(String.valueOf(index)));
if (!Strings.isNullOrEmpty(password)) {
connectBuilder
mqtt5ClientBuilder
.simpleAuth()
.username(username)
.password(password.getBytes(StandardCharsets.UTF_8))
.applySimpleAuth();
}
return connectBuilder.send()
.thenApply(__ -> {
connected.increment();
return mqtt5AsyncClient;
});
final var mqtt5AsyncClient = mqtt5ClientBuilder.buildAsync();
return mqtt5AsyncClient.connect().thenApply(__ -> {
connected.increment();
return mqtt5AsyncClient;
});
}).collect(Collectors.toList());
allOf(connectFutures.toArray(new CompletableFuture[]{})).join();
LOG.info("Preparing subscription.");
Expand Down

0 comments on commit 0e749ee

Please sign in to comment.