Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-913 Slow consumer detection not working when paging #156

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
Expand Down Expand Up @@ -415,6 +416,10 @@ public Packet decode(byte packetType) {
packet = new CheckFailoverReplyMessage();
break;
}
case PacketImpl.DISCONNECT_CONSUMER_KILL: {
packet = new DisconnectConsumerWithKillMessage();
break;
}
default: {
throw ActiveMQClientMessageBundle.BUNDLE.invalidType(packetType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2723,12 +2723,13 @@ public synchronized void resetMessagesKilled() {

@Override
public float getRate() {
long locaMessageAdded = getMessagesAdded();
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
if (timeSlice == 0) {
messagesAddedSnapshot.getAndSet(messagesAdded);
messagesAddedSnapshot.getAndSet(locaMessageAdded);
return 0.0f;
}
return BigDecimal.valueOf((messagesAdded - messagesAddedSnapshot.getAndSet(messagesAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
}

// Inner classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -37,15 +36,14 @@
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -55,16 +53,18 @@
public class SlowConsumerTest extends ActiveMQTestBase {

private boolean isNetty = false;
private boolean isPaging = false;

// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "isNetty={0}")
@Parameterized.Parameters(name = "netty={0}, paging={1}")
public static Collection getParameters() {
return Arrays.asList(new Object[][]{{true}, {false}});
return Arrays.asList(new Object[][]{{true, false}, {false, false}, {true, true}, {false, true}});
}

public SlowConsumerTest(boolean isNetty) {
public SlowConsumerTest(boolean isNetty, boolean isPaging) {
this.isNetty = isNetty;
this.isPaging = isPaging;
}

private ActiveMQServer server;
Expand All @@ -78,14 +78,30 @@ public SlowConsumerTest(boolean isNetty) {
public void setUp() throws Exception {
super.setUp();

server = createServer(false, isNetty);
server = createServer(true, isNetty);

AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(1);
addressSettings.setSlowConsumerThreshold(10);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);

if (isPaging) {
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setMaxSizeBytes(10 * 1024);
addressSettings.setPageSizeBytes(1024);
} else {
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(-1);
addressSettings.setPageSizeBytes(1024);

}

server.start();

server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);

server.createQueue(QUEUE, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging();

locator = createFactory(isNetty);
}

Expand All @@ -95,10 +111,10 @@ public void testSlowConsumerKilled() throws Exception {

ClientSession session = addClientSession(sf.createSession(false, true, true, false));

session.createQueue(QUEUE, QUEUE, null, false);

ClientProducer producer = addClientProducer(session.createProducer(QUEUE));

assertPaging();

final int numMessages = 25;

for (int i = 0; i < numMessages; i++) {
Expand All @@ -118,36 +134,12 @@ public void testSlowConsumerKilled() throws Exception {
}
}

@Test
public void testDisableSlowConsumerReconnectWithKilled() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);

ClientSession session = addClientSession(sf.createSession(false, true, true, false));

session.createQueue(QUEUE, QUEUE, null, false);

ClientProducer producer = addClientProducer(session.createProducer(QUEUE));

final int numMessages = 25;

for (int i = 0; i < numMessages; i++) {
producer.send(createTextMessage(session, "m" + i));
}

ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();

Thread.sleep(3000);

RemotingService service = server.getRemotingService();
Set<RemotingConnection> connections = service.getConnections();
assertTrue(connections.isEmpty());

if (sf instanceof ClientSessionFactoryImpl) {
int reconnectAttemps = ((ClientSessionFactoryImpl) sf).getReconnectAttempts();
assertEquals(0, reconnectAttemps);
private void assertPaging() throws Exception {
Queue queue = server.locateQueue(QUEUE);
if (isPaging) {
Assert.assertTrue(queue.getPageSubscription().isPaging());
} else {
fail("ClientSessionFactory is not the instance of ClientSessionFactoryImpl");
Assert.assertFalse(queue.getPageSubscription().isPaging());
}
}

Expand All @@ -158,72 +150,20 @@ public void testSlowConsumerNotification() throws Exception {

ClientSession session = addClientSession(sf.createSession(false, true, true, false));

session.createQueue(QUEUE, QUEUE, null, false);

AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);

server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);

ClientProducer producer = addClientProducer(session.createProducer(QUEUE));

final int numMessages = 25;

for (int i = 0; i < numMessages; i++) {
producer.send(createTextMessage(session, "m" + i));
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerThreshold(10);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
if (!isPaging) {
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(-1);
}

SimpleString notifQueue = RandomUtil.randomSimpleString();

session.createQueue(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress(), notifQueue, null, false);

ClientConsumer notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'");

final CountDownLatch notifLatch = new CountDownLatch(1);

notifConsumer.setMessageHandler(new MessageHandler() {
@Override
public void onMessage(ClientMessage message) {
assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT));
if (isNetty) {
assertTrue(message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1"));
} else {
assertEquals(SimpleString.toSimpleString("invm:0"), message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
}
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME));
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME));
try {
message.acknowledge();
} catch (ActiveMQException e) {
e.printStackTrace();
}
notifLatch.countDown();
}
});

ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();

assertTrue(notifLatch.await(3, TimeUnit.SECONDS));
}

@Test
public void testSlowConsumerWithPreAckNotification() throws Exception {

ClientSessionFactory sf = createSessionFactory(locator);

ClientSession session = addClientSession(sf.createSession(false, true, true, true));

session.createQueue(QUEUE, QUEUE, null, false);

AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(1).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);

server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);

assertPaging();

ClientProducer producer = addClientProducer(session.createProducer(QUEUE));

final int numMessages = 25;
Expand All @@ -244,7 +184,6 @@ public void testSlowConsumerWithPreAckNotification() throws Exception {
@Override
public void onMessage(ClientMessage message) {
assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
IntegrationTestLogger.LOGGER.info("Slow consumer detected!");
assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT));
if (isNetty) {
Expand All @@ -267,16 +206,7 @@ public void onMessage(ClientMessage message) {
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();

for (int i = 0; i < numMessages; i++) {
ClientMessage msg = consumer.receive(1000);
assertNotNull(msg);
IntegrationTestLogger.LOGGER.info("Received message.");
msg.acknowledge();
session.commit();
Thread.sleep(100);
}

assertFalse(notifLatch.await(3, TimeUnit.SECONDS));
assertTrue(notifLatch.await(15, TimeUnit.SECONDS));
}

@Test
Expand All @@ -285,8 +215,6 @@ public void testSlowConsumerSpared() throws Exception {

ClientSession session = addClientSession(sf.createSession(true, true));

session.createQueue(QUEUE, QUEUE, null, false);

ClientProducer producer = addClientProducer(session.createProducer(QUEUE));

final int numMessages = 5;
Expand All @@ -295,6 +223,8 @@ public void testSlowConsumerSpared() throws Exception {
producer.send(createTextMessage(session, "m" + i));
}

assertPaging();

ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();

Expand All @@ -315,8 +245,6 @@ public void testFastThenSlowConsumerSpared() throws Exception {

final ClientSession producerSession = addClientSession(sf.createSession(true, true));

session.createQueue(QUEUE, QUEUE, null, false);

final ClientProducer producer = addClientProducer(producerSession.createProducer(QUEUE));

final AtomicLong messagesProduced = new AtomicLong(0);
Expand Down Expand Up @@ -356,6 +284,8 @@ public void run() {

t.start();

assertPaging();

ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();

Expand All @@ -382,7 +312,10 @@ public void testSlowWildcardConsumer() throws Exception {
SimpleString queueName2 = new SimpleString("Q2");
SimpleString queueName = new SimpleString("Q");

AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerThreshold(10);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);

server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);

Expand Down