|
17 | 17 | import java.util.Enumeration; |
18 | 18 | import java.util.HashMap; |
19 | 19 | import java.util.Map; |
| 20 | +import java.util.concurrent.TimeUnit; |
20 | 21 | import java.util.concurrent.atomic.AtomicBoolean; |
21 | 22 |
|
22 | 23 | import javax.jms.Destination; |
|
55 | 56 | public class SQSMessageProducer implements MessageProducer, QueueSender { |
56 | 57 | private static final Log LOG = LogFactory.getLog(SQSMessageProducer.class); |
57 | 58 |
|
| 59 | + private long MAXIMUM_DELIVERY_DELAY_MILLISECONDS = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES); |
| 60 | + |
| 61 | + private int deliveryDelaySeconds = 0; |
| 62 | + |
58 | 63 | /** This field is not actually used. */ |
59 | 64 | private long timeToLive; |
60 | 65 | /** This field is not actually used. */ |
@@ -122,6 +127,10 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep |
122 | 127 | SendMessageRequest sendMessageRequest = new SendMessageRequest(queue.getQueueUrl(), sqsMessageBody); |
123 | 128 | sendMessageRequest.setMessageAttributes(messageAttributes); |
124 | 129 |
|
| 130 | + if (deliveryDelaySeconds != 0) { |
| 131 | + sendMessageRequest.setDelaySeconds(deliveryDelaySeconds); |
| 132 | + } |
| 133 | + |
125 | 134 | //for FIFO queues, we have to specify both MessageGroupId, which we obtain from standard property JMSX_GROUP_ID |
126 | 135 | //and MessageDeduplicationId, which we obtain from a custom provider specific property JMS_SQS_DEDUPLICATION_ID |
127 | 136 | //notice that this code does not validate if the values are actually set by the JMS user |
@@ -493,7 +502,32 @@ public void setTimeToLive(long timeToLive) throws JMSException { |
493 | 502 | public long getTimeToLive() throws JMSException { |
494 | 503 | return timeToLive; |
495 | 504 | } |
| 505 | + |
| 506 | + /** |
| 507 | + * Sets the minimum length of time in milliseconds that must elapse after a |
| 508 | + * message is sent before the JMS provider may deliver the message to a consumer. |
| 509 | + * <p> |
| 510 | + * This must be a multiple of 1000, since SQS only supports delivery delays |
| 511 | + * in seconds. |
| 512 | + */ |
| 513 | + public void setDeliveryDelay(long deliveryDelay) { |
| 514 | + if (deliveryDelay < 0 || deliveryDelay > MAXIMUM_DELIVERY_DELAY_MILLISECONDS) { |
| 515 | + throw new IllegalArgumentException("Delivery delay must be non-negative and at most 15 minutes: " + deliveryDelay); |
| 516 | + } |
| 517 | + if (deliveryDelay % 1000 != 0) { |
| 518 | + throw new IllegalArgumentException("Delivery delay must be a multiple of 1000: " + deliveryDelay); |
| 519 | + } |
| 520 | + this.deliveryDelaySeconds = (int)(deliveryDelay / 1000); |
| 521 | + } |
496 | 522 |
|
| 523 | + /** |
| 524 | + * Gets the minimum length of time in milliseconds that must elapse after a |
| 525 | + * message is sent before the JMS provider may deliver the message to a consumer. |
| 526 | + */ |
| 527 | + public long getDeliveryDelay() { |
| 528 | + return deliveryDelaySeconds * 1000; |
| 529 | + } |
| 530 | + |
497 | 531 | void checkClosed() throws IllegalStateException { |
498 | 532 | if (closed.get()) { |
499 | 533 | throw new IllegalStateException("The producer is closed."); |
|
0 commit comments