From 95309966726dcf9a5afe7cc6b7c8833308b81598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=B4=81?= <445990772@qq.com> Date: Tue, 12 Dec 2023 14:56:54 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(mqtt=E6=A8=A1=E6=8B=9F=E5=99=A8)?= =?UTF-8?q?=EF=BC=9A=E6=96=B0=E5=A2=9E=E6=96=AD=E5=BC=80=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmark/mqtt/benchmark.js | 6 +- .../cmd/benchmark/MQTTBenchMark.java | 10 ++ simulator-cli/src/main/resources/logback.xml | 2 +- .../core/network/mqtt/MqttClient.java | 154 +++++++++++------- 4 files changed, 112 insertions(+), 60 deletions(-) diff --git a/benchmark/mqtt/benchmark.js b/benchmark/mqtt/benchmark.js index 8f28ac7..965a0bb 100644 --- a/benchmark/mqtt/benchmark.js +++ b/benchmark/mqtt/benchmark.js @@ -1,6 +1,8 @@ /** * JetLinks mqtt 官方协议模拟器 * benchmark mqtt --host=127.0.0.1 --port=8801 --script=demo/tcp/benchmark.js report=true reportLimit=100 interval=1000 + * --reconnectAttempts=3 重试次数 + * --reconnectInterval=1000 重试间隔 */ //绑定内置参数,否则匿名函数无法使用。 @@ -67,11 +69,11 @@ function reportProperties(client) { "containsGeo": false, "ignoreLog": true, "ignoreStorage": true - } + } } //推送mqtt - return client.publishAsync("/report-property", 0, $benchmark.toJson(msg)); + return client.publishAsync(createTopic(client, "/properties/report"), 0, $benchmark.toJson(msg)); } diff --git a/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/benchmark/MQTTBenchMark.java b/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/benchmark/MQTTBenchMark.java index 9815dd5..4919f4e 100644 --- a/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/benchmark/MQTTBenchMark.java +++ b/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/benchmark/MQTTBenchMark.java @@ -79,6 +79,16 @@ public void setPassword0(String password) { @CommandLine.Option(names = {"--topics"}, description = "attach and subscribe topics", order = 6) private String[] topics; + @CommandLine.Option(names = {"--reconnectAttempts"}, description = "MQTT reconnect times", order = 7) + public void setReconnect(int attemps) { + super.setReconnectAttempts(attemps); + } + + @CommandLine.Option(names = {"--reconnectInterval"}, description = "MQTT reconnect interval", order = 8) + public void setReconnectInterval0(long interval) { + super.setReconnectInterval(interval); + } + } diff --git a/simulator-cli/src/main/resources/logback.xml b/simulator-cli/src/main/resources/logback.xml index 938773f..4251594 100644 --- a/simulator-cli/src/main/resources/logback.xml +++ b/simulator-cli/src/main/resources/logback.xml @@ -22,7 +22,7 @@ - + \ No newline at end of file diff --git a/simulator-core/src/main/java/org/jetlinks/simulator/core/network/mqtt/MqttClient.java b/simulator-core/src/main/java/org/jetlinks/simulator/core/network/mqtt/MqttClient.java index 761471a..58ba302 100644 --- a/simulator-core/src/main/java/org/jetlinks/simulator/core/network/mqtt/MqttClient.java +++ b/simulator-core/src/main/java/org/jetlinks/simulator/core/network/mqtt/MqttClient.java @@ -17,14 +17,19 @@ import org.jetlinks.simulator.core.Global; import org.jetlinks.simulator.core.network.*; import reactor.core.Disposable; +import reactor.core.Disposables; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; @@ -40,6 +45,10 @@ public class MqttClient extends AbstractConnection { private final Map, Subscriber> subscribers = new ConcurrentHashMap<>(); private MqttClient(io.vertx.mqtt.MqttClient client, Address address) { + new MqttClient(client, address, null); + } + + public MqttClient(io.vertx.mqtt.MqttClient client, Address localAddress, MqttOptions options) { this.client = client; this.address = address; this.client @@ -51,17 +60,40 @@ private MqttClient(io.vertx.mqtt.MqttClient client, Address address) { handler.accept(msg); } catch (Throwable error) { log.warn("handle mqtt message {} {} error:{}", msg.topicName(), - msg.payload().toString(), - ExceptionUtils.getErrorMessage(error)); + msg.payload().toString(), + ExceptionUtils.getErrorMessage(error)); } } } }) - .closeHandler(e -> dispose()); + .closeHandler(e -> reconnect(options)); } + private static int counter = 1; + + private void reconnect(MqttOptions options) { + //超过重试次数且状态未连接放弃重试 + if (counter > options.getReconnectAttempts() && !isAlive()) { + dispose(); + } else { + Address localAddress = AddressManager.global().takeAddress(options.getLocalAddress()); + Disposables.composite() + .add( + Mono.delay(Duration.ofMillis(options.getReconnectInterval())) + .flatMap(i -> Mono.create(sink -> this.connect1(client, options, localAddress, sink))) + .subscribe(mqttClient -> { + log.info("客户端尝试重连:{},次数:{}", mqttClient.getId(), counter); + counter++; + }) + + ); + } + + } + + public static Mono connect(InetSocketAddress server, MqttOptions options) { return connect(Global.vertx(), server, options); @@ -73,46 +105,51 @@ public static Mono connect(Vertx vertx, Address localAddress = AddressManager.global().takeAddress(options.getLocalAddress()); return Mono.create(sink -> { - MqttClientOptions clientOptions = options.copy(); - clientOptions.setClientId(options.getClientId()); - clientOptions.setUsername(options.getUsername()); - clientOptions.setPassword(options.getPassword()); - clientOptions.setLocalAddress(localAddress.getAddress().getHostAddress()); - clientOptions.setAutoKeepAlive(true); - clientOptions.setTcpKeepAlive(true); - clientOptions.setMaxMessageSize(1024 * 1024); - clientOptions.setReusePort(true); - - io.vertx.mqtt.MqttClient client = io.vertx.mqtt.MqttClient.create(vertx, clientOptions); - - client.connect( - server.getPort(), - server.getHostString(), - res -> { - if (res.failed()) { - sink.error(res.cause()); - return; - } - - MqttConnAckMessage msg = res.result(); - if (msg != null) { - if (msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { - MqttClient mqttClient = new MqttClient(client, localAddress); - mqttClient.attribute("clientId", options.getClientId()); - mqttClient.attribute("username", options.getUsername()); - mqttClient.changeState(State.connected); - sink.success(mqttClient); - } else { - sink.error(new MqttConnectionException(msg.code())); - } - } - sink.success(); - - }); - }) - .doOnError(err -> localAddress.release()); + MqttClientOptions clientOptions = options.copy(); + clientOptions.setClientId(options.getClientId()); + clientOptions.setUsername(options.getUsername()); + clientOptions.setPassword(options.getPassword()); + clientOptions.setLocalAddress(localAddress.getAddress().getHostAddress()); + clientOptions.setAutoKeepAlive(true); + clientOptions.setTcpKeepAlive(true); + clientOptions.setMaxMessageSize(1024 * 1024); + clientOptions.setReusePort(true); + + io.vertx.mqtt.MqttClient client = io.vertx.mqtt.MqttClient.create(vertx, clientOptions); + + connect1(client, options, localAddress, sink); + }) + .doOnError(err -> localAddress.release()); + } + + private static void connect1(io.vertx.mqtt.MqttClient client, MqttOptions options, Address localAddress, MonoSink sink) { + client.connect( + options.getPort(), + options.getHost(), + res -> { + if (res.failed()) { + sink.error(res.cause()); + return; + } + + MqttConnAckMessage msg = res.result(); + if (msg != null) { + if (msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { + MqttClient mqttClient = new MqttClient(client, localAddress, options); + mqttClient.attribute("clientId", options.getClientId()); + mqttClient.attribute("username", options.getUsername()); + mqttClient.changeState(State.connected); + sink.success(mqttClient); + } else { + sink.error(new MqttConnectionException(msg.code())); + } + } + sink.success(); + + }); } + @Override public String getId() { return client.clientId(); @@ -202,6 +239,7 @@ public void accept(MqttPublishMessage msg) { } } } + } public void publish(String topic, int qos, Object payload) { @@ -222,21 +260,21 @@ public Mono publishAsync(String topic, int qos, ByteBuf payload) { Buffer buffer = Buffer.buffer(payload); int len = buffer.length(); return Mono.create(sink -> client - .publish(topic, - buffer, - MqttQoS.valueOf(qos), - false, - false, - res -> { - ReferenceCountUtil.safeRelease(payload); - sent(len); - if (res.failed()) { - sink.error(res.cause()); - } else { - sink.success(); - } - })) - .doOnError(this::error); + .publish(topic, + buffer, + MqttQoS.valueOf(qos), + false, + false, + res -> { + ReferenceCountUtil.safeRelease(payload); + sent(len); + if (res.failed()) { + sink.error(res.cause()); + } else { + sink.success(); + } + })) + .doOnError(this::error); } public List getSubscriptions() { @@ -245,7 +283,9 @@ public List getSubscriptions() { @Override protected void doDisposed() { - address.release(); + if (!Objects.isNull(address) && !Objects.isNull(address.getAddress())) { + address.release(); + } super.doDisposed(); if (client.isConnected()) { client.disconnect(); From 7e4e301b9f6c2ba4d6d59d379c0ef37e5ba12866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=B4=81?= <445990772@qq.com> Date: Tue, 12 Dec 2023 14:56:54 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat(mqtt=E6=A8=A1=E6=8B=9F=E5=99=A8)?= =?UTF-8?q?=EF=BC=9A=E6=96=B0=E5=A2=9E=E6=96=AD=E5=BC=80=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmark/mqtt/benchmark.js | 6 +- .../cmd/benchmark/MQTTBenchMark.java | 18 +- .../cmd/mqtt/ConnectMqttCommand.java | 16 +- simulator-cli/src/main/resources/logback.xml | 2 +- .../core/network/mqtt/MqttClient.java | 181 ++++++++++++------ 5 files changed, 151 insertions(+), 72 deletions(-) diff --git a/benchmark/mqtt/benchmark.js b/benchmark/mqtt/benchmark.js index 8f28ac7..965a0bb 100644 --- a/benchmark/mqtt/benchmark.js +++ b/benchmark/mqtt/benchmark.js @@ -1,6 +1,8 @@ /** * JetLinks mqtt 官方协议模拟器 * benchmark mqtt --host=127.0.0.1 --port=8801 --script=demo/tcp/benchmark.js report=true reportLimit=100 interval=1000 + * --reconnectAttempts=3 重试次数 + * --reconnectInterval=1000 重试间隔 */ //绑定内置参数,否则匿名函数无法使用。 @@ -67,11 +69,11 @@ function reportProperties(client) { "containsGeo": false, "ignoreLog": true, "ignoreStorage": true - } + } } //推送mqtt - return client.publishAsync("/report-property", 0, $benchmark.toJson(msg)); + return client.publishAsync(createTopic(client, "/properties/report"), 0, $benchmark.toJson(msg)); } diff --git a/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/benchmark/MQTTBenchMark.java b/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/benchmark/MQTTBenchMark.java index 9815dd5..069ee9b 100644 --- a/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/benchmark/MQTTBenchMark.java +++ b/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/benchmark/MQTTBenchMark.java @@ -1,5 +1,6 @@ package org.jetlinks.simulator.cmd.benchmark; +import lombok.extern.slf4j.Slf4j; import org.jetlinks.simulator.cmd.NetClientCommandOption; import org.jetlinks.simulator.cmd.NetworkInterfaceCompleter; import org.jetlinks.simulator.core.Connection; @@ -12,6 +13,7 @@ import java.net.InetSocketAddress; import java.util.Collections; +@Slf4j @CommandLine.Command(name = "mqtt", showDefaultValues = true, description = { @@ -37,7 +39,11 @@ protected Mono createConnection(ConnectCreateContext ctx) return MqttClient .connect( InetSocketAddress.createUnresolved(command.getHost(), command.getPort()), - commandOptions + commandOptions, + () -> { + log.info("断开重试触发beforeConnect开始:{},{}", commandOptions.getUsername()); + ctx.beforeConnect(commandOptions); + } ); } @@ -79,6 +85,16 @@ public void setPassword0(String password) { @CommandLine.Option(names = {"--topics"}, description = "attach and subscribe topics", order = 6) private String[] topics; + @CommandLine.Option(names = {"--reconnectAttempts"}, description = "MQTT reconnect times", order = 7) + public void setReconnect(int attemps) { + super.setReconnectAttempts(attemps); + } + + @CommandLine.Option(names = {"--reconnectInterval"}, description = "MQTT reconnect interval", order = 8) + public void setReconnectInterval0(long interval) { + super.setReconnectInterval(interval); + } + } diff --git a/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/mqtt/ConnectMqttCommand.java b/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/mqtt/ConnectMqttCommand.java index 61dd752..08a1361 100644 --- a/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/mqtt/ConnectMqttCommand.java +++ b/simulator-cli/src/main/java/org/jetlinks/simulator/cmd/mqtt/ConnectMqttCommand.java @@ -1,18 +1,13 @@ package org.jetlinks.simulator.cmd.mqtt; -import io.vertx.core.net.NetClientOptions; import org.jetlinks.simulator.cmd.AbstractCommand; import org.jetlinks.simulator.cmd.NetClientCommandOption; -import org.jetlinks.simulator.cmd.NetworkInterfaceCompleter; import org.jetlinks.simulator.core.ExceptionUtils; import org.jetlinks.simulator.core.network.mqtt.MqttClient; -import org.springframework.util.CollectionUtils; import picocli.CommandLine; import java.net.InetSocketAddress; -import java.net.NetworkInterface; import java.time.Duration; -import java.util.StringJoiner; @CommandLine.Command(name = "connect", showDefaultValues = true, @@ -86,6 +81,17 @@ public void setPassword0(String password) { @CommandLine.Option(names = {"--topics"}, description = "attach and subscribe topics", order = 6) private String[] topics; + @CommandLine.Option(names = {"--reconnectAttempts"}, description = "MQTT reconnect times", order = 7) + public void setReconnect(int attemps) { + super.setReconnectAttempts(attemps); + } + + @CommandLine.Option(names = {"--reconnectInterval"}, description = "MQTT reconnect interval", order = 8) + public void setReconnectInterval0(long interval) { + super.setReconnectInterval(interval); + } + + } } diff --git a/simulator-cli/src/main/resources/logback.xml b/simulator-cli/src/main/resources/logback.xml index 938773f..4251594 100644 --- a/simulator-cli/src/main/resources/logback.xml +++ b/simulator-cli/src/main/resources/logback.xml @@ -22,7 +22,7 @@ - + \ No newline at end of file diff --git a/simulator-core/src/main/java/org/jetlinks/simulator/core/network/mqtt/MqttClient.java b/simulator-core/src/main/java/org/jetlinks/simulator/core/network/mqtt/MqttClient.java index 761471a..8381411 100644 --- a/simulator-core/src/main/java/org/jetlinks/simulator/core/network/mqtt/MqttClient.java +++ b/simulator-core/src/main/java/org/jetlinks/simulator/core/network/mqtt/MqttClient.java @@ -13,20 +13,26 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.jetlinks.core.utils.TopicUtils; +import org.jetlinks.simulator.core.Connection; import org.jetlinks.simulator.core.ExceptionUtils; import org.jetlinks.simulator.core.Global; import org.jetlinks.simulator.core.network.*; import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @Slf4j @@ -39,9 +45,21 @@ public class MqttClient extends AbstractConnection { private final Map, Subscriber> subscribers = new ConcurrentHashMap<>(); - private MqttClient(io.vertx.mqtt.MqttClient client, Address address) { + private Disposable disposables = Disposables.disposed(); + + private AtomicInteger counter = new AtomicInteger(1); + + private static Runnable DEFAULT = () -> { + }; + + public MqttClient(io.vertx.mqtt.MqttClient client, Address address, MqttOptions options, AtomicInteger counter) { + new MqttClient(client, address, null, this.counter); + } + + + public MqttClient(io.vertx.mqtt.MqttClient client, Address localAddress, MqttOptions options, AtomicInteger counter, Runnable fallback) { + this.counter = counter; this.client = client; - this.address = address; this.client .publishHandler(msg -> { received(msg.payload().length()); @@ -51,68 +69,101 @@ private MqttClient(io.vertx.mqtt.MqttClient client, Address address) { handler.accept(msg); } catch (Throwable error) { log.warn("handle mqtt message {} {} error:{}", msg.topicName(), - msg.payload().toString(), - ExceptionUtils.getErrorMessage(error)); + msg.payload().toString(), + ExceptionUtils.getErrorMessage(error)); } } } }) - .closeHandler(e -> dispose()); + .closeHandler(e -> { + reconnect(options, fallback); + log.info("客户端尝试重连:{},次数:{}", this.client.clientId(), counter.getAndIncrement()); + }); + + } + + + private void reconnect(MqttOptions options, Runnable fallback) { + //超过重试次数且状态未连接放弃重试 + if (counter.get() > options.getReconnectAttempts() && !isAlive()) { + counter.set(1); + dispose(); + } else { + + disposables = Mono.delay(Duration.ofMillis(options.getReconnectInterval())) + .doOnNext(i -> fallback.run()) + .flatMap(i -> Mono.create(sink -> this.connect1(client, options, this.address, sink, counter, fallback))) + .subscribe(); + } } public static Mono connect(InetSocketAddress server, MqttOptions options) { - return connect(Global.vertx(), server, options); + return connect(Global.vertx(), server, options, DEFAULT); } - public static Mono connect(Vertx vertx, - InetSocketAddress server, - MqttOptions options) { + + public static Mono connect(InetSocketAddress server, MqttOptions options, Runnable retryFallback) { + return connect(Global.vertx(), server, options, retryFallback); + } + + private static Mono connect(Vertx vertx, InetSocketAddress server, MqttOptions options, Runnable retryFallback) { Address localAddress = AddressManager.global().takeAddress(options.getLocalAddress()); return Mono.create(sink -> { - MqttClientOptions clientOptions = options.copy(); - clientOptions.setClientId(options.getClientId()); - clientOptions.setUsername(options.getUsername()); - clientOptions.setPassword(options.getPassword()); - clientOptions.setLocalAddress(localAddress.getAddress().getHostAddress()); - clientOptions.setAutoKeepAlive(true); - clientOptions.setTcpKeepAlive(true); - clientOptions.setMaxMessageSize(1024 * 1024); - clientOptions.setReusePort(true); - - io.vertx.mqtt.MqttClient client = io.vertx.mqtt.MqttClient.create(vertx, clientOptions); - - client.connect( - server.getPort(), - server.getHostString(), - res -> { - if (res.failed()) { - sink.error(res.cause()); - return; - } - - MqttConnAckMessage msg = res.result(); - if (msg != null) { - if (msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { - MqttClient mqttClient = new MqttClient(client, localAddress); - mqttClient.attribute("clientId", options.getClientId()); - mqttClient.attribute("username", options.getUsername()); - mqttClient.changeState(State.connected); - sink.success(mqttClient); - } else { - sink.error(new MqttConnectionException(msg.code())); - } - } - sink.success(); - - }); - }) - .doOnError(err -> localAddress.release()); + MqttClientOptions clientOptions = options.copy(); + clientOptions.setClientId(options.getClientId()); + clientOptions.setUsername(options.getUsername()); + clientOptions.setPassword(options.getPassword()); + clientOptions.setLocalAddress(localAddress.getAddress().getHostAddress()); + clientOptions.setAutoKeepAlive(true); + clientOptions.setTcpKeepAlive(true); + clientOptions.setMaxMessageSize(1024 * 1024); + clientOptions.setReusePort(true); + + io.vertx.mqtt.MqttClient client = io.vertx.mqtt.MqttClient.create(vertx, clientOptions); + + connect1(client, options, localAddress, sink, new AtomicInteger(1), retryFallback); + }) + .doOnError(err -> localAddress.release()); + } + + + private static void connect1(io.vertx.mqtt.MqttClient client, + MqttOptions options, + Address localAddress, + MonoSink sink, + AtomicInteger counter, + Runnable retryFallback) { + client.connect( + options.getPort(), + options.getHost(), + res -> { + if (res.failed()) { + sink.error(res.cause()); + return; + } + + MqttConnAckMessage msg = res.result(); + if (msg != null) { + if (msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { + MqttClient mqttClient = new MqttClient(client, localAddress, options, counter, retryFallback); + mqttClient.attribute("clientId", options.getClientId()); + mqttClient.attribute("username", options.getUsername()); + mqttClient.changeState(State.connected); + sink.success(mqttClient); + } else { + sink.error(new MqttConnectionException(msg.code())); + } + } + sink.success(); + + }); } + @Override public String getId() { return client.clientId(); @@ -202,6 +253,7 @@ public void accept(MqttPublishMessage msg) { } } } + } public void publish(String topic, int qos, Object payload) { @@ -222,21 +274,21 @@ public Mono publishAsync(String topic, int qos, ByteBuf payload) { Buffer buffer = Buffer.buffer(payload); int len = buffer.length(); return Mono.create(sink -> client - .publish(topic, - buffer, - MqttQoS.valueOf(qos), - false, - false, - res -> { - ReferenceCountUtil.safeRelease(payload); - sent(len); - if (res.failed()) { - sink.error(res.cause()); - } else { - sink.success(); - } - })) - .doOnError(this::error); + .publish(topic, + buffer, + MqttQoS.valueOf(qos), + false, + false, + res -> { + ReferenceCountUtil.safeRelease(payload); + sent(len); + if (res.failed()) { + sink.error(res.cause()); + } else { + sink.success(); + } + })) + .doOnError(this::error); } public List getSubscriptions() { @@ -245,8 +297,11 @@ public List getSubscriptions() { @Override protected void doDisposed() { - address.release(); + if (!Objects.isNull(address) && !Objects.isNull(address.getAddress())) { + address.release(); + } super.doDisposed(); + disposables.dispose(); if (client.isConnected()) { client.disconnect(); }