diff --git a/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java index 18da251a14..ad3cb8376f 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java +++ b/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java @@ -62,4 +62,10 @@ public interface MessageConsumer { * @return ID */ public String getId(); + + /** + * Re-initializes the message consumer. + * @return {@code true} if re-initialization is successful, {@code false} otherwise. + */ + public boolean reInitialize(); } diff --git a/modules/core/src/main/java/org/apache/synapse/message/processor/impl/forwarder/ForwardingService.java b/modules/core/src/main/java/org/apache/synapse/message/processor/impl/forwarder/ForwardingService.java index 7db3c822a3..294b269c64 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/processor/impl/forwarder/ForwardingService.java +++ b/modules/core/src/main/java/org/apache/synapse/message/processor/impl/forwarder/ForwardingService.java @@ -681,7 +681,7 @@ private void tryToDispatchToEndpoint(MessageContext messageToDispatch, Endpoint // For each retry we need to have a fresh copy of the original message getFreshCopyOfOriginalMessage(messageToDispatch, originalEnvelop, originalJsonInputStream); - if (messageConsumer != null && messageConsumer.isAlive()) { + if (messageConsumer != null && (messageConsumer.isAlive() || messageConsumer.reInitialize())) { messageToDispatch.setProperty(SynapseConstants.BLOCKING_MSG_SENDER, sender); // Clear the message context properties related to endpoint in last service invocation Set keySet = messageToDispatch.getPropertyKeySet(); diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java index e40db909cf..1d0dc9e5c9 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java @@ -90,6 +90,11 @@ public MessageContext receive() { } } + public boolean reInitialize() { + // To keep the existing behaviour, return false + return false; + } + /** * Ack on success message sending by processor * diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jms/JmsConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jms/JmsConsumer.java index 2bc40b4b31..7aa5d90893 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jms/JmsConsumer.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jms/JmsConsumer.java @@ -283,6 +283,11 @@ private void reconnect() throws StoreForwardException { } } + public boolean reInitialize() { + // To keep the existing behaviour, return false + return false; + } + private final class CachedMessage { private Message message = null; private MessageContext mc = null; diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/memory/InMemoryConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/memory/InMemoryConsumer.java index f605314902..0c583e7d7e 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/impl/memory/InMemoryConsumer.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/memory/InMemoryConsumer.java @@ -98,4 +98,10 @@ public InMemoryConsumer setDestination(Queue queue) { this.queue = queue; return this; } + + @Override + public boolean reInitialize() { + // To keep the existing behaviour, return false + return false; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/rabbitmq/RabbitMQConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/rabbitmq/RabbitMQConsumer.java index b6fd6ac720..355414aaac 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/impl/rabbitmq/RabbitMQConsumer.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/rabbitmq/RabbitMQConsumer.java @@ -92,11 +92,26 @@ public MessageContext receive() { } } else { log.warn("The connection and channel to the RabbitMQ broker are unhealthy."); + reInitialize(); + } + return null; + } + + /** + * Reconnect to the RabbitMQ broker by creating a new connection and channel. + * return {@code true} if re-initialization is successful, {@code false} otherwise. + */ + public boolean reInitialize() { + try { + log.info("Cleanup and Reinitializing connection and channel for " + getId()); cleanup(); setConnection(store.createConnection()); setChannel(store.createChannel(connection)); + } catch (Exception e) { + log.error("Failed to reinitialize connection and channel.", e); + return false; } - return null; + return true; } /** @@ -136,14 +151,30 @@ public boolean ack() { */ @Override public boolean cleanup() { - if (connection != null) { - connection.abort(); + try { + if (channel != null) { + channel.abort(); // Forcefully close the channel + } + } catch (Exception e) { + //ignore + } finally { + channel = null; // Ensure channel is nullified } - channel = null; - connection = null; - return true; + + try { + if (connection != null) { + connection.abort(); // Forcefully close the connection + } + } catch (Exception e) { + //ignore + } finally { + connection = null; // Ensure connection is nullified + } + + return true; // Indicating cleanup was attempted } + /** * Check availability of connectivity with the message store *