From b2466597a8cf0f897d2e73a65cfe245c140e68f3 Mon Sep 17 00:00:00 2001 From: stevemuk Date: Wed, 27 Jul 2022 13:17:52 +0100 Subject: [PATCH 1/2] Implement redelivery policy. Reject poison messages. --- src/NMS.AMQP/NmsMessageConsumer.cs | 40 +++++- src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs | 4 + .../MessageRedeliveryPolicyIntegrationTest.cs | 115 ++++++++++++++++++ .../TestAmqp/TestAmqpPeer.cs | 5 + 4 files changed, 158 insertions(+), 6 deletions(-) create mode 100644 test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs index c214ee0f..44c8b332 100644 --- a/src/NMS.AMQP/NmsMessageConsumer.cs +++ b/src/NMS.AMQP/NmsMessageConsumer.cs @@ -360,8 +360,7 @@ private async Task DeliverNextPendingAsync() Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}"); } - // TODO: Apply redelivery policy - await DoAckExpiredAsync(envelope).Await(); + await DoAckRejectedAsync(envelope).Await(); } else { @@ -372,7 +371,9 @@ private async Task DeliverNextPendingAsync() await DoAckDeliveredAsync(envelope).Await(); else await AckFromReceiveAsync(envelope).Await(); - + + await ApplyRedeliveryPolicy(envelope).Await(); + try { Listener.Invoke(envelope.Message.Copy()); @@ -428,6 +429,17 @@ private bool IsRedeliveryExceeded(InboundMessageDispatch envelope) return false; } + + private int RedeliveryDelay(InboundMessageDispatch envelope) + { + Tracer.DebugFormat("Checking if envelope is redelivered"); + IRedeliveryPolicy redeliveryPolicy = Session.Connection.RedeliveryPolicy; + if (redeliveryPolicy == null || envelope.RedeliveryCount <= 0) return 0; + + var redeliveryDelay = redeliveryPolicy.RedeliveryDelay(envelope.RedeliveryCount); + Tracer.DebugFormat("Envelope has been redelivered, apply redelivery policy wait {0} milliseconds", redeliveryDelay); + return redeliveryDelay; + } private Task DoAckReleasedAsync(InboundMessageDispatch envelope) { @@ -516,8 +528,7 @@ private async Task ReceiveInternalBaseAsync(int timeout, Func ReceiveInternalBaseAsync(int timeout, Func ReceiveInternalBaseAsync(int timeout, Func 0) + { + await Task.Delay(TimeSpan.FromMilliseconds(redeliveryDelay)).Await(); + } + } + private static long GetDeadline(int timeout) { @@ -577,6 +600,11 @@ private Task DoAckExpiredAsync(InboundMessageDispatch envelope) return Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope); } + private Task DoAckRejectedAsync(InboundMessageDispatch envelope) + { + return Session.AcknowledgeAsync(AckType.REJECTED, envelope); + } + private void SetAcknowledgeCallback(InboundMessageDispatch envelope) { if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs index fc3a5cfc..bb245ccb 100644 --- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs +++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs @@ -331,6 +331,10 @@ public void Acknowledge(InboundMessageDispatch envelope, AckType ackType) receiverLink.Modify(message, true, true); RemoveMessage(envelope); break; + case AckType.REJECTED: + receiverLink.Reject(message); + RemoveMessage(envelope); + break; default: Tracer.ErrorFormat("Unsupported Ack Type for message: {0}", envelope); throw new ArgumentException($"Unsupported Ack Type for message: {envelope}"); diff --git a/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs new file mode 100644 index 00000000..6cbd4d39 --- /dev/null +++ b/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading; +using Amqp.Framing; +using Apache.NMS; +using Apache.NMS.Policies; +using NMS.AMQP.Test.TestAmqp; +using NUnit.Framework; + +namespace NMS.AMQP.Test.Integration +{ + [TestFixture] + public class MessageRedeliveryPolicyIntegrationTest : IntegrationTestFixture + { + [Test, Timeout(20_000)] + public void TestIncomingDeliveryCountExceededMessageGetsRejected() + { + using (TestAmqpPeer testPeer = new TestAmqpPeer()) + { + IConnection connection = EstablishConnection(testPeer); + int initialRedeliveryDelay = 1000; + int clockResolution = 15; + connection.RedeliveryPolicy = new RedeliveryPolicy() { MaximumRedeliveries = 1, InitialRedeliveryDelay = initialRedeliveryDelay}; + connection.Start(); + + testPeer.ExpectBegin(); + + ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); + IQueue queue = session.GetQueue("myQueue"); + + testPeer.ExpectReceiverAttach(); + testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = "hello" } }); + testPeer.ExpectDispositionThatIsRejectedAndSettled(); + + IMessageConsumer consumer = session.CreateConsumer(queue); + + IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf(m); + session.Recover(); + + DateTime startTimer = DateTime.UtcNow; + m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + Assert.That(DateTime.UtcNow.Subtract(startTimer).TotalMilliseconds, Is.GreaterThanOrEqualTo(initialRedeliveryDelay - clockResolution)); + + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf(m); + session.Recover(); + + // Verify the message is no longer there. Will drain to be sure there are no messages. + Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(10)), "Message should not have been received"); + + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(3000); + } + } + + [Test, Timeout(20_000)] + public void TestIncomingDeliveryCountExceededMessageGetsRejectedAsync() + { + using (TestAmqpPeer testPeer = new TestAmqpPeer()) + { + IConnection connection = EstablishConnection(testPeer); + int initialRedeliveryDelay = 1000; + connection.RedeliveryPolicy = new RedeliveryPolicy() { MaximumRedeliveries = 1, InitialRedeliveryDelay = initialRedeliveryDelay}; + connection.Start(); + + testPeer.ExpectBegin(); + + ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); + IQueue queue = session.GetQueue("myQueue"); + + + testPeer.ExpectReceiverAttach(); + testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = "hello" } }); + testPeer.ExpectDispositionThatIsRejectedAndSettled(); + + IMessageConsumer consumer = session.CreateConsumer(queue); + + CountdownEvent success = new CountdownEvent(2); + + consumer.Listener += m => + { + session.Recover(); + success.Signal(); + }; + + Assert.IsTrue(success.Wait(TimeSpan.FromSeconds(5)), "Didn't get expected messages"); + + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(3000); + } + } + } +} \ No newline at end of file diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs index e61cc882..95ddce6e 100644 --- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs +++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs @@ -726,6 +726,11 @@ public void ExpectDispositionThatIsReleasedAndSettled() ExpectDisposition(settled: true, stateMatcher: stateMatcher); } + + public void ExpectDispositionThatIsRejectedAndSettled() + { + ExpectDisposition(settled: true, stateMatcher: Assert.IsInstanceOf); + } public void ExpectDisposition(bool settled, Action stateMatcher, uint? firstDeliveryId = null, uint? lastDeliveryId = null) { From 4d2466dc25c98905b779afa2d1c79ca3094c75f8 Mon Sep 17 00:00:00 2001 From: stevemuk Date: Tue, 16 Aug 2022 08:16:05 +0100 Subject: [PATCH 2/2] Updates following code review. --- src/NMS.AMQP/NmsMessageConsumer.cs | 10 ++++++---- .../MessageRedeliveryPolicyIntegrationTest.cs | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs index 44c8b332..126740b3 100644 --- a/src/NMS.AMQP/NmsMessageConsumer.cs +++ b/src/NMS.AMQP/NmsMessageConsumer.cs @@ -430,14 +430,15 @@ private bool IsRedeliveryExceeded(InboundMessageDispatch envelope) return false; } - private int RedeliveryDelay(InboundMessageDispatch envelope) + private int GetRedeliveryDelay(InboundMessageDispatch envelope) { Tracer.DebugFormat("Checking if envelope is redelivered"); IRedeliveryPolicy redeliveryPolicy = Session.Connection.RedeliveryPolicy; - if (redeliveryPolicy == null || envelope.RedeliveryCount <= 0) return 0; + + if (redeliveryPolicy == null || envelope.RedeliveryCount <= 0) + return 0; var redeliveryDelay = redeliveryPolicy.RedeliveryDelay(envelope.RedeliveryCount); - Tracer.DebugFormat("Envelope has been redelivered, apply redelivery policy wait {0} milliseconds", redeliveryDelay); return redeliveryDelay; } @@ -555,10 +556,11 @@ private async Task ReceiveInternalBaseAsync(int timeout, Func 0) { + Tracer.DebugFormat("Envelope has been redelivered, apply redelivery policy wait {0} milliseconds", redeliveryDelay); await Task.Delay(TimeSpan.FromMilliseconds(redeliveryDelay)).Await(); } } diff --git a/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs index 6cbd4d39..a492a5a4 100644 --- a/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs +++ b/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs @@ -103,7 +103,7 @@ public void TestIncomingDeliveryCountExceededMessageGetsRejectedAsync() success.Signal(); }; - Assert.IsTrue(success.Wait(TimeSpan.FromSeconds(5)), "Didn't get expected messages"); + Assert.IsTrue(success.Wait(TimeSpan.FromSeconds(3)), "Didn't get expected messages"); testPeer.ExpectClose(); connection.Close();