diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 54c2022ab2c..ff6b9dde8c5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; @@ -415,6 +416,10 @@ public Packet decode(byte packetType) { packet = new CheckFailoverReplyMessage(); break; } + case PacketImpl.DISCONNECT_CONSUMER_KILL: { + packet = new DisconnectConsumerWithKillMessage(); + break; + } default: { throw ActiveMQClientMessageBundle.BUNDLE.invalidType(packetType); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index e01c81e2dee..27fd5430231 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2723,12 +2723,13 @@ public synchronized void resetMessagesKilled() { @Override public float getRate() { + long locaMessageAdded = getMessagesAdded(); float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f); if (timeSlice == 0) { - messagesAddedSnapshot.getAndSet(messagesAdded); + messagesAddedSnapshot.getAndSet(locaMessageAdded); return 0.0f; } - return BigDecimal.valueOf((messagesAdded - messagesAddedSnapshot.getAndSet(messagesAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); + return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); } // Inner classes diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java index 3643f779918..547577817e5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java @@ -18,7 +18,6 @@ import java.util.Arrays; import java.util.Collection; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -37,15 +36,14 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; -import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; -import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -55,16 +53,18 @@ public class SlowConsumerTest extends ActiveMQTestBase { private boolean isNetty = false; + private boolean isPaging = false; // this will ensure that all tests in this class are run twice, // once with "true" passed to the class' constructor and once with "false" - @Parameterized.Parameters(name = "isNetty={0}") + @Parameterized.Parameters(name = "netty={0}, paging={1}") public static Collection getParameters() { - return Arrays.asList(new Object[][]{{true}, {false}}); + return Arrays.asList(new Object[][]{{true, false}, {false, false}, {true, true}, {false, true}}); } - public SlowConsumerTest(boolean isNetty) { + public SlowConsumerTest(boolean isNetty, boolean isPaging) { this.isNetty = isNetty; + this.isPaging = isPaging; } private ActiveMQServer server; @@ -78,14 +78,30 @@ public SlowConsumerTest(boolean isNetty) { public void setUp() throws Exception { super.setUp(); - server = createServer(false, isNetty); + server = createServer(true, isNetty); - AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.KILL); + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setSlowConsumerCheckPeriod(1); + addressSettings.setSlowConsumerThreshold(10); + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); + + if (isPaging) { + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + addressSettings.setMaxSizeBytes(10 * 1024); + addressSettings.setPageSizeBytes(1024); + } else { + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + addressSettings.setMaxSizeBytes(-1); + addressSettings.setPageSizeBytes(1024); + + } server.start(); server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); + server.createQueue(QUEUE, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging(); + locator = createFactory(isNetty); } @@ -95,10 +111,10 @@ public void testSlowConsumerKilled() throws Exception { ClientSession session = addClientSession(sf.createSession(false, true, true, false)); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); + assertPaging(); + final int numMessages = 25; for (int i = 0; i < numMessages; i++) { @@ -118,36 +134,12 @@ public void testSlowConsumerKilled() throws Exception { } } - @Test - public void testDisableSlowConsumerReconnectWithKilled() throws Exception { - ClientSessionFactory sf = createSessionFactory(locator); - - ClientSession session = addClientSession(sf.createSession(false, true, true, false)); - - session.createQueue(QUEUE, QUEUE, null, false); - - ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); - - final int numMessages = 25; - - for (int i = 0; i < numMessages; i++) { - producer.send(createTextMessage(session, "m" + i)); - } - - ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); - session.start(); - - Thread.sleep(3000); - - RemotingService service = server.getRemotingService(); - Set connections = service.getConnections(); - assertTrue(connections.isEmpty()); - - if (sf instanceof ClientSessionFactoryImpl) { - int reconnectAttemps = ((ClientSessionFactoryImpl) sf).getReconnectAttempts(); - assertEquals(0, reconnectAttemps); + private void assertPaging() throws Exception { + Queue queue = server.locateQueue(QUEUE); + if (isPaging) { + Assert.assertTrue(queue.getPageSubscription().isPaging()); } else { - fail("ClientSessionFactory is not the instance of ClientSessionFactoryImpl"); + Assert.assertFalse(queue.getPageSubscription().isPaging()); } } @@ -158,72 +150,20 @@ public void testSlowConsumerNotification() throws Exception { ClientSession session = addClientSession(sf.createSession(false, true, true, false)); - session.createQueue(QUEUE, QUEUE, null, false); - - AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); - - server.getAddressSettingsRepository().removeMatch(QUEUE.toString()); - server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); - - ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); - - final int numMessages = 25; - - for (int i = 0; i < numMessages; i++) { - producer.send(createTextMessage(session, "m" + i)); + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setSlowConsumerCheckPeriod(2); + addressSettings.setSlowConsumerThreshold(10); + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); + if (!isPaging) { + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + addressSettings.setMaxSizeBytes(-1); } - SimpleString notifQueue = RandomUtil.randomSimpleString(); - - session.createQueue(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress(), notifQueue, null, false); - - ClientConsumer notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'"); - - final CountDownLatch notifLatch = new CountDownLatch(1); - - notifConsumer.setMessageHandler(new MessageHandler() { - @Override - public void onMessage(ClientMessage message) { - assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); - assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString()); - assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT)); - if (isNetty) { - assertTrue(message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1")); - } else { - assertEquals(SimpleString.toSimpleString("invm:0"), message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS)); - } - assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME)); - assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME)); - assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME)); - try { - message.acknowledge(); - } catch (ActiveMQException e) { - e.printStackTrace(); - } - notifLatch.countDown(); - } - }); - - ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); - session.start(); - - assertTrue(notifLatch.await(3, TimeUnit.SECONDS)); - } - - @Test - public void testSlowConsumerWithPreAckNotification() throws Exception { - - ClientSessionFactory sf = createSessionFactory(locator); - - ClientSession session = addClientSession(sf.createSession(false, true, true, true)); - - session.createQueue(QUEUE, QUEUE, null, false); - - AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(1).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); - server.getAddressSettingsRepository().removeMatch(QUEUE.toString()); server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); + assertPaging(); + ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); final int numMessages = 25; @@ -244,7 +184,6 @@ public void testSlowConsumerWithPreAckNotification() throws Exception { @Override public void onMessage(ClientMessage message) { assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); - IntegrationTestLogger.LOGGER.info("Slow consumer detected!"); assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString()); assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT)); if (isNetty) { @@ -267,16 +206,7 @@ public void onMessage(ClientMessage message) { ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); session.start(); - for (int i = 0; i < numMessages; i++) { - ClientMessage msg = consumer.receive(1000); - assertNotNull(msg); - IntegrationTestLogger.LOGGER.info("Received message."); - msg.acknowledge(); - session.commit(); - Thread.sleep(100); - } - - assertFalse(notifLatch.await(3, TimeUnit.SECONDS)); + assertTrue(notifLatch.await(15, TimeUnit.SECONDS)); } @Test @@ -285,8 +215,6 @@ public void testSlowConsumerSpared() throws Exception { ClientSession session = addClientSession(sf.createSession(true, true)); - session.createQueue(QUEUE, QUEUE, null, false); - ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); final int numMessages = 5; @@ -295,6 +223,8 @@ public void testSlowConsumerSpared() throws Exception { producer.send(createTextMessage(session, "m" + i)); } + assertPaging(); + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); session.start(); @@ -315,8 +245,6 @@ public void testFastThenSlowConsumerSpared() throws Exception { final ClientSession producerSession = addClientSession(sf.createSession(true, true)); - session.createQueue(QUEUE, QUEUE, null, false); - final ClientProducer producer = addClientProducer(producerSession.createProducer(QUEUE)); final AtomicLong messagesProduced = new AtomicLong(0); @@ -356,6 +284,8 @@ public void run() { t.start(); + assertPaging(); + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); session.start(); @@ -382,7 +312,10 @@ public void testSlowWildcardConsumer() throws Exception { SimpleString queueName2 = new SimpleString("Q2"); SimpleString queueName = new SimpleString("Q"); - AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.KILL); + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setSlowConsumerCheckPeriod(2); + addressSettings.setSlowConsumerThreshold(10); + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);