Skip to content

Commit

Permalink
Implement redelivery policy.
Browse files Browse the repository at this point in the history
Reject poison messages.
  • Loading branch information
stevemuk committed Jul 27, 2022
1 parent 05e8350 commit b246659
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 6 deletions.
40 changes: 34 additions & 6 deletions src/NMS.AMQP/NmsMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,7 @@ private async Task DeliverNextPendingAsync()
Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
}

// TODO: Apply redelivery policy
await DoAckExpiredAsync(envelope).Await();
await DoAckRejectedAsync(envelope).Await();
}
else
{
Expand All @@ -372,7 +371,9 @@ private async Task DeliverNextPendingAsync()
await DoAckDeliveredAsync(envelope).Await();
else
await AckFromReceiveAsync(envelope).Await();


await ApplyRedeliveryPolicy(envelope).Await();

try
{
Listener.Invoke(envelope.Message.Copy());
Expand Down Expand Up @@ -428,6 +429,17 @@ private bool IsRedeliveryExceeded(InboundMessageDispatch envelope)

return false;
}

private int RedeliveryDelay(InboundMessageDispatch envelope)
{
Tracer.DebugFormat("Checking if envelope is redelivered");
IRedeliveryPolicy redeliveryPolicy = Session.Connection.RedeliveryPolicy;
if (redeliveryPolicy == null || envelope.RedeliveryCount <= 0) return 0;

var redeliveryDelay = redeliveryPolicy.RedeliveryDelay(envelope.RedeliveryCount);
Tracer.DebugFormat("Envelope has been redelivered, apply redelivery policy wait {0} milliseconds", redeliveryDelay);
return redeliveryDelay;
}

private Task DoAckReleasedAsync(InboundMessageDispatch envelope)
{
Expand Down Expand Up @@ -516,8 +528,7 @@ private async Task<T> ReceiveInternalBaseAsync<T>(int timeout, Func<InboundMessa
Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
}

// TODO: Apply redelivery policy
await DoAckExpiredAsync(envelope).Await();
await DoAckRejectedAsync(envelope).Await();
}
else
{
Expand All @@ -526,6 +537,8 @@ private async Task<T> ReceiveInternalBaseAsync<T>(int timeout, Func<InboundMessa
Tracer.Debug($"{Info.Id} received message {envelope.Message.NMSMessageId}.");
}

await ApplyRedeliveryPolicy(envelope).Await();

return await func.Invoke(envelope);
}
}
Expand All @@ -539,7 +552,17 @@ private async Task<T> ReceiveInternalBaseAsync<T>(int timeout, Func<InboundMessa
throw ExceptionSupport.Wrap(ex, "Receive failed");
}
}


private async Task ApplyRedeliveryPolicy(InboundMessageDispatch envelope)
{
int redeliveryDelay = RedeliveryDelay(envelope);

if (redeliveryDelay > 0)
{
await Task.Delay(TimeSpan.FromMilliseconds(redeliveryDelay)).Await();
}
}


private static long GetDeadline(int timeout)
{
Expand Down Expand Up @@ -577,6 +600,11 @@ private Task DoAckExpiredAsync(InboundMessageDispatch envelope)
return Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
}

private Task DoAckRejectedAsync(InboundMessageDispatch envelope)
{
return Session.AcknowledgeAsync(AckType.REJECTED, envelope);
}

private void SetAcknowledgeCallback(InboundMessageDispatch envelope)
{
if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
Expand Down
4 changes: 4 additions & 0 deletions src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ public void Acknowledge(InboundMessageDispatch envelope, AckType ackType)
receiverLink.Modify(message, true, true);
RemoveMessage(envelope);
break;
case AckType.REJECTED:
receiverLink.Reject(message);
RemoveMessage(envelope);
break;
default:
Tracer.ErrorFormat("Unsupported Ack Type for message: {0}", envelope);
throw new ArgumentException($"Unsupported Ack Type for message: {envelope}");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Threading;
using Amqp.Framing;
using Apache.NMS;
using Apache.NMS.Policies;
using NMS.AMQP.Test.TestAmqp;
using NUnit.Framework;

namespace NMS.AMQP.Test.Integration
{
[TestFixture]
public class MessageRedeliveryPolicyIntegrationTest : IntegrationTestFixture
{
[Test, Timeout(20_000)]
public void TestIncomingDeliveryCountExceededMessageGetsRejected()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
int initialRedeliveryDelay = 1000;
int clockResolution = 15;
connection.RedeliveryPolicy = new RedeliveryPolicy() { MaximumRedeliveries = 1, InitialRedeliveryDelay = initialRedeliveryDelay};
connection.Start();

testPeer.ExpectBegin();

ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
IQueue queue = session.GetQueue("myQueue");

testPeer.ExpectReceiverAttach();
testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = "hello" } });
testPeer.ExpectDispositionThatIsRejectedAndSettled();

IMessageConsumer consumer = session.CreateConsumer(queue);

IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
Assert.NotNull(m, "Message should have been received");
Assert.IsInstanceOf<ITextMessage>(m);
session.Recover();

DateTime startTimer = DateTime.UtcNow;
m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
Assert.That(DateTime.UtcNow.Subtract(startTimer).TotalMilliseconds, Is.GreaterThanOrEqualTo(initialRedeliveryDelay - clockResolution));

Assert.NotNull(m, "Message should have been received");
Assert.IsInstanceOf<ITextMessage>(m);
session.Recover();

// Verify the message is no longer there. Will drain to be sure there are no messages.
Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(10)), "Message should not have been received");

testPeer.ExpectClose();
connection.Close();

testPeer.WaitForAllMatchersToComplete(3000);
}
}

[Test, Timeout(20_000)]
public void TestIncomingDeliveryCountExceededMessageGetsRejectedAsync()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
IConnection connection = EstablishConnection(testPeer);
int initialRedeliveryDelay = 1000;
connection.RedeliveryPolicy = new RedeliveryPolicy() { MaximumRedeliveries = 1, InitialRedeliveryDelay = initialRedeliveryDelay};
connection.Start();

testPeer.ExpectBegin();

ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
IQueue queue = session.GetQueue("myQueue");


testPeer.ExpectReceiverAttach();
testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = "hello" } });
testPeer.ExpectDispositionThatIsRejectedAndSettled();

IMessageConsumer consumer = session.CreateConsumer(queue);

CountdownEvent success = new CountdownEvent(2);

consumer.Listener += m =>
{
session.Recover();
success.Signal();
};

Assert.IsTrue(success.Wait(TimeSpan.FromSeconds(5)), "Didn't get expected messages");

testPeer.ExpectClose();
connection.Close();

testPeer.WaitForAllMatchersToComplete(3000);
}
}
}
}
5 changes: 5 additions & 0 deletions test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,11 @@ public void ExpectDispositionThatIsReleasedAndSettled()

ExpectDisposition(settled: true, stateMatcher: stateMatcher);
}

public void ExpectDispositionThatIsRejectedAndSettled()
{
ExpectDisposition(settled: true, stateMatcher: Assert.IsInstanceOf<Rejected>);
}

public void ExpectDisposition(bool settled, Action<DeliveryState> stateMatcher, uint? firstDeliveryId = null, uint? lastDeliveryId = null)
{
Expand Down

0 comments on commit b246659

Please sign in to comment.