From 21ac7de6d908fb27b830181892b35c012e0b3e68 Mon Sep 17 00:00:00 2001 From: "Jan N. Klug" Date: Fri, 21 Jul 2023 16:28:49 +0200 Subject: [PATCH] Fix MQTT crashes when subscriber throws exception Signed-off-by: Jan N. Klug --- .../io/transport/mqtt/internal/Subscription.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/openhab/core/io/transport/mqtt/internal/Subscription.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/openhab/core/io/transport/mqtt/internal/Subscription.java index 1668f254228..84a77d13563 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/openhab/core/io/transport/mqtt/internal/Subscription.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/openhab/core/io/transport/mqtt/internal/Subscription.java @@ -18,6 +18,9 @@ import org.eclipse.jdt.annotation.NonNullByDefault; import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber; +import org.openhab.core.util.HexUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; @@ -31,6 +34,7 @@ */ @NonNullByDefault public class Subscription { + private final Logger logger = LoggerFactory.getLogger(Subscription.class); private final Map retainedMessages = new ConcurrentHashMap<>(); private final Collection subscribers = ConcurrentHashMap.newKeySet(); @@ -81,10 +85,15 @@ public void messageArrived(String topic, byte[] payload, boolean retain) { if (retain || retainedMessages.containsKey(topic)) { retainedMessages.put(topic, payload); } - subscribers.stream().forEach(subscriber -> processMessage(subscriber, topic, payload)); + subscribers.forEach(subscriber -> processMessage(subscriber, topic, payload)); } private void processMessage(MqttMessageSubscriber subscriber, String topic, byte[] payload) { - subscriber.processMessage(topic, payload); + try { + subscriber.processMessage(topic, payload); + } catch (RuntimeException e) { + logger.warn("A subscriber of type '{}' failed to process message '{}' to topic '{}'.", + subscriber.getClass(), HexUtils.bytesToHex(payload), topic); + } } }