diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java index 12e9f7742a..fbee030885 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -190,9 +190,20 @@ public void afterCommit() throws Exception { dest.clearPendingMessages(opCount); dest.getDestinationStatistics().getEnqueues().add(opCount); dest.getDestinationStatistics().getMessages().add(opCount); + + if(dest.isAdvancedStatisticsEnabled()) { + if(transactionBroker.context.isNetworkConnection()) { + dest.getDestinationStatistics().getNetworkEnqueues().add(opCount); + } + } LOG.debug("cleared pending from afterCommit: {}", destination); } else { dest.getDestinationStatistics().getDequeues().add(opCount); + if(dest.isAdvancedStatisticsEnabled()) { + if(transactionBroker.context.isNetworkConnection()) { + dest.getDestinationStatistics().getNetworkDequeues().add(opCount); + } + } } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 8abcc67163..1b05a768d4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -599,4 +599,25 @@ public boolean isSendDuplicateFromStoreToDLQ() { public long getMaxUncommittedExceededCount() { return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount(); } + + @Override + public boolean isAdvancedStatisticsEnabled() { + return destination.isAdvancedStatisticsEnabled(); + } + + @Override + public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { + destination.setAdvancedStatisticsEnabled(advancedStatisticsEnabled); + } + + @Override + public long getNetworkEnqueues() { + return destination.getDestinationStatistics().getNetworkEnqueues().getCount(); + } + + @Override + public long getNetworkDequeues() { + return destination.getDestinationStatistics().getNetworkDequeues().getCount(); + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 45ed51b994..db518f78b6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -481,4 +481,16 @@ public String sendTextMessageWithProperties(@MBeanInfo("properties") String prop @MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination") long getMaxUncommittedExceededCount(); + + @MBeanInfo("Query Advanced Statistics flag") + boolean isAdvancedStatisticsEnabled(); + + @MBeanInfo("Toggle Advanced Statistics flag") + void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled); + + @MBeanInfo("Number of messages sent to the destination via network connection") + long getNetworkEnqueues(); + + @MBeanInfo("Number of messages acknowledged from the destination via network connection") + long getNetworkDequeues(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 4ca3913c74..3964a8ce92 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -110,6 +110,8 @@ public abstract class BaseDestination implements Destination { protected final Scheduler scheduler; private boolean disposed = false; private boolean doOptimzeMessageStorage = true; + private boolean advancedStatisticsEnabled = false; + /* * percentage of in-flight messages above which optimize message store is disabled */ @@ -868,6 +870,15 @@ public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFligh this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; } + @Override + public boolean isAdvancedStatisticsEnabled() { + return this.advancedStatisticsEnabled; + } + + @Override + public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { + this.advancedStatisticsEnabled = advancedStatisticsEnabled; + } @Override public abstract List getConsumers(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index 70f807be86..de36f1605a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -258,4 +258,10 @@ public interface Destination extends Service, Task, Message.MessageDestination { boolean isSendDuplicateFromStoreToDLQ(); void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ); + + // [AMQ-9437] + boolean isAdvancedStatisticsEnabled(); + + void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled); + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 6b288a234f..cfc1de4509 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -409,6 +409,16 @@ public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ); } + @Override + public boolean isAdvancedStatisticsEnabled() { + return next.isAdvancedStatisticsEnabled(); + } + + @Override + public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { + next.setAdvancedStatisticsEnabled(advancedStatisticsEnabled); + } + public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { if (next instanceof DestinationFilter) { DestinationFilter filter = (DestinationFilter) next; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index 9d30c622f3..dc6b17dfaf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -46,6 +46,10 @@ public class DestinationStatistics extends StatsImpl { protected SizeStatisticImpl messageSize; protected CountStatisticImpl maxUncommittedExceededCount; + // [AMQ-9437] Advanced Statistics are optionally enabled + protected CountStatisticImpl networkEnqueues; + protected CountStatisticImpl networkDequeues; + public DestinationStatistics() { enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination"); @@ -68,6 +72,10 @@ public DestinationStatistics() { blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control"); messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination"); maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded"); + + networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection"); + networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection"); + addStatistic("enqueues", enqueues); addStatistic("dispatched", dispatched); addStatistic("dequeues", dequeues); @@ -83,6 +91,9 @@ public DestinationStatistics() { addStatistic("blockedTime",blockedTime); addStatistic("messageSize",messageSize); addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount); + + addStatistic("networkEnqueues", networkEnqueues); + addStatistic("networkDequeues", networkDequeues); } public CountStatisticImpl getEnqueues() { @@ -151,6 +162,14 @@ public CountStatisticImpl getMaxUncommittedExceededCount(){ return this.maxUncommittedExceededCount; } + public CountStatisticImpl getNetworkEnqueues() { + return networkEnqueues; + } + + public CountStatisticImpl getNetworkDequeues() { + return networkDequeues; + } + public void reset() { if (this.isDoReset()) { super.reset(); @@ -165,6 +184,8 @@ public void reset() { blockedTime.reset(); messageSize.reset(); maxUncommittedExceededCount.reset(); + networkEnqueues.reset(); + networkDequeues.reset(); } } @@ -187,6 +208,9 @@ public void setEnabled(boolean enabled) { messageSize.setEnabled(enabled); maxUncommittedExceededCount.setEnabled(enabled); + // [AMQ-9437] Advanced Statistics + networkEnqueues.setEnabled(enabled); + networkDequeues.setEnabled(enabled); } public void setParent(DestinationStatistics parent) { @@ -207,6 +231,8 @@ public void setParent(DestinationStatistics parent) { blockedTime.setParent(parent.blockedTime); messageSize.setParent(parent.messageSize); maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount); + networkEnqueues.setParent(parent.networkEnqueues); + networkDequeues.setParent(parent.networkDequeues); } else { enqueues.setParent(null); dispatched.setParent(null); @@ -224,6 +250,8 @@ public void setParent(DestinationStatistics parent) { blockedTime.setParent(null); messageSize.setParent(null); maxUncommittedExceededCount.setParent(null); + networkEnqueues.setParent(null); + networkDequeues.setParent(null); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index e0dc6d0f07..c1cfe00d71 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -371,6 +371,9 @@ public void afterRollback() throws Exception { ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); if (info.isNetworkSubscription()) { ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); + if(((Destination)node.getRegionDestination()).isAdvancedStatisticsEnabled()) { + ((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount()); + } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 6502a20633..b96487e139 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1873,7 +1873,7 @@ protected void removeMessage(ConnectionContext context, Subscription sub, final // This sends the ack the the journal.. if (!ack.isInTransaction()) { acknowledge(context, sub, ack, reference); - dropMessage(reference); + dropMessage(context, reference); } else { try { acknowledge(context, sub, ack, reference); @@ -1882,7 +1882,7 @@ protected void removeMessage(ConnectionContext context, Subscription sub, final @Override public void afterCommit() throws Exception { - dropMessage(reference); + dropMessage(context, reference); wakeup(); } @@ -1910,11 +1910,18 @@ public void afterRollback() throws Exception { reference.setAcked(true); } - private void dropMessage(QueueMessageReference reference) { + private void dropMessage(ConnectionContext context, QueueMessageReference reference) { //use dropIfLive so we only process the statistics at most one time if (reference.dropIfLive()) { getDestinationStatistics().getDequeues().increment(); getDestinationStatistics().getMessages().decrement(); + + if(isAdvancedStatisticsEnabled()) { + if(context.getConnection().isNetworkConnection()) { + getDestinationStatistics().getNetworkDequeues().increment(); + } + } + pagedInMessagesLock.writeLock().lock(); try { pagedInMessages.remove(reference); @@ -1969,6 +1976,13 @@ final void messageSent(final ConnectionContext context, final Message msg) throw destinationStatistics.getEnqueues().increment(); destinationStatistics.getMessages().increment(); destinationStatistics.getMessageSize().addSize(msg.getSize()); + + if(isAdvancedStatisticsEnabled()) { + if(context.getConnection().isNetworkConnection()) { + destinationStatistics.getNetworkEnqueues().increment(); + } + } + messageDelivered(context, msg); consumersLock.readLock().lock(); try { @@ -2115,7 +2129,7 @@ private PendingList doPageInForDispatch(boolean force, boolean processExpired, i LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong()); if (store != null) { ConnectionContext connectionContext = createConnectionContext(); - dropMessage(ref); + dropMessage(connectionContext, ref); if (gotToTheStore(ref.getMessage())) { LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage()); store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1)); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index a5b97241b4..84835a7169 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -449,6 +449,9 @@ private void incrementStatsOnAck(final Destination destination, final MessageAck destination.getDestinationStatistics().getInflight().subtract(count); if (info.isNetworkSubscription()) { destination.getDestinationStatistics().getForwards().add(count); + if(destination.isAdvancedStatisticsEnabled()) { + destination.getDestinationStatistics().getNetworkDequeues().add(count); + } } if (ack.isExpiredAck()) { destination.getDestinationStatistics().getExpired().add(count); @@ -746,6 +749,9 @@ private void discard(MessageReference message, boolean expired) { matched.remove(message); if (destination != null) { destination.getDestinationStatistics().getDequeues().increment(); + if(destination.isAdvancedStatisticsEnabled()) { + destination.getDestinationStatistics().getNetworkDequeues().increment(); + } } Destination dest = (Destination) message.getRegionDestination(); if (dest != null) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 7230957022..620daf2a60 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -106,7 +106,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean doOptimzeMessageStorage = true; private int maxDestinations = -1; private boolean useTopicSubscriptionInflightStats = true; - + private boolean advancedStatisticsEnabled = false; // [AMQ-9437] /* * percentage of in-flight messages above which optimize message store is disabled */ @@ -306,6 +306,9 @@ public void baseUpdate(BaseDestination destination, Set includedProperti if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) { destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ()); } + if (isUpdate("advancedStatisticsEnabled", includedProperties)) { + destination.setAdvancedStatisticsEnabled(isAdvancedStatisticsEnabled()); + } } public void baseConfiguration(Broker broker, BaseDestination destination) { @@ -1175,5 +1178,13 @@ public void setMessageInterceptorStrategy(MessageInterceptorStrategy messageInte public MessageInterceptorStrategy getMessageInterceptorStrategy() { return this.messageInterceptorStrategy; + } + + public boolean isAdvancedStatisticsEnabled() { + return this.advancedStatisticsEnabled; + } + + public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) { + this.advancedStatisticsEnabled = advancedStatisticsEnabled; } }