diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java index 718fdb8551f..ac2b164d9ca 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java @@ -304,6 +304,11 @@ public final class AddressSettingsInfo { } private long maxExpiryDelay; + static { + META_BEAN.add(Boolean.class, "noExpiry", (o, p) -> o.noExpiry = p, o -> o.noExpiry); + } + private boolean noExpiry; + static { META_BEAN.add(Boolean.class, "enableMetrics", (o, p) -> o.enableMetrics = p, o -> o.enableMetrics); } @@ -556,6 +561,10 @@ public long getMaxExpiryDelay() { return maxExpiryDelay; } + public boolean isNoExpiry() { + return noExpiry; + } + public boolean isEnableMetrics() { return enableMetrics; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 6819b307ade..dc7306d3b14 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -233,6 +233,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String MAX_EXPIRY_DELAY_NODE_NAME = "max-expiry-delay"; + private static final String NO_EXPIRY_NODE_NAME = "no-expiry"; + private static final String REDELIVERY_DELAY_NODE_NAME = "redelivery-delay"; private static final String REDELIVERY_DELAY_MULTIPLIER_NODE_NAME = "redelivery-delay-multiplier"; @@ -1331,6 +1333,8 @@ protected Pair parseAddressSettings(final Node node) { addressSettings.setMinExpiryDelay(XMLUtil.parseLong(child)); } else if (MAX_EXPIRY_DELAY_NODE_NAME.equalsIgnoreCase(name)) { addressSettings.setMaxExpiryDelay(XMLUtil.parseLong(child)); + } else if (NO_EXPIRY_NODE_NAME.equalsIgnoreCase(name)) { + addressSettings.setNoExpiry(XMLUtil.parseBoolean(child)); } else if (REDELIVERY_DELAY_NODE_NAME.equalsIgnoreCase(name)) { addressSettings.setRedeliveryDelay(XMLUtil.parseLong(child)); } else if (REDELIVERY_DELAY_MULTIPLIER_NODE_NAME.equalsIgnoreCase(name)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 8d38b131d49..ad66e7f1941 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1355,28 +1355,44 @@ private RoutingStatus maybeSendToDLA(final Message message, return status; } - // HORNETQ-1029 - private static void applyExpiryDelay(Message message, AddressSettings settings) { + protected static void applyExpiryDelay(Message message, AddressSettings settings) { long expirationOverride = settings.getExpiryDelay(); - // A -1 means don't do anything - if (expirationOverride >= 0) { - // only override the expiration on messages where the expiration hasn't been set by the user + if (settings.isNoExpiry()) { + if (message.getExpiration() != 0) { + message.setExpiration(0); + message.reencode(); + } + } else if (expirationOverride >= 0) { + // A -1 means don't do anything if (message.getExpiration() == 0) { - message.setExpiration(System.currentTimeMillis() + expirationOverride); + // only override the expiration on messages where the expiration hasn't been set by the user + setExpiration(message, expirationOverride); } } else { long minExpiration = settings.getMinExpiryDelay(); long maxExpiration = settings.getMaxExpiryDelay(); - if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) { - message.setExpiration(System.currentTimeMillis() + maxExpiration); + if (message.getExpiration() == 0) { + // if the incoming message has NO expiration then apply the max if set and if not set then apply the min if set + if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY) { + setExpiration(message, maxExpiration); + } else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY) { + setExpiration(message, minExpiration); + } + } else if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && message.getExpiration() > (System.currentTimeMillis() + maxExpiration)) { + setExpiration(message, maxExpiration); } else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) { - message.setExpiration(System.currentTimeMillis() + minExpiration); + setExpiration(message, minExpiration); } } } + private static void setExpiration(Message m, long expiration) { + m.setExpiration(System.currentTimeMillis() + expiration); + m.reencode(); + } + @Override public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 6056a239c86..3ca29240337 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -117,6 +117,8 @@ public class AddressSettings implements Mergeable, Serializable public static final long DEFAULT_MAX_EXPIRY_DELAY = -1; + public static final boolean DEFAULT_NO_EXPIRY = false; + public static final boolean DEFAULT_SEND_TO_DLA_ON_NO_ROUTE = false; public static final long DEFAULT_SLOW_CONSUMER_THRESHOLD = -1; @@ -266,6 +268,11 @@ public class AddressSettings implements Mergeable, Serializable } private Long maxExpiryDelay = null; + static { + metaBean.add(Boolean.class, "noExpiry", (t, p) -> t.noExpiry = p, t -> t.noExpiry); + } + private Boolean noExpiry = null; + static { metaBean.add(Boolean.class, "defaultLastValueQueue", (t, p) -> t.defaultLastValueQueue = p, t -> t.defaultLastValueQueue); } @@ -1050,6 +1057,15 @@ public AddressSettings setMaxExpiryDelay(final Long maxExpiryDelay) { return this; } + public Boolean isNoExpiry() { + return noExpiry != null ? noExpiry : AddressSettings.DEFAULT_NO_EXPIRY; + } + + public AddressSettings setNoExpiry(final Boolean noExpiry) { + this.noExpiry = noExpiry; + return this; + } + public boolean isSendToDLAOnNoRoute() { return sendToDLAOnNoRoute != null ? sendToDLAOnNoRoute : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE; } @@ -1694,6 +1710,8 @@ public boolean equals(Object o) { return false; if (!Objects.equals(maxExpiryDelay, that.maxExpiryDelay)) return false; + if (!Objects.equals(noExpiry, that.noExpiry)) + return false; if (!Objects.equals(defaultLastValueQueue, that.defaultLastValueQueue)) return false; if (!Objects.equals(defaultLastValueKey, that.defaultLastValueKey)) @@ -1830,6 +1848,7 @@ public int hashCode() { result = 31 * result + (expiryDelay != null ? expiryDelay.hashCode() : 0); result = 31 * result + (minExpiryDelay != null ? minExpiryDelay.hashCode() : 0); result = 31 * result + (maxExpiryDelay != null ? maxExpiryDelay.hashCode() : 0); + result = 31 * result + (noExpiry != null ? noExpiry.hashCode() : 0); result = 31 * result + (defaultLastValueQueue != null ? defaultLastValueQueue.hashCode() : 0); result = 31 * result + (defaultLastValueKey != null ? defaultLastValueKey.hashCode() : 0); result = 31 * result + (defaultNonDestructive != null ? defaultNonDestructive.hashCode() : 0); @@ -1889,7 +1908,7 @@ public int hashCode() { @Override public String toString() { - return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + ", initialQueueBufferSize=" + initialQueueBufferSize + return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", noExpiry=" + noExpiry + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + ", initialQueueBufferSize=" + initialQueueBufferSize + '}'; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index afd809ad1a9..db1e93eba04 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -3871,6 +3871,14 @@ + + + + Overrides the expiration time for all messages so that they never expire. + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index e32527ee31b..de29b9ab8fe 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -506,6 +506,7 @@ public void testFileConfiguration() { assertEquals(1L, (long) conf.getAddressSettings().get("a1").getExpiryDelay()); assertEquals(2L, (long) conf.getAddressSettings().get("a1").getMinExpiryDelay()); assertEquals(3L, (long) conf.getAddressSettings().get("a1").getMaxExpiryDelay()); + assertTrue(conf.getAddressSettings().get("a1").isNoExpiry()); assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_EXPIRY_RESOURCES, conf.getAddressSettings().get("a1").isAutoCreateExpiryResources()); assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX, conf.getAddressSettings().get("a1").getExpiryQueuePrefix()); assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_SUFFIX, conf.getAddressSettings().get("a1").getExpiryQueueSuffix()); @@ -547,6 +548,7 @@ public void testFileConfiguration() { assertEquals(-1L, (long) conf.getAddressSettings().get("a2").getExpiryDelay()); assertEquals(-1L, (long) conf.getAddressSettings().get("a2").getMinExpiryDelay()); assertEquals(-1L, (long) conf.getAddressSettings().get("a2").getMaxExpiryDelay()); + assertFalse(conf.getAddressSettings().get("a2").isNoExpiry()); assertTrue(conf.getAddressSettings().get("a2").isAutoCreateDeadLetterResources()); assertEquals("", conf.getAddressSettings().get("a2").getExpiryQueuePrefix().toString()); assertEquals(".EXP", conf.getAddressSettings().get("a2").getExpiryQueueSuffix().toString()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImplTest.java new file mode 100644 index 00000000000..3730a1ee7e0 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImplTest.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PostOfficeImplTest { + + private static final int EXPIRATION_DELTA = 5000; + + @Test + public void testNoExpiryWhenExpirationSetLow() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(1L); + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true)); + Mockito.verify(mockMessage).setExpiration(0); + } + + @Test + public void testNoExpiryWhenExpirationSetHigh() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(Long.MAX_VALUE); + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true)); + Mockito.verify(mockMessage).setExpiration(0); + } + + @Test + public void testNoExpiryWhenExpirationNotSet() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(0L); + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true)); + Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong()); + } + + @Test + public void testExpiryDelayWhenExpirationNotSet() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(0L); + final long expiryDelay = 123456L; + final long startTime = System.currentTimeMillis(); + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setExpiryDelay(expiryDelay)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockMessage).setExpiration(captor.capture()); + + final long expectedExpirationLow = startTime + expiryDelay; + final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta + final Long actualExpirationSet = captor.getValue(); + + assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet); + } + + @Test + public void testExpiryDelayWhenExpirationSet() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(1L); + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setExpiryDelay(9999L)); + Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong()); + } + + @Test + public void testMinExpiryDelayWhenExpirationNotSet() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(0L); + final long minExpiryDelay = 123456L; + final long startTime = System.currentTimeMillis(); + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockMessage).setExpiration(captor.capture()); + + final long expectedExpirationLow = startTime + minExpiryDelay; + final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta + final Long actualExpirationSet = captor.getValue(); + + assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet); + } + + @Test + public void testMinExpiryDelayWhenExpirationSet() { + Message mockMessage = Mockito.mock(Message.class); + long origExpiration = 1234L; + Mockito.when(mockMessage.getExpiration()).thenReturn(origExpiration); + final long minExpiryDelay = 123456L; + assertTrue(minExpiryDelay > origExpiration); + final long startTime = System.currentTimeMillis(); + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockMessage).setExpiration(captor.capture()); + + final long expectedExpirationLow = startTime + minExpiryDelay; + final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta + final Long actualExpirationSet = captor.getValue(); + + assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet); + } + + @Test + public void testMaxExpiryDelayWhenExpirationNotSet() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(0L); + final long maxExpiryDelay = 123456L; + final long startTime = System.currentTimeMillis(); + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMaxExpiryDelay(maxExpiryDelay)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockMessage).setExpiration(captor.capture()); + + final long expectedExpirationLow = startTime + maxExpiryDelay; + final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta + final Long actualExpirationSet = captor.getValue(); + + assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet); + } + + @Test + public void testMaxExpiryDelayWhenExpirationSet() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(Long.MAX_VALUE); + final long maxExpiryDelay = 123456L; + final long startTime = System.currentTimeMillis(); + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMaxExpiryDelay(maxExpiryDelay)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockMessage).setExpiration(captor.capture()); + + final long expectedExpirationLow = startTime + maxExpiryDelay; + final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta + final Long actualExpirationSet = captor.getValue(); + + assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet); + } + + @Test + public void testMinAndMaxExpiryDelayWhenExpirationNotSet() { + Message mockMessage = Mockito.mock(Message.class); + long origExpiration = 0L; + Mockito.when(mockMessage.getExpiration()).thenReturn(origExpiration); + final long minExpiryDelay = 100_000L; + final long maxExpiryDelay = 300_000L; + final long startTime = System.currentTimeMillis(); + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay).setMaxExpiryDelay(maxExpiryDelay)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockMessage).setExpiration(captor.capture()); + + final long expectedExpirationLow = startTime + maxExpiryDelay; + final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta + final Long actualExpirationSet = captor.getValue(); + + assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet); + } + + @Test + public void testMinAndMaxExpiryDelayWhenExpirationSetInbetween() { + Message mockMessage = Mockito.mock(Message.class); + final long startTime = System.currentTimeMillis(); + long origExpiration = startTime + 200_000L; + Mockito.when(mockMessage.getExpiration()).thenReturn(origExpiration); + final long minExpiryDelay = 100_000L; + final long maxExpiryDelay = 300_000L; + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay).setMaxExpiryDelay(maxExpiryDelay)); + + Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong()); + } + + @Test + public void testMinAndMaxExpiryDelayWhenExpirationSetAbove() { + Message mockMessage = Mockito.mock(Message.class); + final long startTime = System.currentTimeMillis(); + long origExpiration = startTime + 400_000L; + Mockito.when(mockMessage.getExpiration()).thenReturn(origExpiration); + final long minExpiryDelay = 100_000L; + final long maxExpiryDelay = 300_000L; + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay).setMaxExpiryDelay(maxExpiryDelay)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockMessage).setExpiration(captor.capture()); + + final long expectedExpirationLow = startTime + maxExpiryDelay; + final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta + final Long actualExpirationSet = captor.getValue(); + + assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet); + } + + @Test + public void testMinAndMaxExpiryDelayWhenExpirationSetBelow() { + Message mockMessage = Mockito.mock(Message.class); + final long startTime = System.currentTimeMillis(); + long origExpiration = startTime + 50_000; + Mockito.when(mockMessage.getExpiration()).thenReturn(origExpiration); + final long minExpiryDelay = 100_000L; + final long maxExpiryDelay = 300_000L; + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay).setMaxExpiryDelay(maxExpiryDelay)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockMessage).setExpiration(captor.capture()); + + final long expectedExpirationLow = startTime + minExpiryDelay; + final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta + final Long actualExpirationSet = captor.getValue(); + + assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet); + } + + private void assertExpirationSetAsExpected(final long expectedExpirationLow, final long expectedExpirationHigh, final Long actualExpirationSet) { + assertNotNull(actualExpirationSet); + + assertTrue(actualExpirationSet >= expectedExpirationLow, () -> "Expected set expiration of at least " + expectedExpirationLow + ", but was: " + actualExpirationSet); + assertTrue(actualExpirationSet < expectedExpirationHigh, "Expected set expiration less than " + expectedExpirationHigh + ", but was: " + actualExpirationSet); + } + + @Test + public void testPrecedencNoExpiryOverExpiryDelay() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(0L); + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true).setExpiryDelay(10L)); + Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong()); + } + + @Test + public void testPrecedencNoExpiryOverMaxExpiryDelay() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(0L); + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true).setMaxExpiryDelay(10L)); + Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong()); + } + + @Test + public void testPrecedencNoExpiryOverMinExpiryDelay() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(0L); + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true).setMinExpiryDelay(10L)); + Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong()); + } + + @Test + public void testPrecedencExpiryDelayOverMaxExpiryDelay() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(0L); + final long expiryDelay = 1000L; + final long maxExpiryDelay = 999999999L; + final long startTime = System.currentTimeMillis(); + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setExpiryDelay(expiryDelay).setMaxExpiryDelay(maxExpiryDelay)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockMessage).setExpiration(captor.capture()); + + final long expectedExpirationLow = startTime + expiryDelay; + final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta + final Long actualExpirationSet = captor.getValue(); + + assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet); + } + + @Test + public void testPrecedencExpiryDelayOverMinExpiryDelay() { + Message mockMessage = Mockito.mock(Message.class); + Mockito.when(mockMessage.getExpiration()).thenReturn(0L); + final long expiryDelay = 1000L; + final long minExpiryDelay = 999999999L; + final long startTime = System.currentTimeMillis(); + + PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setExpiryDelay(expiryDelay).setMinExpiryDelay(minExpiryDelay)); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + Mockito.verify(mockMessage).setExpiration(captor.capture()); + + final long expectedExpirationLow = startTime + expiryDelay; + final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta + final Long actualExpirationSet = captor.getValue(); + + assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet); + } +} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java index 02e08e3078f..9b45770295e 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java @@ -18,6 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.lang.invoke.MethodHandles; @@ -60,6 +61,7 @@ public void testDefaults() { assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_ADDRESSES, addressSettings.isAutoDeleteAddresses()); assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), addressSettings.isDefaultPurgeOnNoConsumers()); assertEquals(Integer.valueOf(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers()), addressSettings.getDefaultMaxConsumers()); + assertEquals(AddressSettings.DEFAULT_NO_EXPIRY, addressSettings.isNoExpiry()); } @Test @@ -93,6 +95,7 @@ private void testSingleMerge(boolean copy) { addressSettingsToMerge.setMaxExpiryDelay(777L); addressSettingsToMerge.setIDCacheSize(5); addressSettingsToMerge.setInitialQueueBufferSize(256); + addressSettingsToMerge.setNoExpiry(true); if (copy) { addressSettings = addressSettings.mergeCopy(addressSettingsToMerge); @@ -115,6 +118,7 @@ private void testSingleMerge(boolean copy) { assertEquals(Long.valueOf(777), addressSettings.getMaxExpiryDelay()); assertEquals(Integer.valueOf(5), addressSettings.getIDCacheSize()); assertEquals(Integer.valueOf(256), addressSettings.getInitialQueueBufferSize()); + assertTrue(addressSettings.isNoExpiry()); } @Test @@ -139,6 +143,7 @@ private void testMultipleMerge(boolean copy) { addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002); addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP); addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024); + addressSettingsToMerge.setNoExpiry(true); if (copy) { addressSettings = addressSettings.mergeCopy(addressSettingsToMerge); } else { @@ -166,6 +171,7 @@ private void testMultipleMerge(boolean copy) { assertEquals(addressSettings.getRedeliveryMultiplier(), 2.5, 0.000001); assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy()); assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024); + assertTrue(addressSettings.isNoExpiry()); } @Test @@ -236,6 +242,7 @@ public void testToJSON() { addressSettings.setRedeliveryDelay(1003); addressSettings.setRedeliveryMultiplier(1.0); addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP); + addressSettings.setNoExpiry(true); String json = addressSettings.toJSON(); logger.info("Json:: {}", json); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index f53d5a4844b..ad9e0b1a295 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -577,6 +577,7 @@ 1 2 3 + true 1 0.5 817M diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml index 4750e7a8996..217e42dba26 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml @@ -21,6 +21,7 @@ 1 2 3 + true 1 0.5 817M diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml index 4750e7a8996..217e42dba26 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml @@ -21,6 +21,7 @@ 1 2 3 + true 1 0.5 817M diff --git a/docs/user-manual/address-settings.adoc b/docs/user-manual/address-settings.adoc index 305f0512082..ba197f86976 100644 --- a/docs/user-manual/address-settings.adoc +++ b/docs/user-manual/address-settings.adoc @@ -31,7 +31,10 @@ Here an example of an `address-setting` entry that might be found in the `broker false - 123 + false + -1 + -1 + -1 5000 1.0 0.0 @@ -123,12 +126,24 @@ The suffix used for automatically created expiry queues. Default is empty. Read more in the chapter about xref:message-expiry.adoc#message-expiry[message expiry]. +no-expiry:: +If `true` this overrides the expiration time for _all_ messages so that they never expire. +The default is `false`. +Read more about xref:message-expiry.adoc#configuring-expiry-delay[message expiry]. + expiry-delay:: -The expiration time that will be used for messages which are using the default expiration time (i.e. 0). -For example, if `expiry-delay` is set to "10" and a message which is using the default expiration time (i.e. 0) arrives then its expiration time of "0" will be changed to "10." However, if a message which is using an expiration time of "20" arrives then its expiration time will remain unchanged. -Setting `expiry-delay` to "-1" will disable this feature. -The default is "-1". -Read more about xref:message-expiry.adoc#configuring-expiry-addresses[message expiry]. +The expiration time that will be used for messages which are using the default expiration time (i.e. `0`). +For example, if `expiry-delay` is set to `10` and a message which is using the default expiration time (i.e. `0`) arrives then its expiration time of `0` will be changed to `10`. +However, if a message which is using an expiration time of `20` arrives then its expiration time will remain unchanged. +Setting `expiry-delay` to `-1` will disable this feature. +The default is `-1`. +Read more about xref:message-expiry.adoc#configuring-expiry-delay[message expiry]. + +min-expiry-delay:: +max-expiry-delay:: +These are applied if the aforementioned `expiry-delay` isn't set. +Unlike `expiry-delay`, they can impact the expiration of a message even if that message is using a non-default expiration time. +There are a xref:message-expiry.adoc#configuring-expiry-delay[handful of rules] which dictate the behavior of these settings. max-delivery-attempts:: defines how many time a cancelled message can be redelivered before sending to the `dead-letter-address`. diff --git a/docs/user-manual/message-expiry.adoc b/docs/user-manual/message-expiry.adoc index 7906665096b..09ab7de331f 100644 --- a/docs/user-manual/message-expiry.adoc +++ b/docs/user-manual/message-expiry.adoc @@ -36,7 +36,40 @@ a Long property containing the _actual expiration time_ of the expired message == Configuring Expiry Delay -Default expiry delay can be configured in the address-setting configuration: +There are multiple address-settings which you can use to modify the expiry delay for incoming messages: + +. `no-expiry` +. `expiry-delay` +. `max-expiry-delay` & `min-expiry-delay` + +These settings are applied exclusively in this order of precedence. For example, if `no-expiry` is set and `expiry-delay` is also set then `expiry-delay` is ignored completely and `no-expiry` is enforced. + +[WARNING] +==== +If you set any of these values for the `expiry-address` then messages which expire will have corresponding new expiry delays potentially causing the expired messages to themselves expire and be removed completely from the broker. +==== + +Let's look at each of these in turn. + +=== Never Expire + +If you want to force messages to _never_ expire regardless of their existing settings then set `no-expiry` to `true`, e.g.: + +[,xml] +---- + + + true + +---- + +For example, if `no-expiry` is set to `true` and a message which is using an expiration of `10` arrives then its expiration time of `10` will be changed to `0`. + +The default is `false`. + +=== Modify Default Expiry + +To modify the expiry delay on a message using the _default expiration_ (i.e. `0`) set `expiry-delay`, e.g. [,xml] ---- @@ -47,14 +80,14 @@ Default expiry delay can be configured in the address-setting configuration: ---- -`expiry-delay` defines the expiration time in milliseconds that will be used for messages which are using the default expiration time (i.e. 0). +For example, if `expiry-delay` is set to `10` and a message which is using the default expiration time (i.e. `0`) arrives then its expiration time of `0` will be changed to `10`. +However, if a message which is using an expiration time of `20` arrives then its expiration time will remain unchanged. -For example, if `expiry-delay` is set to "10" and a message which is using the default expiration time (i.e. 10) arrives then its expiration time of "0" will be changed to "10." However, if a message which is using an expiration time of "20" arrives then its expiration time will remain unchanged. -Setting `expiry-delay` to "-1" will disable this feature. +This value is measured in milliseconds. The default is `-1` (i.e. disabled). -The default is `-1`. +=== Enforce an Expiry Range -If `expiry-delay` is _not set_ then minimum and maximum expiry delay values can be configured in the address-setting configuration. +To enforce a range of expiry delay values [,xml] ---- @@ -67,20 +100,17 @@ If `expiry-delay` is _not set_ then minimum and maximum expiry delay values can Semantics are as follows: * Messages _without_ an expiration will be set to `max-expiry-delay`. -If `max-expiry-delay` is not defined then the message will be set to `min-expiry-delay`. -If `min-expiry-delay` is not defined then the message will not be changed. -* Messages with an expiration _above_ `max-expiry-delay` will be set to `max-expiry-delay` -* Messages with an expiration _below_ `min-expiry-delay` will be set to `min-expiry-delay` -* Messages with an expiration _within_ `min-expiry-delay` and `max-expiry-delay` range will not be changed -* Any value set for `expiry-delay` other than the default (i.e. `-1`) will override the aforementioned min/max settings. +** If `max-expiry-delay` is not defined then the message will be set to `min-expiry-delay`. +** If `min-expiry-delay` is not defined then the message will not be changed. +* Messages with an expiration _above_ `max-expiry-delay` will be set to `max-expiry-delay`. +* Messages with an expiration _below_ `min-expiry-delay` will be set to `min-expiry-delay`. +* Messages with an expiration _within_ `min-expiry-delay` and `max-expiry-delay` range will not be changed. -The default for both `min-expiry-delay` and `max-expiry-delay` is `-1` (i.e. disabled). +These values are measured in milliseconds. The default for both is `-1` (i.e. disabled). [WARNING] ==== -**If you set expiry-delay, or min/max-expiry-delay, on the expiration target address beware of the following:** - -* Messages will get a new expiration when moved to the expiry queue, rather than being set to 0 as usual, and so may disappear after the new expiration. +Setting a value of `0` for `max-expiry-delay` will cause messages to expire _immediately_. ==== == Configuring Expiry Addresses diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageExpiryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageExpiryTest.java new file mode 100644 index 00000000000..3bf8a859f70 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageExpiryTest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.multiprotocol; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JMSMessageExpiryTest extends MultiprotocolJMSClientTestSupport { + + protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final long EXPIRY_DELAY = 10_000_000L; + + @Test + @Timeout(30) + public void testCoreMessageExpiryDelay() throws Exception { + testExpiry(CoreConnection, DelayType.NORMAL, false); + } + + @Test + @Timeout(30) + public void testAmqpMessageExpiryDelay() throws Exception { + testExpiry(AMQPConnection, DelayType.NORMAL, false); + } + + @Test + @Timeout(30) + public void testOpenWireMessageExpiryDelay() throws Exception { + testExpiry(OpenWireConnection, DelayType.NORMAL, false); + } + + @Test + @Timeout(30) + public void testCoreLargeMessageExpiryDelay() throws Exception { + testExpiry(CoreConnection, DelayType.NORMAL, false, true, false); + } + + @Test + @Timeout(30) + public void testAmqpLargeMessageExpiryDelay() throws Exception { + testExpiry(AMQPConnection, DelayType.NORMAL, false, true, false); + } + + @Test + @Timeout(30) + public void testOpenWireLargeMessageExpiryDelay() throws Exception { + testExpiry(OpenWireConnection, DelayType.NORMAL, false, true, false); + } + + @Test + @Timeout(30) + public void testCoreLargeMessageExpiryDelayWithBrokerRestart() throws Exception { + testExpiry(CoreConnection, DelayType.NORMAL, false, true, true); + } + + @Test + @Timeout(30) + public void testAmqpLargeMessageExpiryDelayWithBrokerRestart() throws Exception { + testExpiry(AMQPConnection, DelayType.NORMAL, false, true, true); + } + + @Test + @Timeout(30) + public void testOpenWireLargeMessageExpiryDelayWithBrokerRestart() throws Exception { + testExpiry(OpenWireConnection, DelayType.NORMAL, false, true, true); + } + + @Test + @Timeout(30) + public void testCoreMessageExpiryDelayWithBrokerRestart() throws Exception { + testExpiry(CoreConnection, DelayType.NORMAL, false, false, true); + } + + @Test + @Timeout(30) + public void testAmqpMessageExpiryDelayWithBrokerRestart() throws Exception { + testExpiry(AMQPConnection, DelayType.NORMAL, false, false, true); + } + + @Test + @Timeout(30) + public void testOpenWireMessageExpiryDelayWithBrokerRestart() throws Exception { + testExpiry(OpenWireConnection, DelayType.NORMAL, false, false, true); + } + + @Test + @Timeout(30) + public void testCoreMaxExpiryDelayNoExpiration() throws Exception { + testExpiry(CoreConnection, DelayType.MAX, false); + } + + @Test + @Timeout(30) + public void testAmqpMaxExpiryDelayNoExpiration() throws Exception { + testExpiry(AMQPConnection, DelayType.MAX, false); + } + + @Test + @Timeout(30) + public void testOpenWireMaxExpiryDelayNoExpiration() throws Exception { + testExpiry(OpenWireConnection, DelayType.MAX, false); + } + + @Test + @Timeout(30) + public void testCoreMinExpiryDelayNoExpiration() throws Exception { + testExpiry(CoreConnection, DelayType.MIN, false); + } + + @Test + @Timeout(30) + public void testAmqpMinExpiryDelayNoExpiration() throws Exception { + testExpiry(AMQPConnection, DelayType.MIN, false); + } + + @Test + @Timeout(30) + public void testOpenWireMinExpiryDelayNoExpiration() throws Exception { + testExpiry(OpenWireConnection, DelayType.MIN, false); + } + + @Test + @Timeout(30) + public void testCoreMaxExpiryDelayWithExpiration() throws Exception { + testExpiry(CoreConnection, DelayType.MAX, true); + } + + @Test + @Timeout(30) + public void testAmqpMaxExpiryDelayWithExpiration() throws Exception { + testExpiry(AMQPConnection, DelayType.MAX, true); + } + + @Test + @Timeout(30) + public void testOpenWireMaxExpiryDelayWithExpiration() throws Exception { + testExpiry(OpenWireConnection, DelayType.MAX, true); + } + + @Test + @Timeout(30) + public void testCoreMinExpiryDelayWithExpiration() throws Exception { + testExpiry(CoreConnection, DelayType.MIN, true); + } + + @Test + @Timeout(30) + public void testAmqpMinExpiryDelayWithExpiration() throws Exception { + testExpiry(AMQPConnection, DelayType.MIN, true); + } + + @Test + @Timeout(30) + public void testOpenWireMinExpiryDelayWithExpiration() throws Exception { + testExpiry(OpenWireConnection, DelayType.MIN, true); + } + + @Test + @Timeout(30) + public void testCoreMessageNoExpiry() throws Exception { + testExpiry(CoreConnection, DelayType.NEVER, true); + } + + @Test + @Timeout(30) + public void testAmqpMessageNoExpiry() throws Exception { + testExpiry(AMQPConnection, DelayType.NEVER, true); + } + + @Test + @Timeout(30) + public void testOpenWireMessageNoExpiry() throws Exception { + testExpiry(OpenWireConnection, DelayType.NEVER, true); + } + + private void testExpiry(ConnectionSupplier supplier, DelayType delayType, boolean setTimeToLive) throws Exception { + testExpiry(supplier, delayType, setTimeToLive, false, false); + } + + private void testExpiry(ConnectionSupplier supplier, DelayType delayType, boolean setTimeToLive, boolean useLargeMessage, boolean restartBroker) throws Exception { + AddressSettings addressSettings = new AddressSettings(); + if (delayType == DelayType.NORMAL) { + addressSettings.setExpiryDelay(EXPIRY_DELAY); + } else if (delayType == DelayType.MIN) { + addressSettings.setMinExpiryDelay(EXPIRY_DELAY); + } else if (delayType == DelayType.MAX) { + addressSettings.setMaxExpiryDelay(EXPIRY_DELAY); + } else if (delayType == DelayType.NEVER) { + addressSettings.setNoExpiry(true); + } + server.getAddressSettingsRepository().addMatch(getQueueName(), addressSettings); + + Connection producerConnection = supplier.createConnection(); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue q = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(q); + if (setTimeToLive) { + if (delayType == DelayType.MIN) { + producer.setTimeToLive(EXPIRY_DELAY / 2); + } else if (delayType == DelayType.MAX) { + producer.setTimeToLive(EXPIRY_DELAY * 2); + } else if (delayType == DelayType.NEVER) { + producer.setTimeToLive(EXPIRY_DELAY); + } + } + BytesMessage m = session.createBytesMessage(); + if (useLargeMessage) { + m.writeBytes(RandomUtil.randomBytes(server.getConfiguration().getJournalBufferSize_NIO() * 2)); + } + long start = System.currentTimeMillis(); + producer.send(m); + producerConnection.close(); + if (useLargeMessage) { + validateNoFilesOnLargeDir(getLargeMessagesDir(), 1); + } + if (restartBroker) { + server.stop(); + server.start(); + } + Connection consumerConnection = supplier.createConnection(); + session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + q = session.createQueue(getQueueName()); + MessageConsumer consumer = session.createConsumer(q); + consumerConnection.start(); + m = (BytesMessage) consumer.receive(1500); + long stop = System.currentTimeMillis(); + assertNotNull(m); + consumerConnection.close(); + if (delayType == DelayType.NEVER) { + assertEquals(0, m.getJMSExpiration()); + } else { + long duration = stop - start; + long delayOnMessage = m.getJMSExpiration() - stop; + assertTrue(delayOnMessage >= (EXPIRY_DELAY - duration)); + assertTrue(delayOnMessage <= EXPIRY_DELAY); + } + if (useLargeMessage) { + validateNoFilesOnLargeDir(); + } + } + + enum DelayType { + NORMAL, MIN, MAX, NEVER; + } +}