Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMQ-9426] Add destination advancedStatisticsEnabled flag and network… #1156

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,20 @@ public void afterCommit() throws Exception {
dest.clearPendingMessages(opCount);
dest.getDestinationStatistics().getEnqueues().add(opCount);
dest.getDestinationStatistics().getMessages().add(opCount);

if(dest.isAdvancedStatisticsEnabled()) {
if(transactionBroker.context.isNetworkConnection()) {
dest.getDestinationStatistics().getNetworkEnqueues().add(opCount);
}
}
LOG.debug("cleared pending from afterCommit: {}", destination);
} else {
dest.getDestinationStatistics().getDequeues().add(opCount);
if(dest.isAdvancedStatisticsEnabled()) {
if(transactionBroker.context.isNetworkConnection()) {
dest.getDestinationStatistics().getNetworkDequeues().add(opCount);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,4 +599,25 @@ public boolean isSendDuplicateFromStoreToDLQ() {
public long getMaxUncommittedExceededCount() {
return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
}

@Override
public boolean isAdvancedStatisticsEnabled() {
return destination.isAdvancedStatisticsEnabled();
}

@Override
public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) {
destination.setAdvancedStatisticsEnabled(advancedStatisticsEnabled);
}

@Override
public long getNetworkEnqueues() {
return destination.getDestinationStatistics().getNetworkEnqueues().getCount();
}

@Override
public long getNetworkDequeues() {
return destination.getDestinationStatistics().getNetworkDequeues().getCount();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -481,4 +481,16 @@ public String sendTextMessageWithProperties(@MBeanInfo("properties") String prop

@MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination")
long getMaxUncommittedExceededCount();

@MBeanInfo("Query Advanced Statistics flag")
boolean isAdvancedStatisticsEnabled();

@MBeanInfo("Toggle Advanced Statistics flag")
void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled);

@MBeanInfo("Number of messages sent to the destination via network connection")
long getNetworkEnqueues();

@MBeanInfo("Number of messages acknowledged from the destination via network connection")
long getNetworkDequeues();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
Expand Down Expand Up @@ -109,6 +108,8 @@ public abstract class BaseDestination implements Destination {
protected final Scheduler scheduler;
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
private boolean advancedStatisticsEnabled = false;

/*
* percentage of in-flight messages above which optimize message store is disabled
*/
Expand Down Expand Up @@ -867,6 +868,15 @@ public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFligh
this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
}

@Override
public boolean isAdvancedStatisticsEnabled() {
return this.advancedStatisticsEnabled;
}

@Override
public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) {
this.advancedStatisticsEnabled = advancedStatisticsEnabled;
}

@Override
public abstract List<Subscription> getConsumers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,10 @@ public interface Destination extends Service, Task, Message.MessageDestination {
boolean isSendDuplicateFromStoreToDLQ();

void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);

// [AMQ-9437]
boolean isAdvancedStatisticsEnabled();

void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled);

}
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,16 @@ public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ)
next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
}

@Override
public boolean isAdvancedStatisticsEnabled() {
return next.isAdvancedStatisticsEnabled();
}

@Override
public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) {
next.setAdvancedStatisticsEnabled(advancedStatisticsEnabled);
}

public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public class DestinationStatistics extends StatsImpl {
protected SizeStatisticImpl messageSize;
protected CountStatisticImpl maxUncommittedExceededCount;

// [AMQ-9437] Advanced Statistics are optionally enabled
protected CountStatisticImpl networkEnqueues;
protected CountStatisticImpl networkDequeues;

public DestinationStatistics() {

enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
Expand All @@ -68,6 +72,10 @@ public DestinationStatistics() {
blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded");

networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");

addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
Expand All @@ -83,6 +91,9 @@ public DestinationStatistics() {
addStatistic("blockedTime",blockedTime);
addStatistic("messageSize",messageSize);
addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount);

addStatistic("networkEnqueues", networkEnqueues);
addStatistic("networkDequeues", networkDequeues);
}

public CountStatisticImpl getEnqueues() {
Expand Down Expand Up @@ -151,6 +162,14 @@ public CountStatisticImpl getMaxUncommittedExceededCount(){
return this.maxUncommittedExceededCount;
}

public CountStatisticImpl getNetworkEnqueues() {
return networkEnqueues;
}

public CountStatisticImpl getNetworkDequeues() {
return networkDequeues;
}

public void reset() {
if (this.isDoReset()) {
super.reset();
Expand All @@ -165,6 +184,8 @@ public void reset() {
blockedTime.reset();
messageSize.reset();
maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();
}
}

Expand All @@ -187,6 +208,9 @@ public void setEnabled(boolean enabled) {
messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled);

// [AMQ-9437] Advanced Statistics
networkEnqueues.setEnabled(enabled);
networkDequeues.setEnabled(enabled);
}

public void setParent(DestinationStatistics parent) {
Expand All @@ -207,6 +231,8 @@ public void setParent(DestinationStatistics parent) {
blockedTime.setParent(parent.blockedTime);
messageSize.setParent(parent.messageSize);
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
networkEnqueues.setParent(parent.networkEnqueues);
networkDequeues.setParent(parent.networkDequeues);
} else {
enqueues.setParent(null);
dispatched.setParent(null);
Expand All @@ -224,6 +250,8 @@ public void setParent(DestinationStatistics parent) {
blockedTime.setParent(null);
messageSize.setParent(null);
maxUncommittedExceededCount.setParent(null);
networkEnqueues.setParent(null);
networkDequeues.setParent(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ public void afterRollback() throws Exception {
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
if (info.isNetworkSubscription()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
if(((Destination)node.getRegionDestination()).isAdvancedStatisticsEnabled()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1862,7 +1862,7 @@ protected void removeMessage(ConnectionContext context, Subscription sub, final
// This sends the ack the the journal..
if (!ack.isInTransaction()) {
acknowledge(context, sub, ack, reference);
dropMessage(reference);
dropMessage(context, reference);
} else {
try {
acknowledge(context, sub, ack, reference);
Expand All @@ -1871,7 +1871,7 @@ protected void removeMessage(ConnectionContext context, Subscription sub, final

@Override
public void afterCommit() throws Exception {
dropMessage(reference);
dropMessage(context, reference);
wakeup();
}

Expand Down Expand Up @@ -1899,11 +1899,18 @@ public void afterRollback() throws Exception {
reference.setAcked(true);
}

private void dropMessage(QueueMessageReference reference) {
private void dropMessage(ConnectionContext context, QueueMessageReference reference) {
//use dropIfLive so we only process the statistics at most one time
if (reference.dropIfLive()) {
getDestinationStatistics().getDequeues().increment();
getDestinationStatistics().getMessages().decrement();

if(isAdvancedStatisticsEnabled()) {
if(context.getConnection().isNetworkConnection()) {
getDestinationStatistics().getNetworkDequeues().increment();
}
}

pagedInMessagesLock.writeLock().lock();
try {
pagedInMessages.remove(reference);
Expand Down Expand Up @@ -1958,6 +1965,13 @@ final void messageSent(final ConnectionContext context, final Message msg) throw
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());

if(isAdvancedStatisticsEnabled()) {
if(context.getConnection().isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
}

messageDelivered(context, msg);
consumersLock.readLock().lock();
try {
Expand Down Expand Up @@ -2104,7 +2118,7 @@ private PendingList doPageInForDispatch(boolean force, boolean processExpired, i
LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
if (store != null) {
ConnectionContext connectionContext = createConnectionContext();
dropMessage(ref);
dropMessage(connectionContext, ref);
if (gotToTheStore(ref.getMessage())) {
LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ private void incrementStatsOnAck(final Destination destination, final MessageAck
destination.getDestinationStatistics().getInflight().subtract(count);
if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().add(count);
if(destination.isAdvancedStatisticsEnabled()) {
destination.getDestinationStatistics().getNetworkDequeues().add(count);
}
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count);
Expand Down Expand Up @@ -746,6 +749,9 @@ private void discard(MessageReference message, boolean expired) {
matched.remove(message);
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
if(destination.isAdvancedStatisticsEnabled()) {
destination.getDestinationStatistics().getNetworkDequeues().increment();
}
}
Destination dest = (Destination) message.getRegionDestination();
if (dest != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean doOptimzeMessageStorage = true;
private int maxDestinations = -1;
private boolean useTopicSubscriptionInflightStats = true;

private boolean advancedStatisticsEnabled = false; // [AMQ-9437]
/*
* percentage of in-flight messages above which optimize message store is disabled
*/
Expand All @@ -115,7 +115,6 @@ public class PolicyEntry extends DestinationMapEntry {
private int sendFailIfNoSpace = -1;
private long sendFailIfNoSpaceAfterTimeout = -1;


public void configure(Broker broker,Queue queue) {
baseConfiguration(broker,queue);
if (dispatchPolicy != null) {
Expand Down Expand Up @@ -303,6 +302,9 @@ public void baseUpdate(BaseDestination destination, Set<String> includedProperti
if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
}
if (isUpdate("advancedStatisticsEnabled", includedProperties)) {
destination.setAdvancedStatisticsEnabled(isAdvancedStatisticsEnabled());
}
}

public void baseConfiguration(Broker broker, BaseDestination destination) {
Expand Down Expand Up @@ -1165,4 +1167,12 @@ public boolean isUseTopicSubscriptionInflightStats() {
public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
}

public boolean isAdvancedStatisticsEnabled() {
return this.advancedStatisticsEnabled;
}

public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) {
this.advancedStatisticsEnabled = advancedStatisticsEnabled;
}
}