From 7148e46b79d5d9d7dbe1d73cf48e0f3f88b54d6f Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Fri, 23 Feb 2024 08:51:17 -0600 Subject: [PATCH 1/2] [AMQ-9437] AdvancedDestination statistics networkEnqueue and networkDequeue counters --- .../activemq/broker/TransactionBroker.java | 11 ++ .../activemq/broker/jmx/DestinationView.java | 21 ++++ .../broker/jmx/DestinationViewMBean.java | 12 ++ .../broker/region/BaseDestination.java | 11 ++ .../activemq/broker/region/Destination.java | 6 + .../broker/region/DestinationFilter.java | 10 ++ .../broker/region/DestinationStatistics.java | 28 +++++ .../region/DurableTopicSubscription.java | 3 + .../apache/activemq/broker/region/Queue.java | 18 ++- .../apache/activemq/broker/region/Topic.java | 5 + .../broker/region/TopicSubscription.java | 6 + .../broker/region/policy/PolicyEntry.java | 13 ++- .../org/apache/activemq/PolicyEntryTest.java | 15 +++ ...ntryTest-policy-advancedStatistics-mod.xml | 36 ++++++ ...icyEntryTest-policy-advancedStatistics.xml | 36 ++++++ .../NetworkAdvancedStatisticsTest.java | 106 ++++++++++++++++++ .../localBroker-advancedStatistics.xml | 63 +++++++++++ .../remoteBroker-advancedStatistics.xml | 49 ++++++++ 18 files changed, 444 insertions(+), 5 deletions(-) create mode 100644 activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics-mod.xml create mode 100644 activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics.xml create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java create mode 100644 activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedStatistics.xml create mode 100644 activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedStatistics.xml diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java index 12e9f7742a..fbee030885 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -190,9 +190,20 @@ public void afterCommit() throws Exception { dest.clearPendingMessages(opCount); dest.getDestinationStatistics().getEnqueues().add(opCount); dest.getDestinationStatistics().getMessages().add(opCount); + + if(dest.isAdvancedStatisticsEnabled()) { + if(transactionBroker.context.isNetworkConnection()) { + dest.getDestinationStatistics().getNetworkEnqueues().add(opCount); + } + } LOG.debug("cleared pending from afterCommit: {}", destination); } else { dest.getDestinationStatistics().getDequeues().add(opCount); + if(dest.isAdvancedStatisticsEnabled()) { + if(transactionBroker.context.isNetworkConnection()) { + dest.getDestinationStatistics().getNetworkDequeues().add(opCount); + } + } } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 8abcc67163..1b05a768d4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -599,4 +599,25 @@ public boolean isSendDuplicateFromStoreToDLQ() { public long getMaxUncommittedExceededCount() { return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount(); } + + @Override + public boolean isAdvancedStatisticsEnabled() { + return destination.isAdvancedStatisticsEnabled(); + } + + @Override + public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { + destination.setAdvancedStatisticsEnabled(advancedStatisticsEnabled); + } + + @Override + public long getNetworkEnqueues() { + return destination.getDestinationStatistics().getNetworkEnqueues().getCount(); + } + + @Override + public long getNetworkDequeues() { + return destination.getDestinationStatistics().getNetworkDequeues().getCount(); + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 45ed51b994..db518f78b6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -481,4 +481,16 @@ public String sendTextMessageWithProperties(@MBeanInfo("properties") String prop @MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination") long getMaxUncommittedExceededCount(); + + @MBeanInfo("Query Advanced Statistics flag") + boolean isAdvancedStatisticsEnabled(); + + @MBeanInfo("Toggle Advanced Statistics flag") + void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled); + + @MBeanInfo("Number of messages sent to the destination via network connection") + long getNetworkEnqueues(); + + @MBeanInfo("Number of messages acknowledged from the destination via network connection") + long getNetworkDequeues(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 4ca3913c74..3964a8ce92 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -110,6 +110,8 @@ public abstract class BaseDestination implements Destination { protected final Scheduler scheduler; private boolean disposed = false; private boolean doOptimzeMessageStorage = true; + private boolean advancedStatisticsEnabled = false; + /* * percentage of in-flight messages above which optimize message store is disabled */ @@ -868,6 +870,15 @@ public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFligh this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; } + @Override + public boolean isAdvancedStatisticsEnabled() { + return this.advancedStatisticsEnabled; + } + + @Override + public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { + this.advancedStatisticsEnabled = advancedStatisticsEnabled; + } @Override public abstract List getConsumers(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index 70f807be86..de36f1605a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -258,4 +258,10 @@ public interface Destination extends Service, Task, Message.MessageDestination { boolean isSendDuplicateFromStoreToDLQ(); void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ); + + // [AMQ-9437] + boolean isAdvancedStatisticsEnabled(); + + void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled); + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 6b288a234f..cfc1de4509 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -409,6 +409,16 @@ public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ); } + @Override + public boolean isAdvancedStatisticsEnabled() { + return next.isAdvancedStatisticsEnabled(); + } + + @Override + public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { + next.setAdvancedStatisticsEnabled(advancedStatisticsEnabled); + } + public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { if (next instanceof DestinationFilter) { DestinationFilter filter = (DestinationFilter) next; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index 9d30c622f3..dc6b17dfaf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -46,6 +46,10 @@ public class DestinationStatistics extends StatsImpl { protected SizeStatisticImpl messageSize; protected CountStatisticImpl maxUncommittedExceededCount; + // [AMQ-9437] Advanced Statistics are optionally enabled + protected CountStatisticImpl networkEnqueues; + protected CountStatisticImpl networkDequeues; + public DestinationStatistics() { enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination"); @@ -68,6 +72,10 @@ public DestinationStatistics() { blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control"); messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination"); maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded"); + + networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection"); + networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection"); + addStatistic("enqueues", enqueues); addStatistic("dispatched", dispatched); addStatistic("dequeues", dequeues); @@ -83,6 +91,9 @@ public DestinationStatistics() { addStatistic("blockedTime",blockedTime); addStatistic("messageSize",messageSize); addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount); + + addStatistic("networkEnqueues", networkEnqueues); + addStatistic("networkDequeues", networkDequeues); } public CountStatisticImpl getEnqueues() { @@ -151,6 +162,14 @@ public CountStatisticImpl getMaxUncommittedExceededCount(){ return this.maxUncommittedExceededCount; } + public CountStatisticImpl getNetworkEnqueues() { + return networkEnqueues; + } + + public CountStatisticImpl getNetworkDequeues() { + return networkDequeues; + } + public void reset() { if (this.isDoReset()) { super.reset(); @@ -165,6 +184,8 @@ public void reset() { blockedTime.reset(); messageSize.reset(); maxUncommittedExceededCount.reset(); + networkEnqueues.reset(); + networkDequeues.reset(); } } @@ -187,6 +208,9 @@ public void setEnabled(boolean enabled) { messageSize.setEnabled(enabled); maxUncommittedExceededCount.setEnabled(enabled); + // [AMQ-9437] Advanced Statistics + networkEnqueues.setEnabled(enabled); + networkDequeues.setEnabled(enabled); } public void setParent(DestinationStatistics parent) { @@ -207,6 +231,8 @@ public void setParent(DestinationStatistics parent) { blockedTime.setParent(parent.blockedTime); messageSize.setParent(parent.messageSize); maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount); + networkEnqueues.setParent(parent.networkEnqueues); + networkDequeues.setParent(parent.networkDequeues); } else { enqueues.setParent(null); dispatched.setParent(null); @@ -224,6 +250,8 @@ public void setParent(DestinationStatistics parent) { blockedTime.setParent(null); messageSize.setParent(null); maxUncommittedExceededCount.setParent(null); + networkEnqueues.setParent(null); + networkDequeues.setParent(null); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index e0dc6d0f07..c1cfe00d71 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -371,6 +371,9 @@ public void afterRollback() throws Exception { ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); if (info.isNetworkSubscription()) { ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); + if(((Destination)node.getRegionDestination()).isAdvancedStatisticsEnabled()) { + ((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount()); + } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 6502a20633..4bbbefab54 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1873,7 +1873,7 @@ protected void removeMessage(ConnectionContext context, Subscription sub, final // This sends the ack the the journal.. if (!ack.isInTransaction()) { acknowledge(context, sub, ack, reference); - dropMessage(reference); + dropMessage(context, reference); } else { try { acknowledge(context, sub, ack, reference); @@ -1882,7 +1882,7 @@ protected void removeMessage(ConnectionContext context, Subscription sub, final @Override public void afterCommit() throws Exception { - dropMessage(reference); + dropMessage(context, reference); wakeup(); } @@ -1910,11 +1910,16 @@ public void afterRollback() throws Exception { reference.setAcked(true); } - private void dropMessage(QueueMessageReference reference) { + private void dropMessage(ConnectionContext context, QueueMessageReference reference) { //use dropIfLive so we only process the statistics at most one time if (reference.dropIfLive()) { getDestinationStatistics().getDequeues().increment(); getDestinationStatistics().getMessages().decrement(); + + if(context.getConnection().isNetworkConnection() && isAdvancedStatisticsEnabled()) { + getDestinationStatistics().getNetworkDequeues().increment(); + } + pagedInMessagesLock.writeLock().lock(); try { pagedInMessages.remove(reference); @@ -1969,6 +1974,11 @@ final void messageSent(final ConnectionContext context, final Message msg) throw destinationStatistics.getEnqueues().increment(); destinationStatistics.getMessages().increment(); destinationStatistics.getMessageSize().addSize(msg.getSize()); + + if(context.getConnection().isNetworkConnection() && isAdvancedStatisticsEnabled()) { + destinationStatistics.getNetworkEnqueues().increment(); + } + messageDelivered(context, msg); consumersLock.readLock().lock(); try { @@ -2115,7 +2125,7 @@ private PendingList doPageInForDispatch(boolean force, boolean processExpired, i LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong()); if (store != null) { ConnectionContext connectionContext = createConnectionContext(); - dropMessage(ref); + dropMessage(connectionContext, ref); if (gotToTheStore(ref.getMessage())) { LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage()); store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1)); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index cad0d3b883..1c394f2ff9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -778,6 +778,11 @@ protected void dispatch(final ConnectionContext context, Message message) throws // misleading metrics. // destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); + + if(context.isNetworkConnection() && isAdvancedStatisticsEnabled()) { + destinationStatistics.getNetworkEnqueues().increment(); + } + destinationStatistics.getMessageSize().addSize(message.getSize()); MessageEvaluationContext msgContext = null; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index a5b97241b4..84835a7169 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -449,6 +449,9 @@ private void incrementStatsOnAck(final Destination destination, final MessageAck destination.getDestinationStatistics().getInflight().subtract(count); if (info.isNetworkSubscription()) { destination.getDestinationStatistics().getForwards().add(count); + if(destination.isAdvancedStatisticsEnabled()) { + destination.getDestinationStatistics().getNetworkDequeues().add(count); + } } if (ack.isExpiredAck()) { destination.getDestinationStatistics().getExpired().add(count); @@ -746,6 +749,9 @@ private void discard(MessageReference message, boolean expired) { matched.remove(message); if (destination != null) { destination.getDestinationStatistics().getDequeues().increment(); + if(destination.isAdvancedStatisticsEnabled()) { + destination.getDestinationStatistics().getNetworkDequeues().increment(); + } } Destination dest = (Destination) message.getRegionDestination(); if (dest != null) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 7230957022..620daf2a60 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -106,7 +106,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean doOptimzeMessageStorage = true; private int maxDestinations = -1; private boolean useTopicSubscriptionInflightStats = true; - + private boolean advancedStatisticsEnabled = false; // [AMQ-9437] /* * percentage of in-flight messages above which optimize message store is disabled */ @@ -306,6 +306,9 @@ public void baseUpdate(BaseDestination destination, Set includedProperti if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) { destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ()); } + if (isUpdate("advancedStatisticsEnabled", includedProperties)) { + destination.setAdvancedStatisticsEnabled(isAdvancedStatisticsEnabled()); + } } public void baseConfiguration(Broker broker, BaseDestination destination) { @@ -1175,5 +1178,13 @@ public void setMessageInterceptorStrategy(MessageInterceptorStrategy messageInte public MessageInterceptorStrategy getMessageInterceptorStrategy() { return this.messageInterceptorStrategy; + } + + public boolean isAdvancedStatisticsEnabled() { + return this.advancedStatisticsEnabled; + } + + public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { + this.advancedStatisticsEnabled = advancedStatisticsEnabled; } } diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java index 0f46e089e1..83fdc145c4 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java @@ -57,6 +57,18 @@ public void testModSendDuplicateFromStoreToDLQ() throws Exception { verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", true); } + @Test + public void testModAdvancedStatistics() throws Exception { + final String brokerConfig = configurationSeed + "-policy-ml-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedStatistics"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + + verifyBooleanField("AMQ.9437", "advancedStatisticsEnabled", false); + applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedStatistics-mod", SLEEP); + verifyBooleanField("AMQ.9437", "advancedStatisticsEnabled", true); + } + @Test public void testAddNdMod() throws Exception { final String brokerConfig = configurationSeed + "-policy-ml-broker"; @@ -121,6 +133,9 @@ private void verifyBooleanField(String dest, String fieldName, boolean value) th session.createConsumer(session.createQueue(dest)); switch(fieldName) { + case "advancedStatisticsEnabled": + assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedStatisticsEnabled()); + break; case "sendDuplicateFromStoreToDLQ": assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isSendDuplicateFromStoreToDLQ()); break; diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics-mod.xml new file mode 100644 index 0000000000..d5a44a27c9 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics-mod.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics.xml new file mode 100644 index 0000000000..a6c710e075 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java new file mode 100644 index 0000000000..f5c90e589d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import jakarta.jms.Message; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; + +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; + +public class NetworkAdvancedStatisticsTest extends BaseNetworkTest { + + protected static final int MESSAGE_COUNT = 10; + + protected AbstractApplicationContext context; + protected ActiveMQTopic included; + protected ActiveMQTopic excluded; + protected String consumerName = "durableSubs"; + + @Override + protected void doSetUp(boolean deleteAllMessages) throws Exception { + super.doSetUp(deleteAllMessages); + + included = new ActiveMQTopic("include.test.bar"); + excluded = new ActiveMQTopic("exclude.test.bar"); + } + + @Override + protected String getRemoteBrokerURI() { + return "org/apache/activemq/network/remoteBroker-advancedStatistics.xml"; + } + + @Override + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/localBroker-advancedStatistics.xml"; + } + + //Added for AMQ-9437 test advancedStatistics for networkEnqueue and networkDequeue + @Test(timeout = 60 * 1000) + public void testNetworkAdvancedStatistics() throws Exception { + + // create a remote durable consumer to create demand + MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName); + Thread.sleep(1000); + + MessageProducer producer = localSession.createProducer(included); + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message test = localSession.createTextMessage("test-" + i); + producer.send(test); + } + Thread.sleep(1000); + + //Make sure stats are correct for local -> remote + assertEquals(MESSAGE_COUNT, localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()); + assertEquals(MESSAGE_COUNT, localBroker.getDestination(included).getDestinationStatistics().getNetworkDequeues().getCount()); + assertEquals(0, localBroker.getDestination(included).getDestinationStatistics().getNetworkEnqueues().getCount()); + assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(included).getDestinationStatistics().getNetworkEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(included).getDestinationStatistics().getNetworkDequeues().getCount()); + + assertTrue(Wait.waitFor(new Condition() { + + @Override + public boolean isSatisified() throws Exception { + return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0; + } + }, 10000, 500)); + remoteConsumer.close(); + } + + protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception { + + final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + final NetworkBridge remoteBridge = remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() && + 0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() && + expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() && + 0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount(); + } + })); + } +} diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedStatistics.xml new file mode 100644 index 0000000000..b5c67e2d39 --- /dev/null +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedStatistics.xml @@ -0,0 +1,63 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedStatistics.xml new file mode 100644 index 0000000000..85953ce695 --- /dev/null +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedStatistics.xml @@ -0,0 +1,49 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + From 1e948323464d5f22f225eb79c020723abe771080 Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Mon, 16 Sep 2024 10:09:35 -0500 Subject: [PATCH 2/2] TODO: This commit will be squashed after reviews - Rename to advancedNetworkStatistics - Add unit test to cover queue and topic use case - Improve unit tests to confirm network stats do not incorrectly increment on non-network enqueue/dequeue --- .../activemq/broker/TransactionBroker.java | 12 +- .../activemq/broker/jmx/DestinationView.java | 8 +- .../broker/jmx/DestinationViewMBean.java | 8 +- .../broker/region/BaseDestination.java | 10 +- .../activemq/broker/region/Destination.java | 4 +- .../broker/region/DestinationFilter.java | 8 +- .../region/DurableTopicSubscription.java | 2 +- .../apache/activemq/broker/region/Queue.java | 4 +- .../apache/activemq/broker/region/Topic.java | 2 +- .../broker/region/TopicSubscription.java | 4 +- .../broker/region/policy/PolicyEntry.java | 14 +-- .../org/apache/activemq/PolicyEntryTest.java | 14 +-- ...-policy-advancedNetworkStatistics-mod.xml} | 2 +- ...Test-policy-advancedNetworkStatistics.xml} | 0 .../NetworkAdvancedStatisticsTest.java | 105 ++++++++++++++---- ...localBroker-advancedNetworkStatistics.xml} | 8 +- ...emoteBroker-advancedNetworkStatistics.xml} | 8 +- 17 files changed, 135 insertions(+), 78 deletions(-) rename activemq-runtime-config/src/test/resources/org/apache/activemq/{policyEntryTest-policy-advancedStatistics-mod.xml => policyEntryTest-policy-advancedNetworkStatistics-mod.xml} (94%) rename activemq-runtime-config/src/test/resources/org/apache/activemq/{policyEntryTest-policy-advancedStatistics.xml => policyEntryTest-policy-advancedNetworkStatistics.xml} (100%) rename activemq-unit-tests/src/test/resources/org/apache/activemq/network/{localBroker-advancedStatistics.xml => localBroker-advancedNetworkStatistics.xml} (88%) rename activemq-unit-tests/src/test/resources/org/apache/activemq/network/{remoteBroker-advancedStatistics.xml => remoteBroker-advancedNetworkStatistics.xml} (85%) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java index fbee030885..c8af4a6144 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -191,18 +191,14 @@ public void afterCommit() throws Exception { dest.getDestinationStatistics().getEnqueues().add(opCount); dest.getDestinationStatistics().getMessages().add(opCount); - if(dest.isAdvancedStatisticsEnabled()) { - if(transactionBroker.context.isNetworkConnection()) { - dest.getDestinationStatistics().getNetworkEnqueues().add(opCount); - } + if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) { + dest.getDestinationStatistics().getNetworkEnqueues().add(opCount); } LOG.debug("cleared pending from afterCommit: {}", destination); } else { dest.getDestinationStatistics().getDequeues().add(opCount); - if(dest.isAdvancedStatisticsEnabled()) { - if(transactionBroker.context.isNetworkConnection()) { - dest.getDestinationStatistics().getNetworkDequeues().add(opCount); - } + if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) { + dest.getDestinationStatistics().getNetworkDequeues().add(opCount); } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 1b05a768d4..02a7f65057 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -601,13 +601,13 @@ public long getMaxUncommittedExceededCount() { } @Override - public boolean isAdvancedStatisticsEnabled() { - return destination.isAdvancedStatisticsEnabled(); + public boolean isAdvancedNetworkStatisticsEnabled() { + return destination.isAdvancedNetworkStatisticsEnabled(); } @Override - public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { - destination.setAdvancedStatisticsEnabled(advancedStatisticsEnabled); + public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { + destination.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled); } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index db518f78b6..328ddb09f0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -482,11 +482,11 @@ public String sendTextMessageWithProperties(@MBeanInfo("properties") String prop @MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination") long getMaxUncommittedExceededCount(); - @MBeanInfo("Query Advanced Statistics flag") - boolean isAdvancedStatisticsEnabled(); + @MBeanInfo("Query Advanced Network Statistics flag") + boolean isAdvancedNetworkStatisticsEnabled(); - @MBeanInfo("Toggle Advanced Statistics flag") - void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled); + @MBeanInfo("Toggle Advanced Network Statistics flag") + void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled); @MBeanInfo("Number of messages sent to the destination via network connection") long getNetworkEnqueues(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 3964a8ce92..e34f23a4dd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -110,7 +110,7 @@ public abstract class BaseDestination implements Destination { protected final Scheduler scheduler; private boolean disposed = false; private boolean doOptimzeMessageStorage = true; - private boolean advancedStatisticsEnabled = false; + private boolean advancedNetworkStatisticsEnabled = false; /* * percentage of in-flight messages above which optimize message store is disabled @@ -871,13 +871,13 @@ public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFligh } @Override - public boolean isAdvancedStatisticsEnabled() { - return this.advancedStatisticsEnabled; + public boolean isAdvancedNetworkStatisticsEnabled() { + return this.advancedNetworkStatisticsEnabled; } @Override - public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { - this.advancedStatisticsEnabled = advancedStatisticsEnabled; + public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { + this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled; } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index de36f1605a..45e3de7b3c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -260,8 +260,8 @@ public interface Destination extends Service, Task, Message.MessageDestination { void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ); // [AMQ-9437] - boolean isAdvancedStatisticsEnabled(); + boolean isAdvancedNetworkStatisticsEnabled(); - void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled); + void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index cfc1de4509..85ef367a77 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -410,13 +410,13 @@ public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) } @Override - public boolean isAdvancedStatisticsEnabled() { - return next.isAdvancedStatisticsEnabled(); + public boolean isAdvancedNetworkStatisticsEnabled() { + return next.isAdvancedNetworkStatisticsEnabled(); } @Override - public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { - next.setAdvancedStatisticsEnabled(advancedStatisticsEnabled); + public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { + next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled); } public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index c1cfe00d71..6946a33fa5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -371,7 +371,7 @@ public void afterRollback() throws Exception { ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); if (info.isNetworkSubscription()) { ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); - if(((Destination)node.getRegionDestination()).isAdvancedStatisticsEnabled()) { + if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) { ((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount()); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 4bbbefab54..0ed6763f7b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1916,7 +1916,7 @@ private void dropMessage(ConnectionContext context, QueueMessageReference refere getDestinationStatistics().getDequeues().increment(); getDestinationStatistics().getMessages().decrement(); - if(context.getConnection().isNetworkConnection() && isAdvancedStatisticsEnabled()) { + if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) { getDestinationStatistics().getNetworkDequeues().increment(); } @@ -1975,7 +1975,7 @@ final void messageSent(final ConnectionContext context, final Message msg) throw destinationStatistics.getMessages().increment(); destinationStatistics.getMessageSize().addSize(msg.getSize()); - if(context.getConnection().isNetworkConnection() && isAdvancedStatisticsEnabled()) { + if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) { destinationStatistics.getNetworkEnqueues().increment(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 1c394f2ff9..a9e07874e0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -779,7 +779,7 @@ protected void dispatch(final ConnectionContext context, Message message) throws // destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); - if(context.isNetworkConnection() && isAdvancedStatisticsEnabled()) { + if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) { destinationStatistics.getNetworkEnqueues().increment(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 84835a7169..4403dea6b5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -449,7 +449,7 @@ private void incrementStatsOnAck(final Destination destination, final MessageAck destination.getDestinationStatistics().getInflight().subtract(count); if (info.isNetworkSubscription()) { destination.getDestinationStatistics().getForwards().add(count); - if(destination.isAdvancedStatisticsEnabled()) { + if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) { destination.getDestinationStatistics().getNetworkDequeues().add(count); } } @@ -749,7 +749,7 @@ private void discard(MessageReference message, boolean expired) { matched.remove(message); if (destination != null) { destination.getDestinationStatistics().getDequeues().increment(); - if(destination.isAdvancedStatisticsEnabled()) { + if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) { destination.getDestinationStatistics().getNetworkDequeues().increment(); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 620daf2a60..e33f13b48c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -106,7 +106,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean doOptimzeMessageStorage = true; private int maxDestinations = -1; private boolean useTopicSubscriptionInflightStats = true; - private boolean advancedStatisticsEnabled = false; // [AMQ-9437] + private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437] /* * percentage of in-flight messages above which optimize message store is disabled */ @@ -306,8 +306,8 @@ public void baseUpdate(BaseDestination destination, Set includedProperti if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) { destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ()); } - if (isUpdate("advancedStatisticsEnabled", includedProperties)) { - destination.setAdvancedStatisticsEnabled(isAdvancedStatisticsEnabled()); + if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) { + destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled()); } } @@ -1180,11 +1180,11 @@ public MessageInterceptorStrategy getMessageInterceptorStrategy() { return this.messageInterceptorStrategy; } - public boolean isAdvancedStatisticsEnabled() { - return this.advancedStatisticsEnabled; + public boolean isAdvancedNetworkStatisticsEnabled() { + return this.advancedNetworkStatisticsEnabled; } - public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { - this.advancedStatisticsEnabled = advancedStatisticsEnabled; + public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { + this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled; } } diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java index 83fdc145c4..53341c3a53 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java @@ -58,15 +58,15 @@ public void testModSendDuplicateFromStoreToDLQ() throws Exception { } @Test - public void testModAdvancedStatistics() throws Exception { + public void testModAdvancedNetworkStatistics() throws Exception { final String brokerConfig = configurationSeed + "-policy-ml-broker"; - applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedStatistics"); + applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics"); startBroker(brokerConfig); assertTrue("broker alive", brokerService.isStarted()); - verifyBooleanField("AMQ.9437", "advancedStatisticsEnabled", false); - applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedStatistics-mod", SLEEP); - verifyBooleanField("AMQ.9437", "advancedStatisticsEnabled", true); + verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", false); + applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics-mod", SLEEP); + verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", true); } @Test @@ -133,8 +133,8 @@ private void verifyBooleanField(String dest, String fieldName, boolean value) th session.createConsumer(session.createQueue(dest)); switch(fieldName) { - case "advancedStatisticsEnabled": - assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedStatisticsEnabled()); + case "advancedNetworkStatisticsEnabled": + assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedNetworkStatisticsEnabled()); break; case "sendDuplicateFromStoreToDLQ": assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isSendDuplicateFromStoreToDLQ()); diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml similarity index 94% rename from activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics-mod.xml rename to activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml index d5a44a27c9..534f884d4b 100644 --- a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics-mod.xml +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml @@ -28,7 +28,7 @@ - + diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml similarity index 100% rename from activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedStatistics.xml rename to activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java index f5c90e589d..df99bab354 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java @@ -19,41 +19,61 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; + import jakarta.jms.Message; import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageListener; import jakarta.jms.MessageProducer; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait.Condition; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.springframework.context.support.AbstractApplicationContext; +@RunWith(value = Parameterized.class) public class NetworkAdvancedStatisticsTest extends BaseNetworkTest { + @Parameterized.Parameters(name="includedDestination={0}, excludedDestination={1}") + public static Collection data() { + return Arrays.asList(new Object[][] { + { new ActiveMQTopic("include.test.bar"), new ActiveMQTopic("exclude.test.bar")}, + { new ActiveMQQueue("include.test.foo"), new ActiveMQQueue("exclude.test.foo")}}); + } + protected static final int MESSAGE_COUNT = 10; protected AbstractApplicationContext context; - protected ActiveMQTopic included; - protected ActiveMQTopic excluded; protected String consumerName = "durableSubs"; + private final ActiveMQDestination includedDestination; + private final ActiveMQDestination excludedDestination; + + public NetworkAdvancedStatisticsTest(ActiveMQDestination includedDestionation, ActiveMQDestination excludedDestination) { + this.includedDestination = includedDestionation; + this.excludedDestination = excludedDestination; + } + @Override protected void doSetUp(boolean deleteAllMessages) throws Exception { super.doSetUp(deleteAllMessages); - - included = new ActiveMQTopic("include.test.bar"); - excluded = new ActiveMQTopic("exclude.test.bar"); } @Override protected String getRemoteBrokerURI() { - return "org/apache/activemq/network/remoteBroker-advancedStatistics.xml"; + return "org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml"; } @Override protected String getLocalBrokerURI() { - return "org/apache/activemq/network/localBroker-advancedStatistics.xml"; + return "org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml"; } //Added for AMQ-9437 test advancedStatistics for networkEnqueue and networkDequeue @@ -61,30 +81,71 @@ protected String getLocalBrokerURI() { public void testNetworkAdvancedStatistics() throws Exception { // create a remote durable consumer to create demand - MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName); + MessageConsumer remoteConsumer; + if(includedDestination.isTopic()) { + remoteConsumer = remoteSession.createDurableSubscriber(ActiveMQTopic.class.cast(includedDestination), consumerName); + } else { + remoteConsumer = remoteSession.createConsumer(includedDestination); + remoteConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + } + }); + } Thread.sleep(1000); - MessageProducer producer = localSession.createProducer(included); + MessageProducer producer = localSession.createProducer(includedDestination); for (int i = 0; i < MESSAGE_COUNT; i++) { Message test = localSession.createTextMessage("test-" + i); producer.send(test); } Thread.sleep(1000); - //Make sure stats are correct for local -> remote - assertEquals(MESSAGE_COUNT, localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()); - assertEquals(MESSAGE_COUNT, localBroker.getDestination(included).getDestinationStatistics().getNetworkDequeues().getCount()); - assertEquals(0, localBroker.getDestination(included).getDestinationStatistics().getNetworkEnqueues().getCount()); - assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(included).getDestinationStatistics().getNetworkEnqueues().getCount()); - assertEquals(0, remoteBroker.getDestination(included).getDestinationStatistics().getNetworkDequeues().getCount()); - - assertTrue(Wait.waitFor(new Condition() { + MessageProducer producerExcluded = localSession.createProducer(excludedDestination); + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message test = localSession.createTextMessage("test-" + i); + producerExcluded.send(test); + } + Thread.sleep(1000); - @Override - public boolean isSatisified() throws Exception { - return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0; - } - }, 10000, 500)); + //Make sure stats are correct for local -> remote + assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount()); + assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeues().getCount()); + assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount()); + assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount()); + assertEquals(0, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount()); + assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount()); + assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount()); + + // Make sure stats do not increment for local-only + assertEquals(MESSAGE_COUNT, localBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount()); + assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount()); + assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount()); + assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeues().getCount()); + assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount()); + assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount()); + + if(includedDestination.isTopic()) { + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0; + } + }, 10000, 500)); + } else { + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + // The number of message that remain is due to the exclude queue + return localBroker.getAdminView().getTotalMessageCount() == MESSAGE_COUNT; + } + }, 10000, 500)); + } remoteConsumer.close(); } diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml similarity index 88% rename from activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedStatistics.xml rename to activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml index b5c67e2d39..b543169f6e 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedStatistics.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml @@ -28,11 +28,11 @@ - - + + - - + + diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml similarity index 85% rename from activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedStatistics.xml rename to activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml index 85953ce695..a9cf93f73a 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedStatistics.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml @@ -28,11 +28,11 @@ - - + + - - + +