Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-4924 Proper handling of invalid messages in SNF queues #5091

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ public ActiveMQException createException(String msg) {
public ActiveMQException createException(String msg) {
return new ActiveMQTimeoutException(msg);
}
},
INVALID_MESSAGE_EXCEPTION(224) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQInvalidMessageException(msg);
}
};
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.api.core;

public class ActiveMQInvalidMessageException extends ActiveMQException {

public ActiveMQInvalidMessageException(String message) {
super(ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ public interface Message {
*/
SimpleString HDR_INGRESS_TIMESTAMP = SimpleString.of("_AMQ_INGRESS_TIMESTAMP");

/**
* This gives extra information as to why the messages is sent to DLQ
*/
SimpleString HDR_ROUTE_DLQ_DETAIL = SimpleString.of("_AMQ_DLQ_DETAIL");

/**
* The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore
* the prefix when the message is consumed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,4 +561,7 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,

@Message(id = 229256, value = "{} must be a positive power of 2 (actual value: {})")
IllegalArgumentException positivePowerOfTwo(String name, Number val);

@Message(id = 229257, value = "Missing header {}")
String messageMissingHeader(SimpleString idsHeaderName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,8 @@ void slowConsumerDetected(String sessionID,
@LogMessage(id = 222109, value = "Timed out waiting for write lock on consumer {} from {}. Check the Thread dump", level = LogMessage.Level.WARN)
void timeoutLockingConsumer(String consumer, String remoteAddress);

@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, props={}", level = LogMessage.Level.WARN)
void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
org.apache.activemq.artemis.api.core.Message messageCopy,
SimpleString idsHeaderName);

@LogMessage(id = 222111, value = "exception while invoking {} on {}", level = LogMessage.Level.TRACE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ public void failed(Throwable t) {
}

/* Hook for processing message before forwarding */
protected Message beforeForward(Message message, final SimpleString forwardingAddress) {
protected Message beforeForward(Message message, final SimpleString forwardingAddress) throws ActiveMQException {
message = message.copy();
((RefCountMessage)message).setParentRef((RefCountMessage)message);

Expand Down Expand Up @@ -610,7 +610,15 @@ public HandleStatus handle(final MessageReference ref) throws Exception {
dest = ref.getMessage().getAddressSimpleString();
}

final Message message = beforeForward(ref.getMessage(), dest);
final Message message;
try {
message = beforeForward(ref.getMessage(), dest);
} catch (ActiveMQException ex) {
ref.getMessage().putStringProperty(Message.HDR_ROUTE_DLQ_DETAIL, SimpleString.of(ex.toString()));
ref.getQueue().sendToDeadLetterAddress(null, ref);
refs.remove(ref.getMessageID());
return HandleStatus.HANDLED;
}

pendingAcks.countUp();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
Expand Down Expand Up @@ -185,7 +186,7 @@ protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
}

@Override
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) throws ActiveMQException {
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
Expand All @@ -200,11 +201,9 @@ protected Message beforeForward(final Message message, final SimpleString forwar
Set<SimpleString> propNames = new HashSet<>(messageCopy.getPropertyNames());

byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);

if (queueIds == null) {
// Sanity check only
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, idsHeaderName);
gaohoward marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalStateException("no queueIDs defined");
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, idsHeaderName);
throw ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION.createException(ActiveMQMessageBundle.BUNDLE.messageMissingHeader(idsHeaderName));
}

for (SimpleString propName : propNames) {
Expand Down
5 changes: 5 additions & 0 deletions docs/user-manual/clusters.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,11 @@ The default value is `-1`.

It often makes sense to introduce a delay before redistributing as it's a common case that a consumer closes but another one quickly is created on the same queue, in such a case you probably don't want to redistribute immediately since the new consumer will arrive shortly.

[WARNING]
====
The broker uses internal store and forward queues to handle message redistribution. Be aware that any clients should not directly send messages to the sore and forward queues. If a client sends messages to a store and forward queue, the messages will be sent to dead letter address. If security is enabled, make sure the clients do not have `send` permission on any store and forward queues. (The name pattern for a store and forward queue is <internal-naming-prefix>.sf.<cluster-name>.<nodeID> where the default internal-naming-prefix is `$.activemq.internal`, the cluster-name is the name of the cluster-connection, and the nodeID is the target node's ID)
====

== Cluster topologies

Apache ActiveMQ Artemis clusters can be connected together in many different topologies, let's consider the two most common ones here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
Expand All @@ -43,6 +44,7 @@
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
Expand Down Expand Up @@ -413,6 +415,107 @@ public void testPauseAddressBlockingSnFQueue() throws Exception {
stopServers(0, 1);
}

@Test
public void testBadClientSendMessagesToSnFQueue() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());

setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);

setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);

String dla = "DLA";
AddressSettings addressSettings = new AddressSettings();
addressSettings.setDeadLetterAddress(SimpleString.of(dla));

servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);

startServers(0, 1);

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());

createQueue(0, dla, dla, null, true);
createQueue(1, dla, dla, null, true);

waitForBindings(0, dla, 1, 0, true);
waitForBindings(1, dla, 1, 0, true);

ClientSession session0 = sfs[0].createSession();
ClientSession session1 = sfs[1].createSession();

session0.start();
session1.start();

final int num = 10;

SimpleString nodeId1 = servers[1].getNodeID();
ClusterConnectionImpl cc0 = (ClusterConnectionImpl) servers[0].getClusterManager().getClusterConnection("cluster0");
SimpleString snfQueue0 = cc0.getSfQueueName(nodeId1.toString());

ClientProducer badProducer0 = session0.createProducer(snfQueue0);
for (int i = 0; i < num; i++) {
Message msg = session0.createMessage(true);
msg.putStringProperty("origin", "from producer 0");
badProducer0.send(msg);
}

//add a remote queue and consumer to enable message to flow from node 0 to node 1
createQueue(1, "queues.testaddress", "queue0", null, true);
ClientConsumer consumer1 = session1.createConsumer("queue0");

waitForBindings(0, "queues.testaddress", 0, 0, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);

waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 0, 0, false);

ClientConsumer dlqConsumer = session0.createConsumer(dla);

for (int i = 0; i < num; i++) {
Message msg = session0.createMessage(true);
msg.putStringProperty("origin", "from producer 0");
badProducer0.send(msg);
}

//messages will never reache the consumer
assertNull(consumer1.receiveImmediate());

SimpleString idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(snfQueue0);
for (int i = 0; i < num * 2; i++) {
ClientMessage m = dlqConsumer.receive(5000);
assertNotNull(m);
String propValue = m.getStringProperty("origin");
assertEquals("from producer 0", propValue);
propValue = m.getStringProperty(Message.HDR_ROUTE_DLQ_DETAIL);
ActiveMQException expected = ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION.createException(ActiveMQMessageBundle.BUNDLE.messageMissingHeader(idsHeaderName));
assertEquals(expected.toString(), propValue);
m.acknowledge();
}
assertNull(dlqConsumer.receiveImmediate());

//normal message flow should work
ClientProducer goodProducer0 = session0.createProducer("queues.testaddress");
for (int i = 0; i < num; i++) {
Message msg = session0.createMessage(true);
msg.putStringProperty("origin", "from producer 0");
goodProducer0.send(msg);
}

//consumer1 can receive from node0
for (int i = 0; i < num; i++) {
ClientMessage m = consumer1.receive(5000);
assertNotNull(m);
String propValue = m.getStringProperty("origin");
assertEquals("from producer 0", propValue);
m.acknowledge();
}
assertNull(consumer1.receiveImmediate());

stopServers(0, 1);
}

@Override
@AfterEach
public void tearDown() throws Exception {
Expand Down
Loading