Skip to content

Commit

Permalink
ARTEMIS-4455 - Improve message redistribution balance for OFF_WITH_RE…
Browse files Browse the repository at this point in the history
…DISTRIBUTION
  • Loading branch information
AntonRoskvist committed Oct 6, 2023
1 parent 42be518 commit 3d02fcd
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,11 @@ private Binding getNextBinding(final Message message,
// if no bindings were found, we will apply a secondary level on the routing logic
if (lastLowPriorityBinding != -1) {
nextBinding = bindings[lastLowPriorityBinding];
if (nextBinding != null && loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) {
//return before changing bindingIndex, otherwise every incoming message sets the index to the same position.
//bindingIndex is shared with the redistributor
return nextBinding;
}
nextPosition = moveNextPosition(lastLowPriorityBinding, bindingsCount);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,40 @@ public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistributio

}

@Test
public void testEvenRedistributionLbOffWithRedistribution() throws Exception {
final int messageCount = 1000;
final String queue = "queues.test";

setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
startServers(0, 1, 2);

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());

createQueue(0, queue, queue, null, false, RoutingType.ANYCAST);
createQueue(1, queue, queue, null, false, RoutingType.ANYCAST);
createQueue(2, queue, queue, null, false, RoutingType.ANYCAST);

addConsumer(0, 1, queue, null);
addConsumer(1, 2, queue, null);

waitForBindings(0, queue, 1, 0, true);
waitForBindings(1, queue, 1, 1, true);
waitForBindings(2, queue, 1, 1, true);

waitForBindings(0, queue, 2, 2, false);
waitForBindings(1, queue, 2, 1, false);
waitForBindings(2, queue, 2, 1, false);

send(0, queue, messageCount * 2, false, null);

Wait.assertEquals(0L, () -> servers[0].getTotalMessageCount(), 5000, 100);
Assert.assertEquals(messageCount, servers[1].getTotalMessageCount());
Assert.assertEquals(messageCount, servers[2].getTotalMessageCount());
}

@Test
public void testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws Exception {

Expand Down

0 comments on commit 3d02fcd

Please sign in to comment.