Skip to content

Commit

Permalink
ARTEMIS-5037: option to limit mirror propagation
Browse files Browse the repository at this point in the history
Add a new option in the Mirror setting to prevent a broker from
propagating messages.

When working with a topology where 4 nodes form a square and each node
mirrors to its two neighbors, a message leaving a corner can reach the
opposite corner of the square by two different routes. This is causing
the ordering of message to get broken.

example:
1 <-> 2
^     ^
|     |
v     v
4 <-> 3

A message from a will reach 3 by 2 and 4. Message duplication checks
will prevent the message from being duplicated but won't help regarding
the order of the messages.

Using the new option to not forward messages coming from a link, we
break the possibilities to have two routes to reach the opposite corner.

On the example above, we ask 4 to not forward any messages coming from
1 if 1 is the primary sender of the messages.

This means that now, a message sent to 1 will reach 3 only by 2 and not
by 4.
  • Loading branch information
lavocatt committed Sep 6, 2024
1 parent daba842 commit a37f602
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,29 @@ private void doConnect() {
final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
final Symbol[] desiredCapabilities;

if (coreTunnelingEnabled) {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
} else {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
if (!replica.isForward()){
if (coreTunnelingEnabled) {
desiredCapabilities = new Symbol[] {
AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT,
AMQPMirrorControllerSource.NO_FORWARD
};
} else {
desiredCapabilities = new Symbol[] {
AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AMQPMirrorControllerSource.NO_FORWARD
};
}
}else {
if (coreTunnelingEnabled) {
desiredCapabilities = new Symbol[] {
AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
} else {
desiredCapabilities = new Symbol[] {
AMQPMirrorControllerSource.MIRROR_CAPABILITY
};
}
}

final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
Expand All @@ -447,7 +465,7 @@ private void doConnect() {
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
server.getNodeID().toString(),
desiredCapabilities,
null,
replica.isForward() ? null : new Symbol[] {AMQPMirrorControllerSource.NO_FORWARD},
requiredOfferedCapabilities);
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.FEDERATION) {
// Starting the Federation triggers rebuild of federation links
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im

public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");

private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
Expand Down Expand Up @@ -264,6 +265,14 @@ public void flow() {
protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) {
OperationContext oldContext = recoverContext();
incrementSettle();
boolean noForward = false;
if (receiver.getRemoteDesiredCapabilities() != null) {
for (Symbol capability : receiver.getRemoteDesiredCapabilities()) {
if (capability == AMQPMirrorControllerSource.NO_FORWARD) {
noForward = true;
}
}
}

logger.trace("{}::actualDelivery call for {}", server, message);
setControllerInUse(this);
Expand Down Expand Up @@ -320,14 +329,14 @@ protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnota
}
}
} else {
if (sendMessage(amqpMessage, deliveryAnnotations, messageAckOperation)) {
if (sendMessage(amqpMessage, deliveryAnnotations, messageAckOperation, noForward)) {
// since the send was successful, we give up the reference here,
// so there won't be any call on afterCompleteOperations
messageAckOperation = null;
}
}
} else {
if (sendMessage(message, deliveryAnnotations, messageAckOperation)) {
if (sendMessage(message, deliveryAnnotations, messageAckOperation, noForward)) {
// since the send was successful, we give up the reference here,
// so there won't be any call on afterCompleteOperations
messageAckOperation = null;
Expand Down Expand Up @@ -486,7 +495,7 @@ private void performAck(String nodeID,
* as the sendMessage was successful the OperationContext of the transaction will take care of the completion.
* The caller of this method should give up any reference to messageCompletionAck when this method returns true.
* */
private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotations, ACKMessageOperation messageCompletionAck) throws Exception {
private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotations, ACKMessageOperation messageCompletionAck, boolean noForward) throws Exception {
if (message.getMessageID() <= 0) {
message.setMessageID(server.getStorageManager().generateID());
}
Expand All @@ -512,6 +521,12 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat

routingContext.setDuplicateDetection(false); // we do our own duplicate detection here

if (noForward) {
message.usageDown(); // large messages would be removed here
flow();
return false;
}

DuplicateIDCache duplicateIDCache;
if (lruDuplicateIDKey != null && lruDuplicateIDKey.equals(internalMirrorID)) {
duplicateIDCache = lruduplicateIDCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme

boolean queueCreation = true;

boolean forward = true;

boolean queueRemoval = true;

boolean messageAcknowledgements = true;
Expand Down Expand Up @@ -75,6 +77,12 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation)
return this;
}

public boolean isForward () { return forward; }
public AMQPMirrorBrokerConnectionElement setForward(boolean forward) {
this.forward = forward;
return this;
}

public boolean isQueueRemoval() {
return queueRemoval;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;

import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.Test;

import javax.jms.*;

import static org.junit.jupiter.api.Assertions.*;

public class AMQSquareMirroringTest extends AmqpClientTestSupport {

protected static final int AMQP_PORT_2 = 5673;
protected static final int AMQP_PORT_3 = 5674;
protected static final int AMQP_PORT_4 = 5675;

ActiveMQServer server_2;
ActiveMQServer server_3;
ActiveMQServer server_4;

@Override
protected ActiveMQServer createServer() throws Exception {
return createServer(AMQP_PORT, false);
}

@Test
public void testSquare() throws Exception {
server_2 = createServer(AMQP_PORT_2, false);
server_3 = createServer(AMQP_PORT_3, false);
server_4 = createServer(AMQP_PORT_4, false);

// name the servers, for convenience during debugging
server.getConfiguration().setName("1");
server_2.getConfiguration().setName("2");
server_3.getConfiguration().setName("3");
server_4.getConfiguration().setName("4");

/**
* 1 <----> 2
* ^ ^
* | |
* v v
* 4 <----> 3
*/

{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100).setReconnectAttempts(-1);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server.getConfiguration().addAMQPConnection(amqpConnection);
amqpConnection = new AMQPBrokerConnectConfiguration("to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100).setReconnectAttempts(-1);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setForward(false));
server.getConfiguration().addAMQPConnection(amqpConnection);
}

{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100).setReconnectAttempts(-1);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server_2.getConfiguration().addAMQPConnection(amqpConnection);
amqpConnection = new AMQPBrokerConnectConfiguration("to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100).setReconnectAttempts(-1);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}

{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100).setReconnectAttempts(-1);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server_3.getConfiguration().addAMQPConnection(amqpConnection);
amqpConnection = new AMQPBrokerConnectConfiguration("to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100).setReconnectAttempts(-1);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server_3.getConfiguration().addAMQPConnection(amqpConnection);
}

{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100).setReconnectAttempts(-1);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server_4.getConfiguration().addAMQPConnection(amqpConnection);
amqpConnection = new AMQPBrokerConnectConfiguration("to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100).setReconnectAttempts(-1);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server_4.getConfiguration().addAMQPConnection(amqpConnection);
}

server.start();
server_2.start();
server_3.start();
server_4.start();

createAddressAndQueues(server);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server_4.locateQueue(getQueueName()) != null);

Queue q1 = server.locateQueue(getQueueName());
assertNotNull(q1);

Queue q2 = server.locateQueue(getQueueName());
assertNotNull(q2);

Queue q3 = server.locateQueue(getQueueName());
assertNotNull(q3);

Queue q4 = server.locateQueue(getQueueName());
assertNotNull(q4);

ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT+"?amqp.idleTimeout=-1");
ConnectionFactory factory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2+"?amqp.idleTimeout=-1");
ConnectionFactory factory3 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_3+"?amqp.idleTimeout=-1");
ConnectionFactory factory4 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_4+"?amqp.idleTimeout=-1");

try (Connection conn = factory.createConnection()) {
Session session = conn.createSession();
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
for (int i = 0; i < 40; i++) {
producer.send(session.createTextMessage("message " + i));
}
}

Thread.sleep(1_000); // some time to allow eventual loops

Wait.assertEquals(40L, q1::getMessageCount, 1000, 100);
Wait.assertEquals(40L, q2::getMessageCount, 1000, 100);
Wait.assertEquals(40L, q3::getMessageCount, 1000, 100);
Wait.assertEquals(40L, q4::getMessageCount, 1000, 100);

try (Connection conn = factory.createConnection()) {
Session session = conn.createSession();
conn.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
assertEquals("message " + i, message.getText());
}
consumer.close();
}

Wait.assertEquals(30L, q1::getMessageCount, 1000, 100);
Wait.assertEquals(30L, q2::getMessageCount, 1000, 100);
Wait.assertEquals(30L, q3::getMessageCount, 1000, 100);
Wait.assertEquals(30L, q4::getMessageCount, 1000, 100);

try (Connection conn = factory2.createConnection()) {
Session session = conn.createSession();
conn.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
for (int i = 10; i < 20; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
assertEquals("message " + i, message.getText());
}
consumer.close();
}

Wait.assertEquals(20L, q1::getMessageCount, 1000, 100);
Wait.assertEquals(20L, q2::getMessageCount, 1000, 100);
Wait.assertEquals(20L, q3::getMessageCount, 1000, 100);
Wait.assertEquals(20L, q4::getMessageCount, 1000, 100);

try (Connection conn = factory3.createConnection()) {
Session session = conn.createSession();
conn.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
for (int i = 20; i < 30; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
assertEquals("message " + i, message.getText());
}
consumer.close();
}

Wait.assertEquals(10L, q1::getMessageCount, 1000, 100);
Wait.assertEquals(10L, q2::getMessageCount, 1000, 100);
Wait.assertEquals(10L, q3::getMessageCount, 1000, 100);
Wait.assertEquals(10L, q4::getMessageCount, 1000, 100);

try (Connection conn = factory4.createConnection()) {
Session session = conn.createSession();
conn.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
for (int i = 30; i < 40; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
assertEquals("message " + i, message.getText());
}
consumer.close();
}

Wait.assertEquals(0L, q1::getMessageCount, 1000, 100);
Wait.assertEquals(0L, q2::getMessageCount, 1000, 100);
Wait.assertEquals(0L, q3::getMessageCount, 1000, 100);
Wait.assertEquals(0L, q4::getMessageCount, 1000, 100);

try (Connection conn = factory.createConnection()) {
Session session = conn.createSession();
conn.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
assertNull(consumer.receiveNoWait());
consumer.close();
}

try (Connection conn = factory2.createConnection()) {
Session session = conn.createSession();
conn.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
assertNull(consumer.receiveNoWait());
consumer.close();
}

try (Connection conn = factory3.createConnection()) {
Session session = conn.createSession();
conn.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
assertNull(consumer.receiveNoWait());
consumer.close();
}

try (Connection conn = factory4.createConnection()) {
Session session = conn.createSession();
conn.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
assertNull(consumer.receiveNoWait());
consumer.close();
}

}
}

0 comments on commit a37f602

Please sign in to comment.