diff --git a/event-queue/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/event-queue/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index ff26fc7c9..c1ecadd73 100644 --- a/event-queue/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/event-queue/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -187,16 +187,16 @@ public Address[] getAddresses() { } public List ack(List messages) { - final List processedDeliveryTags = new ArrayList<>(); + final List failedMessages = new ArrayList<>(); for (final Message message : messages) { try { ackMsg(message); - processedDeliveryTags.add(message.getReceipt()); } catch (final Exception e) { LOGGER.error("Cannot ACK message with delivery tag {}", message.getReceipt(), e); + failedMessages.add(message.getReceipt()); } } - return processedDeliveryTags; + return failedMessages; } public void ackMsg(Message message) throws Exception { diff --git a/event-queue/amqp/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java b/event-queue/amqp/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java index 15893be81..0d936ccf6 100644 --- a/event-queue/amqp/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java +++ b/event-queue/amqp/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java @@ -334,12 +334,12 @@ void runObserve( } @Test - public void testGetMessagesFromExistingExchangeWithDefaultConfiguration() throws IOException, TimeoutException { + public void testGetMessagesFromExistingExchangeWithDefaultConfiguration() + throws IOException, TimeoutException { // Mock channel and connection Channel channel = mockBaseChannel(); Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndDefaultConfiguration( - channel, connection, true, true); + testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, true, true); } @Test @@ -386,8 +386,9 @@ public void testAck() throws IOException, TimeoutException { msg.setPayload("Payload"); msg.setReceipt("1"); messages.add(msg); - List deliveredTags = observableQueue.ack(messages); - assertNotNull(deliveredTags); + List failedMessages = observableQueue.ack(messages); + assertNotNull(failedMessages); + assertTrue(failedMessages.isEmpty()); } private void testGetMessagesFromExchangeAndDefaultConfiguration(