Skip to content

Commit

Permalink
ARTEMIS-5339 Adjust configuration to avoid federation receiver loops
Browse files Browse the repository at this point in the history
Adjust the configuration for consumer priority handling to an opt in policy
for creating multipe differing priority federation receivers such that if a
user configures bi-directional federation and enabled ignore federation
receivers they wouldn't end up with an infinite reflection of receivers.
  • Loading branch information
tabish121 authored and gemmellr committed Mar 6, 2025
1 parent 2aaf256 commit 18afc8a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public final class AMQPFederationConfiguration {
* consumer subscription is used or if the policy priority offset value is simply applied to the default consumer
* priority value.
*/
public static final boolean DEFAULT_IGNNORE_QUEUE_CONSUMER_PRIORITIES = false;
public static final boolean DEFAULT_IGNNORE_QUEUE_CONSUMER_PRIORITIES = true;

/**
* Default timeout (milliseconds) applied to federation receivers that are being stopped due to removal of local
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ public void testFederationConfiguredCreatesControlLink() throws Exception {
federation.addProperty(RECEIVER_QUIESCE_TIMEOUT, AMQP_RECEIVER_QUIESCE_TIMEOUT);
federation.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, AMQP_ADDRESS_RECEIVER_IDLE_TIMEOUT);
federation.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, AMQP_QUEUE_RECEIVER_IDLE_TIMEOUT);
federation.addProperty(IGNORE_QUEUE_CONSUMER_PRIORITIES, Boolean.toString(AMQP_INGNORE_CONSUMER_PRIORITIES));
amqpConnection.addElement(federation);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1492,9 +1492,7 @@ private void doTestFederationCreatesQueueReceiverWithCorrectPriorityOffset(boole
receiveFromQueue.setName("queue-policy");
receiveFromQueue.setPriorityAdjustment(TEST_BASE_PRIORITY_ADJUSTMENT);
receiveFromQueue.addToIncludes("test", "test");
if (ignoreConsumerPriority) {
receiveFromQueue.addProperty(IGNORE_QUEUE_CONSUMER_PRIORITIES, Boolean.valueOf(ignoreConsumerPriority).toString());
}
receiveFromQueue.addProperty(IGNORE_QUEUE_CONSUMER_PRIORITIES, Boolean.valueOf(ignoreConsumerPriority).toString());

final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
Expand Down Expand Up @@ -2531,6 +2529,7 @@ public void testBrokerCanFederateQueueIfOnlyDemandIsFromAnotherBrokerFederationS
element.setName(getTestName());
element.addLocalQueuePolicy(receiveFromQueue);
element.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 5);
element.addProperty(IGNORE_QUEUE_CONSUMER_PRIORITIES, "false");

final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1493,30 +1493,43 @@ private void doTestQueueConsumerPullsMessagesAndRemovesDemandLeavingSomeOnRemote
@RepeatedTest(1)
@Timeout(20)
public void testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnLocal() throws Exception {
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true);
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true, false);
}


@RepeatedTest(1)
@Timeout(20)
public void testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnRemote() throws Exception {
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false);
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false, false);
}

@RepeatedTest(1)
@Timeout(20)
public void testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnLocalIncludeFederated() throws Exception {
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true, true);
}

public void doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(boolean produceLocal) throws Exception {
@RepeatedTest(1)
@Timeout(20)
public void testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnRemoteIncludeFederated() throws Exception {
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false, true);
}

public void doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(boolean produceLocal, boolean includeFederated) throws Exception {
logger.info("Test started: {}", getTestName());

final AMQPFederationQueuePolicyElement localQueuePolicy = new AMQPFederationQueuePolicyElement();
localQueuePolicy.setName("test-policy-1");
localQueuePolicy.addToIncludes(getTestName(), getTestName());
localQueuePolicy.addProperty(RECEIVER_CREDITS, 0); // Enable Pull mode
localQueuePolicy.addProperty(PULL_RECEIVER_BATCH_SIZE, 1); // Pull mode batch is one
localQueuePolicy.setIncludeFederated(includeFederated);

final AMQPFederationQueuePolicyElement remoteQueuePolicy = new AMQPFederationQueuePolicyElement();
remoteQueuePolicy.setName("test-policy-2");
remoteQueuePolicy.addToIncludes(getTestName(), getTestName());
remoteQueuePolicy.addProperty(RECEIVER_CREDITS, 0); // Enable Pull mode
remoteQueuePolicy.addProperty(PULL_RECEIVER_BATCH_SIZE, 1); // Pull mode batch is one
remoteQueuePolicy.setIncludeFederated(includeFederated);

final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
Expand Down Expand Up @@ -1586,6 +1599,10 @@ public void doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneM

assertNotNull(messageL);
assertNotNull(messageR);

// Should be a single JMS consumer and a single Federation consumer on each server's Queue
Wait.assertTrue(() -> server.queueQuery(SimpleString.of(getTestName())).getConsumerCount() == 2, 10_000);
Wait.assertTrue(() -> remoteServer.queueQuery(SimpleString.of(getTestName())).getConsumerCount() == 2, 10_000);
}
}
}

0 comments on commit 18afc8a

Please sign in to comment.