Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-5142 support never expiring incoming messages #5390

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ public final class AddressSettingsInfo {
}
private long maxExpiryDelay;

static {
META_BEAN.add(Boolean.class, "noExpiry", (o, p) -> o.noExpiry = p, o -> o.noExpiry);
}
private boolean noExpiry;

static {
META_BEAN.add(Boolean.class, "enableMetrics", (o, p) -> o.enableMetrics = p, o -> o.enableMetrics);
}
Expand Down Expand Up @@ -556,6 +561,10 @@ public long getMaxExpiryDelay() {
return maxExpiryDelay;
}

public boolean isNoExpiry() {
return noExpiry;
}

public boolean isEnableMetrics() {
return enableMetrics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {

private static final String MAX_EXPIRY_DELAY_NODE_NAME = "max-expiry-delay";

private static final String NO_EXPIRY_NODE_NAME = "no-expiry";

private static final String REDELIVERY_DELAY_NODE_NAME = "redelivery-delay";

private static final String REDELIVERY_DELAY_MULTIPLIER_NODE_NAME = "redelivery-delay-multiplier";
Expand Down Expand Up @@ -1331,6 +1333,8 @@ protected Pair<String, AddressSettings> parseAddressSettings(final Node node) {
addressSettings.setMinExpiryDelay(XMLUtil.parseLong(child));
} else if (MAX_EXPIRY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMaxExpiryDelay(XMLUtil.parseLong(child));
} else if (NO_EXPIRY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setNoExpiry(XMLUtil.parseBoolean(child));
} else if (REDELIVERY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setRedeliveryDelay(XMLUtil.parseLong(child));
} else if (REDELIVERY_DELAY_MULTIPLIER_NODE_NAME.equalsIgnoreCase(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1355,28 +1355,44 @@ private RoutingStatus maybeSendToDLA(final Message message,
return status;
}

// HORNETQ-1029
private static void applyExpiryDelay(Message message, AddressSettings settings) {
protected static void applyExpiryDelay(Message message, AddressSettings settings) {
long expirationOverride = settings.getExpiryDelay();

// A -1 <expiry-delay> means don't do anything
if (expirationOverride >= 0) {
// only override the expiration on messages where the expiration hasn't been set by the user
if (settings.isNoExpiry()) {
if (message.getExpiration() != 0) {
message.setExpiration(0);
message.reencode();
}
} else if (expirationOverride >= 0) {
// A -1 <expiry-delay> means don't do anything
if (message.getExpiration() == 0) {
message.setExpiration(System.currentTimeMillis() + expirationOverride);
// only override the expiration on messages where the expiration hasn't been set by the user
setExpiration(message, expirationOverride);
}
} else {
long minExpiration = settings.getMinExpiryDelay();
long maxExpiration = settings.getMaxExpiryDelay();

if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) {
message.setExpiration(System.currentTimeMillis() + maxExpiration);
if (message.getExpiration() == 0) {
// if the incoming message has NO expiration then apply the max if set and if not set then apply the min if set
if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY) {
setExpiration(message, maxExpiration);
} else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY) {
setExpiration(message, minExpiration);
}
} else if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && message.getExpiration() > (System.currentTimeMillis() + maxExpiration)) {
setExpiration(message, maxExpiration);
} else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) {
message.setExpiration(System.currentTimeMillis() + minExpiration);
setExpiration(message, minExpiration);
}
}
}

private static void setExpiration(Message m, long expiration) {
m.setExpiration(System.currentTimeMillis() + expiration);
m.reencode();
}

@Override
public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable

public static final long DEFAULT_MAX_EXPIRY_DELAY = -1;

public static final boolean DEFAULT_NO_EXPIRY = false;

public static final boolean DEFAULT_SEND_TO_DLA_ON_NO_ROUTE = false;

public static final long DEFAULT_SLOW_CONSUMER_THRESHOLD = -1;
Expand Down Expand Up @@ -266,6 +268,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
private Long maxExpiryDelay = null;

static {
metaBean.add(Boolean.class, "noExpiry", (t, p) -> t.noExpiry = p, t -> t.noExpiry);
}
private Boolean noExpiry = null;

static {
metaBean.add(Boolean.class, "defaultLastValueQueue", (t, p) -> t.defaultLastValueQueue = p, t -> t.defaultLastValueQueue);
}
Expand Down Expand Up @@ -1050,6 +1057,15 @@ public AddressSettings setMaxExpiryDelay(final Long maxExpiryDelay) {
return this;
}

public Boolean isNoExpiry() {
return noExpiry != null ? noExpiry : AddressSettings.DEFAULT_NO_EXPIRY;
}

public AddressSettings setNoExpiry(final Boolean noExpiry) {
this.noExpiry = noExpiry;
return this;
}

public boolean isSendToDLAOnNoRoute() {
return sendToDLAOnNoRoute != null ? sendToDLAOnNoRoute : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE;
}
Expand Down Expand Up @@ -1694,6 +1710,8 @@ public boolean equals(Object o) {
return false;
if (!Objects.equals(maxExpiryDelay, that.maxExpiryDelay))
return false;
if (!Objects.equals(noExpiry, that.noExpiry))
return false;
if (!Objects.equals(defaultLastValueQueue, that.defaultLastValueQueue))
return false;
if (!Objects.equals(defaultLastValueKey, that.defaultLastValueKey))
Expand Down Expand Up @@ -1830,6 +1848,7 @@ public int hashCode() {
result = 31 * result + (expiryDelay != null ? expiryDelay.hashCode() : 0);
result = 31 * result + (minExpiryDelay != null ? minExpiryDelay.hashCode() : 0);
result = 31 * result + (maxExpiryDelay != null ? maxExpiryDelay.hashCode() : 0);
result = 31 * result + (noExpiry != null ? noExpiry.hashCode() : 0);
result = 31 * result + (defaultLastValueQueue != null ? defaultLastValueQueue.hashCode() : 0);
result = 31 * result + (defaultLastValueKey != null ? defaultLastValueKey.hashCode() : 0);
result = 31 * result + (defaultNonDestructive != null ? defaultNonDestructive.hashCode() : 0);
Expand Down Expand Up @@ -1889,7 +1908,7 @@ public int hashCode() {

@Override
public String toString() {
return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + ", initialQueueBufferSize=" + initialQueueBufferSize
return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", noExpiry=" + noExpiry + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + ", initialQueueBufferSize=" + initialQueueBufferSize
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3871,6 +3871,14 @@
</xsd:annotation>
</xsd:element>

<xsd:element name="no-expiry" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Overrides the expiration time for all messages so that they never expire.
</xsd:documentation>
</xsd:annotation>
</xsd:element>

<xsd:element name="expiry-delay" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ public void testFileConfiguration() {
assertEquals(1L, (long) conf.getAddressSettings().get("a1").getExpiryDelay());
assertEquals(2L, (long) conf.getAddressSettings().get("a1").getMinExpiryDelay());
assertEquals(3L, (long) conf.getAddressSettings().get("a1").getMaxExpiryDelay());
assertTrue(conf.getAddressSettings().get("a1").isNoExpiry());
assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_EXPIRY_RESOURCES, conf.getAddressSettings().get("a1").isAutoCreateExpiryResources());
assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX, conf.getAddressSettings().get("a1").getExpiryQueuePrefix());
assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_SUFFIX, conf.getAddressSettings().get("a1").getExpiryQueueSuffix());
Expand Down Expand Up @@ -547,6 +548,7 @@ public void testFileConfiguration() {
assertEquals(-1L, (long) conf.getAddressSettings().get("a2").getExpiryDelay());
assertEquals(-1L, (long) conf.getAddressSettings().get("a2").getMinExpiryDelay());
assertEquals(-1L, (long) conf.getAddressSettings().get("a2").getMaxExpiryDelay());
assertFalse(conf.getAddressSettings().get("a2").isNoExpiry());
assertTrue(conf.getAddressSettings().get("a2").isAutoCreateDeadLetterResources());
assertEquals("", conf.getAddressSettings().get("a2").getExpiryQueuePrefix().toString());
assertEquals(".EXP", conf.getAddressSettings().get("a2").getExpiryQueueSuffix().toString());
Expand Down
Loading
Loading