Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix infinite dispatch loop in MP due to stale channel #2241

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,10 @@ public interface MessageConsumer {
* @return ID
*/
public String getId();

/**
* Re-initializes the message consumer.
* @return {@code true} if re-initialization is successful, {@code false} otherwise.
*/
public boolean reInitialize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ private void tryToDispatchToEndpoint(MessageContext messageToDispatch, Endpoint
// For each retry we need to have a fresh copy of the original message
getFreshCopyOfOriginalMessage(messageToDispatch, originalEnvelop, originalJsonInputStream);

if (messageConsumer != null && messageConsumer.isAlive()) {
if (messageConsumer != null && (messageConsumer.isAlive() || messageConsumer.reInitialize())) {
messageToDispatch.setProperty(SynapseConstants.BLOCKING_MSG_SENDER, sender);
// Clear the message context properties related to endpoint in last service invocation
Set keySet = messageToDispatch.getPropertyKeySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public MessageContext receive() {
}
}

public boolean reInitialize() {
// To keep the existing behaviour, return false
return false;
}

/**
* Ack on success message sending by processor
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ private void reconnect() throws StoreForwardException {
}
}

public boolean reInitialize() {
// To keep the existing behaviour, return false
return false;
}

private final class CachedMessage {
private Message message = null;
private MessageContext mc = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,10 @@ public InMemoryConsumer setDestination(Queue<MessageContext> queue) {
this.queue = queue;
return this;
}

@Override
public boolean reInitialize() {
// To keep the existing behaviour, return false
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,26 @@ public MessageContext receive() {
}
} else {
log.warn("The connection and channel to the RabbitMQ broker are unhealthy.");
reInitialize();
}
return null;
}

/**
* Reconnect to the RabbitMQ broker by creating a new connection and channel.
* return {@code true} if re-initialization is successful, {@code false} otherwise.
*/
public boolean reInitialize() {
try {
log.info("Cleanup and Reinitializing connection and channel for " + getId());
cleanup();
setConnection(store.createConnection());
setChannel(store.createChannel(connection));
} catch (Exception e) {
log.error("Failed to reinitialize connection and channel.", e);
return false;
}
return null;
return true;
}

/**
Expand Down Expand Up @@ -136,14 +151,30 @@ public boolean ack() {
*/
@Override
public boolean cleanup() {
if (connection != null) {
connection.abort();
try {
if (channel != null) {
channel.abort(); // Forcefully close the channel
}
} catch (Exception e) {
//ignore
} finally {
channel = null; // Ensure channel is nullified
}
channel = null;
connection = null;
return true;

try {
if (connection != null) {
connection.abort(); // Forcefully close the connection
}
} catch (Exception e) {
//ignore
} finally {
connection = null; // Ensure connection is nullified
}

return true; // Indicating cleanup was attempted
}


/**
* Check availability of connectivity with the message store
*
Expand Down
Loading