diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java
index d5b222b..94507dd 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java
@@ -14,21 +14,6 @@
*/
package com.amazon.sqs.javamessaging;
-import java.net.URI;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import com.amazon.sqs.javamessaging.acknowledge.Acknowledger;
import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger;
import com.amazon.sqs.javamessaging.acknowledge.SQSMessageIdentifier;
@@ -41,18 +26,25 @@
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import java.util.*;
/**
* Used internally to prefetch messages to internal buffer on a background
* thread for better receive turn around times.
- *
+ *
* Each message consumer creates one prefetch thread.
- *
+ *
* This runs until the message consumer is closed and in-progress SQS
* receiveMessage call returns.
- *
+ *
* Uses SQS receiveMessage with long-poll wait time of 20 seconds.
- *
+ *
* Add re-tries on top of AmazonSQSClient re-tries on SQS calls.
*/
public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
@@ -94,14 +86,14 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
* Counter on how many messages are prefetched into internal messageQueue.
*/
protected int messagesPrefetched = 0;
-
+
/**
* Counter on how many messages have been explicitly requested.
* TODO: Consider renaming this class and several other variables now that
* this logic factors in message requests as well as prefetching.
*/
protected int messagesRequested = 0;
-
+
/**
* States of the prefetch thread
*/
@@ -109,7 +101,9 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
protected volatile boolean running = false;
- /** Controls the number of retry attempts to the SQS */
+ /**
+ * Controls the number of retry attempts to the SQS
+ */
protected int retriesAttempted = 0;
private final Object stateLock = new Object();
@@ -120,8 +114,8 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
* backoff after SDK completes re-tries with a max delay of 2 seconds and
* 25ms delayInterval.
*/
- protected ExponentialBackoffStrategy backoffStrategy = new ExponentialBackoffStrategy(25,25,2000);
-
+ protected ExponentialBackoffStrategy backoffStrategy = new ExponentialBackoffStrategy(25, 25, 2000);
+
SQSMessageConsumerPrefetch(SQSSessionCallbackScheduler sqsSessionRunnable, Acknowledger acknowledger,
NegativeAcknowledger negativeAcknowledger, SQSQueueDestination sqsDestination,
AmazonSQSMessagingClientWrapper amazonSQSClient, int numberOfMessagesToPrefetch) {
@@ -139,22 +133,22 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
MessageListener getMessageListener() {
return messageListener;
}
-
+
void setMessageConsumer(SQSMessageConsumer messageConsumer) {
this.messageConsumer = messageConsumer;
}
-
+
@Override
public SQSMessageConsumer getMessageConsumer() {
return messageConsumer;
}
-
+
/**
* Sets the message listener.
- *
+ *
* If message listener is set, the existing messages on the internal buffer
* will be pushed to session callback scheduler.
- *
+ *
* If message lister is set to null, then the messages on the internal
* buffer of session callback scheduler will be negative acknowledged, which
* will be handled by the session callback scheduler thread itself.
@@ -168,7 +162,7 @@ protected void setMessageListener(MessageListener messageListener) {
if (!running || isClosed()) {
return;
}
-
+
List allPrefetchedMessages = new ArrayList(messageQueue);
sqsSessionRunnable.scheduleCallBacks(messageListener, allPrefetchedMessages);
messageQueue.clear();
@@ -179,7 +173,7 @@ protected void setMessageListener(MessageListener messageListener) {
messageListenerReady();
}
}
-
+
/**
* Determine the number of messages we should attempt to fetch from SQS.
* Returns the difference between the number of messages needed (either for
@@ -189,14 +183,14 @@ private int numberOfMessagesToFetch() {
int numberOfMessagesNeeded = Math.max(numberOfMessagesToPrefetch, messagesRequested);
return Math.max(numberOfMessagesNeeded - messagesPrefetched, 0);
}
-
+
/**
* Runs until the message consumer is closed and in-progress SQS
* receiveMessage call returns.
- *
+ *
* This blocks if configured number of prefetched messages are already
* received or connection has not started yet.
- *
+ *
* After consumer is closed, all the messages inside internal buffer will
* be negatively acknowledged.
*/
@@ -210,7 +204,7 @@ public void run() {
if (isClosed()) {
break;
}
-
+
synchronized (stateLock) {
waitForStart();
waitForPrefetch();
@@ -238,7 +232,7 @@ public void run() {
}
}
}
-
+
/**
* Call receiveMessage with long-poll wait time of 20 seconds
* with available prefetch batch size and potential re-tries.
@@ -248,10 +242,10 @@ protected List getMessages(int prefetchBatchSize) throws InterruptedExc
assert prefetchBatchSize > 0;
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
- .withMaxNumberOfMessages(prefetchBatchSize)
- .withAttributeNames(ALL)
- .withMessageAttributeNames(ALL)
- .withWaitTimeSeconds(WAIT_TIME_SECONDS);
+ .withMaxNumberOfMessages(prefetchBatchSize)
+ .withAttributeNames(ALL)
+ .withMessageAttributeNames(ALL)
+ .withWaitTimeSeconds(WAIT_TIME_SECONDS);
//if the receive request is for FIFO queue, provide a unique receive request attempt it, so that
//failed calls retried by SDK will claim the same messages
if (sqsDestination.isFifo()) {
@@ -273,7 +267,7 @@ protected List getMessages(int prefetchBatchSize) throws InterruptedExc
}
return messages;
}
-
+
/**
* Converts the received message to JMS message, and pushes to messages to
* either callback scheduler for asynchronous message delivery or to
@@ -291,14 +285,14 @@ protected void processReceivedMessages(List messages) {
nackMessages.add(message.getReceiptHandle());
}
}
-
+
synchronized (stateLock) {
if (messageListener != null) {
sqsSessionRunnable.scheduleCallBacks(messageListener, messageManagers);
} else {
messageQueue.addAll(messageManagers);
}
-
+
messagesPrefetched += messageManagers.size();
notifyStateChange();
}
@@ -327,6 +321,7 @@ protected void waitForPrefetch() throws InterruptedException {
/**
* Convert the return SQS message into JMS message
+ *
* @param message SQS message to convert
* @return Converted JMS message
* @throws JMSException
@@ -344,7 +339,7 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
jmsMessage = new SQSBytesMessage(acknowledger, queueUrl, message);
} catch (JMSException e) {
LOG.warn("MessageReceiptHandle - " + message.getReceiptHandle() +
- "cannot be serialized to BytesMessage", e);
+ "cannot be serialized to BytesMessage", e);
throw e;
}
} else if (SQSMessage.OBJECT_MESSAGE_TYPE.equals(messageType)) {
@@ -355,9 +350,9 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
throw new JMSException("Not a supported JMS message type");
}
}
-
+
jmsMessage.setJMSDestination(sqsDestination);
-
+
MessageAttributeValue replyToQueueNameAttribute = message.getMessageAttributes().get(
SQSMessage.JMS_SQS_REPLY_TO_QUEUE_NAME);
MessageAttributeValue replyToQueueUrlAttribute = message.getMessageAttributes().get(
@@ -368,7 +363,7 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
Destination replyToQueue = new SQSQueueDestination(replyToQueueName, replyToQueueUrl);
jmsMessage.setJMSReplyTo(replyToQueue);
}
-
+
return jmsMessage;
}
@@ -397,7 +392,7 @@ protected void waitForStart() throws InterruptedException {
}
}
}
-
+
@Override
public void messageDispatched() {
synchronized (stateLock) {
@@ -419,7 +414,7 @@ public void messageListenerReady() {
}
}
}
-
+
void requestMessage() {
synchronized (stateLock) {
messagesRequested++;
@@ -433,7 +428,7 @@ private void unrequestMessage() {
notifyStateChange();
}
}
-
+
public static class MessageManager {
private final PrefetchManager prefetchManager;
@@ -457,7 +452,7 @@ public javax.jms.Message getMessage() {
javax.jms.Message receive() throws JMSException {
return receive(0);
}
-
+
javax.jms.Message receive(long timeout) throws JMSException {
if (cannotDeliver()) {
return null;
@@ -466,7 +461,7 @@ javax.jms.Message receive(long timeout) throws JMSException {
if (timeout < 0) {
timeout = 0;
}
-
+
MessageManager messageManager = null;
synchronized (stateLock) {
// If message exists in queue poll.
@@ -474,9 +469,9 @@ javax.jms.Message receive(long timeout) throws JMSException {
messageManager = messageQueue.pollFirst();
} else {
requestMessage();
- try {
+ try {
long startTime = System.currentTimeMillis();
-
+
long waitTime = 0;
while (messageQueue.isEmpty() && !isClosed() &&
(timeout == 0 || (waitTime = getWaitTime(timeout, startTime)) > 0)) {
@@ -491,11 +486,11 @@ javax.jms.Message receive(long timeout) throws JMSException {
return null;
}
messageManager = messageQueue.pollFirst();
- } finally {
- if (messageManager == null) {
- unrequestMessage();
- }
- }
+ } finally {
+ if (messageManager == null) {
+ unrequestMessage();
+ }
+ }
}
}
return messageHandler(messageManager);
@@ -515,7 +510,7 @@ javax.jms.Message receiveNoWait() throws JMSException {
if (cannotDeliver()) {
return null;
}
-
+
MessageManager messageManager;
synchronized (stateLock) {
messageManager = messageQueue.pollFirst();
@@ -553,32 +548,35 @@ void close() {
messageListener = null;
}
}
-
+
/**
* Helper that notifies PrefetchThread that message is dispatched and AutoAcknowledge
*/
private javax.jms.Message messageHandler(MessageManager messageManager) throws JMSException {
if (messageManager == null) {
return null;
- }
+ }
javax.jms.Message message = messageManager.getMessage();
-
+
// Notify PrefetchThread that message is dispatched
this.messageDispatched();
acknowledger.notifyMessageReceived((SQSMessage) message);
return message;
}
-
+
private boolean cannotDeliver() throws JMSException {
- if (isClosed() || !running) {
+ if (!running) {
return true;
}
+ if (isClosed()) {
+ throw new JMSException("Cannot receive messages when the consumer is closed");
+ }
if (messageListener != null) {
throw new JMSException("Cannot receive messages synchronously after a message listener is set");
}
return false;
}
-
+
/**
* Sleeps for the configured time.
*/
@@ -589,7 +587,7 @@ protected void sleep(long sleepTimeMillis) throws InterruptedException {
throw e;
}
}
-
+
protected boolean isClosed() {
return closed;
}
@@ -601,7 +599,7 @@ List purgePrefetchedMessagesWithGroups(Set affecte
Iterator managerIterator = messageQueue.iterator();
while (managerIterator.hasNext()) {
MessageManager messageManager = managerIterator.next();
- SQSMessage prefetchedMessage = (SQSMessage)messageManager.getMessage();
+ SQSMessage prefetchedMessage = (SQSMessage) messageManager.getMessage();
SQSMessageIdentifier messageIdentifier = SQSMessageIdentifier.fromSQSMessage(prefetchedMessage);
//is the prefetch entry for one of the affected group ids?
@@ -617,7 +615,7 @@ List purgePrefetchedMessagesWithGroups(Set affecte
notifyStateChange();
}
-
+
return purgedMessages;
}
}
diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java b/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java
index 9a804ad..5737c04 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java
@@ -14,21 +14,6 @@
*/
package com.amazon.sqs.javamessaging;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.MessageManager;
import com.amazon.sqs.javamessaging.SQSSession.CallbackEntry;
import com.amazon.sqs.javamessaging.acknowledge.AcknowledgeMode;
@@ -36,6 +21,13 @@
import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger;
import com.amazon.sqs.javamessaging.acknowledge.SQSMessageIdentifier;
import com.amazon.sqs.javamessaging.message.SQSMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import java.util.*;
/**
* Used internally to guarantee serial execution of message processing on
@@ -92,14 +84,14 @@ public void run() {
try {
while (true) {
try {
- if (closed) {
+ if (!session.isRunning() || closed) {
break;
}
synchronized (callbackQueue) {
callbackEntry = callbackQueue.pollFirst();
if (callbackEntry == null) {
try {
- callbackQueue.wait();
+ callbackQueue.wait(2000L);
} catch (InterruptedException e) {
/**
* Will be retried on the next loop, and