From 067eaa019f69aa3981ff9d0330ccd5ebfb8d171a Mon Sep 17 00:00:00 2001 From: a181321 Date: Fri, 13 Oct 2023 15:13:03 +0200 Subject: [PATCH] ARTEMIS-4455 - Improve message redistribution balance for OFF_WITH_REDISTRIBUTION --- .../core/postoffice/impl/BindingsImpl.java | 4 +-- .../postoffice/impl/CopyOnWriteBindings.java | 26 ++++++++++++++ .../MessageRedistributionTest.java | 34 +++++++++++++++++++ 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index e3bd6feefb6..ebb8cc97ec1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -257,7 +257,7 @@ public Message redistribute(final Message message, final int bindingsCount = bindings.length; - int nextPosition = bindingIndex.getIndex(); + int nextPosition = bindingIndex.getRedistributorIndex(); if (nextPosition >= bindingsCount) { nextPosition = 0; @@ -294,7 +294,7 @@ public Message redistribute(final Message message, context.setTransaction(new TransactionImpl(storageManager)); } - bindingIndex.setIndex(nextPosition); + bindingIndex.setRedistributorIndex(nextPosition); nextBinding.route(copyRedistribute, context); logger.debug("Redistribution successful on message={}, towards bindings={}", message, bindings); return copyRedistribute; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java index 90eade2d18b..b251f9ca315 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java @@ -42,21 +42,34 @@ public interface BindingIndex { */ int getIndex(); + /** + * Cannot return a negative value and returns {@code 0} if uninitialized. + */ + int getRedistributorIndex(); + /** * Cannot set a negative value. */ void setIndex(int v); + + /** + * Cannot set a negative value. + */ + void setRedistributorIndex(int v); } private static final class BindingsAndPosition extends AtomicReference implements BindingIndex { private static final AtomicIntegerFieldUpdater NEXT_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextPosition"); + private static final AtomicIntegerFieldUpdater NEXT_REDISTRIBUTION_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextRedistributionPosition"); public volatile int nextPosition; + public volatile int nextRedistributionPosition; BindingsAndPosition(Binding[] bindings) { super(bindings); NEXT_POSITION_UPDATER.lazySet(this, 0); + NEXT_REDISTRIBUTION_POSITION_UPDATER.lazySet(this, 0); } @Override @@ -64,6 +77,11 @@ public int getIndex() { return nextPosition; } + @Override + public int getRedistributorIndex() { + return nextRedistributionPosition; + } + @Override public void setIndex(int v) { if (v < 0) { @@ -71,6 +89,14 @@ public void setIndex(int v) { } NEXT_POSITION_UPDATER.lazySet(this, v); } + + @Override + public void setRedistributorIndex(int v) { + if (v < 0) { + throw new IllegalArgumentException("cannot set a negative position"); + } + NEXT_REDISTRIBUTION_POSITION_UPDATER.lazySet(this, v); + } } private final ConcurrentHashMap map; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index 3f8c8257986..c297a90dfe3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -1011,6 +1011,40 @@ public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistributio } + @Test + public void testEvenRedistributionLbOffWithRedistribution() throws Exception { + final int messageCount = 1000; + final String queue = "queues.test"; + + setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION); + startServers(0, 1, 2); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + + createQueue(0, queue, queue, null, false, RoutingType.ANYCAST); + createQueue(1, queue, queue, null, false, RoutingType.ANYCAST); + createQueue(2, queue, queue, null, false, RoutingType.ANYCAST); + + addConsumer(0, 1, queue, null); + addConsumer(1, 2, queue, null); + + waitForBindings(0, queue, 1, 0, true); + waitForBindings(1, queue, 1, 1, true); + waitForBindings(2, queue, 1, 1, true); + + waitForBindings(0, queue, 2, 2, false); + waitForBindings(1, queue, 2, 1, false); + waitForBindings(2, queue, 2, 1, false); + + send(0, queue, messageCount * 2, false, null); + + Wait.assertEquals(0L, () -> servers[0].getTotalMessageCount(), 5000, 100); + Assert.assertEquals(messageCount, servers[1].getTotalMessageCount()); + Assert.assertEquals(messageCount, servers[2].getTotalMessageCount()); + } + @Test public void testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws Exception {