From 29a27f7fe69db73a4ea6f21b06e6f70337f2c0b1 Mon Sep 17 00:00:00 2001 From: ran Date: Tue, 27 Jul 2021 09:55:32 +0800 Subject: [PATCH] bump mqtt version to `4.1.66.Final` (#73) --- .../streamnative/pulsar/handlers/mqtt/proxy/ProxyHandler.java | 3 +++ .../handlers/mqtt/support/ProtocolMethodProcessorImpl.java | 2 +- pom.xml | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/ProxyHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/ProxyHandler.java index 6ef177ad8..f36dd7b7e 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/ProxyHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/ProxyHandler.java @@ -117,6 +117,9 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { log.info("channel read: {}", message); + if (message instanceof MqttMessage && ((MqttMessage) message).decoderResult().isFailure()) { + log.error("Failed to decode mqttMessage.", ((MqttMessage) message).decoderResult().cause()); + } switch (state) { case Init: MqttMessage msg = (MqttMessage) message; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/ProtocolMethodProcessorImpl.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/ProtocolMethodProcessorImpl.java index 62d7daa6d..e8989ec91 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/ProtocolMethodProcessorImpl.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/ProtocolMethodProcessorImpl.java @@ -462,7 +462,7 @@ private MqttSubAckMessage doAckMessageFromValidateFilters(List6.14.3 4.0.2 2.8.0 - 4.1.49.Final + 4.1.66.Final 2.13.3 1.18.4 1.16