Skip to content

Commit 078b574

Browse files
committed
Add support for MessageProducer.setDeliveryDelay() (from JMS 2.0)
Forwards compatible implementation. Only accepts multiples of 1000 since SQS only supports this in seconds instead of milliseconds.
1 parent ce40ad9 commit 078b574

File tree

2 files changed

+80
-0
lines changed

2 files changed

+80
-0
lines changed

src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Enumeration;
1818
import java.util.HashMap;
1919
import java.util.Map;
20+
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122

2223
import javax.jms.Destination;
@@ -55,6 +56,10 @@
5556
public class SQSMessageProducer implements MessageProducer, QueueSender {
5657
private static final Log LOG = LogFactory.getLog(SQSMessageProducer.class);
5758

59+
private long MAXIMUM_DELIVERY_DELAY_MILLISECONDS = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
60+
61+
private int deliveryDelaySeconds = 0;
62+
5863
/** This field is not actually used. */
5964
private long timeToLive;
6065
/** This field is not actually used. */
@@ -122,6 +127,10 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
122127
SendMessageRequest sendMessageRequest = new SendMessageRequest(queue.getQueueUrl(), sqsMessageBody);
123128
sendMessageRequest.setMessageAttributes(messageAttributes);
124129

130+
if (deliveryDelaySeconds != 0) {
131+
sendMessageRequest.setDelaySeconds(deliveryDelaySeconds);
132+
}
133+
125134
//for FIFO queues, we have to specify both MessageGroupId, which we obtain from standard property JMSX_GROUP_ID
126135
//and MessageDeduplicationId, which we obtain from a custom provider specific property JMS_SQS_DEDUPLICATION_ID
127136
//notice that this code does not validate if the values are actually set by the JMS user
@@ -493,7 +502,32 @@ public void setTimeToLive(long timeToLive) throws JMSException {
493502
public long getTimeToLive() throws JMSException {
494503
return timeToLive;
495504
}
505+
506+
/**
507+
* Sets the minimum length of time in milliseconds that must elapse after a
508+
* message is sent before the JMS provider may deliver the message to a consumer.
509+
* <p>
510+
* This must be a multiple of 1000, since SQS only supports delivery delays
511+
* in seconds.
512+
*/
513+
public void setDeliveryDelay(long deliveryDelay) {
514+
if (deliveryDelay < 0 || deliveryDelay > MAXIMUM_DELIVERY_DELAY_MILLISECONDS) {
515+
throw new IllegalArgumentException("Delivery delay must be non-negative and at most 15 minutes: " + deliveryDelay);
516+
}
517+
if (deliveryDelay % 1000 != 0) {
518+
throw new IllegalArgumentException("Delivery delay must be a multiple of 1000: " + deliveryDelay);
519+
}
520+
this.deliveryDelaySeconds = (int)(deliveryDelay / 1000);
521+
}
496522

523+
/**
524+
* Gets the minimum length of time in milliseconds that must elapse after a
525+
* message is sent before the JMS provider may deliver the message to a consumer.
526+
*/
527+
public long getDeliveryDelay() {
528+
return deliveryDelaySeconds * 1000;
529+
}
530+
497531
void checkClosed() throws IllegalStateException {
498532
if (closed.get()) {
499533
throw new IllegalStateException("The producer is closed.");

src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import java.util.HashSet;
4747
import java.util.List;
4848
import java.util.Map;
49+
import java.util.concurrent.TimeUnit;
50+
4951
import org.junit.Before;
5052
import org.junit.Test;
5153
import org.mockito.ArgumentCaptor;
@@ -699,6 +701,50 @@ public void testClose() throws JMSException {
699701
verify(sqsSession).removeProducer(producer);
700702
}
701703

704+
@Test
705+
public void testSetDeliveryDelay() throws JMSException {
706+
assertEquals(0, producer.getDeliveryDelay());
707+
708+
producer.setDeliveryDelay(2000);
709+
710+
assertEquals(2000, producer.getDeliveryDelay());
711+
712+
ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
713+
when(amazonSQSClient.sendMessage(requestCaptor.capture()))
714+
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1));
715+
716+
SQSTextMessage msg = new SQSTextMessage("Sorry I'm late!");
717+
producer.send(msg);
718+
719+
assertEquals(2, requestCaptor.getValue().getDelaySeconds().intValue());
720+
}
721+
722+
723+
@Test
724+
public void testSetDeliveryDelayInvalidDelays() throws JMSException {
725+
try {
726+
producer.setDeliveryDelay(-1);
727+
fail();
728+
} catch (IllegalArgumentException ide) {
729+
// expected
730+
}
731+
732+
try {
733+
producer.setDeliveryDelay(TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS));
734+
fail();
735+
} catch (IllegalArgumentException ide) {
736+
// expected
737+
}
738+
739+
try {
740+
producer.setDeliveryDelay(20);
741+
fail();
742+
} catch (IllegalArgumentException ide) {
743+
// expected
744+
}
745+
}
746+
747+
702748
private Map<String, MessageAttributeValue> createMessageAttribute(String type) {
703749
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
704750
messageAttributeValue.setDataType("String");

0 commit comments

Comments
 (0)