Skip to content

Commit

Permalink
Add synchronous acknowledgment methods to IConsumer interface (#111)
Browse files Browse the repository at this point in the history
This commit introduces two new methods to the IConsumer interface:
1. `Acknowledge`, which provides a fire-and-forget mechanism for acknowledging all messages received by the consumer. This method does not wait for broker confirmation.
2. `IndividualAcknowledge`, similar to `Acknowledge`, but used for acknowledging individual messages without waiting for broker confirmation.

Both methods are intended for scenarios where waiting for acknowledgment confirmation is not required, enhancing flexibility in message processing.
  • Loading branch information
Havret authored May 26, 2024
1 parent c9e5c53 commit cbdc7f6
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/ArtemisNetCoreClient/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ public ValueTask IndividualAcknowledgeAsync(in MessageDelivery messageDelivery,
return _session.IndividualAcknowledgeAsync(messageDelivery, cancellationToken);
}

public void Acknowledge(in MessageDelivery messageDelivery)
{
_session.Acknowledge(messageDelivery);
}

public void IndividualAcknowledge(in MessageDelivery messageDelivery)
{
_session.IndividualAcknowledge(messageDelivery);
}

internal void OnMessage(ReceivedMessage message)
{
// TODO: What if try write is false?
Expand Down
14 changes: 14 additions & 0 deletions src/ArtemisNetCoreClient/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,18 @@ public interface IConsumer : IAsyncDisposable
/// <param name="messageDelivery">Delivery information of a message to acknowledge.</param>
/// <param name="cancellationToken">Cancellation token.</param>
ValueTask IndividualAcknowledgeAsync(in MessageDelivery messageDelivery, CancellationToken cancellationToken);

/// <summary>
/// Acknowledges all messages received by the consumer. It doesn't wait for the confirmation from the broker.
/// It's a fire-and-forget operation. If you need to wait for the confirmation, use <see cref="AcknowledgeAsync"/> instead.
/// </summary>
/// <param name="messageDelivery">Delivery information of a last message to acknowledge.</param>
void Acknowledge(in MessageDelivery messageDelivery);

/// <summary>
/// Acknowledges the message without waiting for the confirmation from the broker.
/// It's a fire-and-forget operation. If you need to wait for the confirmation, use <see cref="IndividualAcknowledgeAsync"/> instead.
/// </summary>
/// <param name="messageDelivery">Delivery information of a message to acknowledge.</param>
void IndividualAcknowledge(in MessageDelivery messageDelivery);
}
22 changes: 22 additions & 0 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,28 @@ public async ValueTask AcknowledgeAsync(MessageDelivery messageDelivery, Cancell
_lock.Release();
}
}

public void Acknowledge(in MessageDelivery messageDelivery)
{
var request = new SessionAcknowledgeMessage
{
ConsumerId = messageDelivery.ConsumerId,
MessageId = messageDelivery.MessageId,
RequiresResponse = false,
};
connection.Send(request, ChannelId);
}

public void IndividualAcknowledge(in MessageDelivery messageDelivery)
{
var request = new SessionIndividualAcknowledgeMessage
{
ConsumerId = messageDelivery.ConsumerId,
MessageId = messageDelivery.MessageId,
RequiresResponse = false,
};
connection.Send(request, ChannelId);
}

public async ValueTask DisposeAsync()
{
Expand Down
140 changes: 140 additions & 0 deletions test/ArtemisNetCoreClient.Tests/ConsumerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,75 @@ await scenario.Step("Verify that there are no outstanding messages on the queue"
Assert.Equal(0, queueInfo.MessageCount);
});
}

[Fact]
public async Task Should_individually_acknowledge_message_without_waiting_for_confirmation_from_the_broker()
{
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
var scenario = TestScenarioFactory.Default(new XUnitOutputAdapter(testOutputHelper));

await using var connection = await testFixture.CreateConnectionAsync();

var (addressName, queueName) = await scenario.Step("Create address and queue", async () =>
{
var addressName = await testFixture.CreateAddressAsync(RoutingType.Anycast);
var queueName = await testFixture.CreateQueueAsync(addressName, RoutingType.Anycast);
return (addressName, queueName);
});

await using var session = await scenario.Step("Create a session with AutoCommitAcks enabled (default)",
async () => { return await connection.CreateSessionAsync(testFixture.CancellationToken); });

await scenario.Step("Send two messages", async () =>
{
await testFixture.SendMessageAsync(addressName, "msg_1"u8.ToArray());
await testFixture.SendMessageAsync(addressName, "msg_2"u8.ToArray());
});

await using var consumer = await scenario.Step("Create message consumer", async () =>
{
return await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName,
}, testFixture.CancellationToken);
});

var messages = await scenario.Step("Receive the messages", async () =>
{
return new[]
{
await consumer.ReceiveMessageAsync(testFixture.CancellationToken),
await consumer.ReceiveMessageAsync(testFixture.CancellationToken)
};
});

await scenario.Step("Confirm message count before acknowledgment", async () =>
{
var queueInfo = await session.GetQueueInfoAsync(queueName, testFixture.CancellationToken);
Assert.NotNull(queueInfo);
Assert.Equal(2, queueInfo.MessageCount);
});

await scenario.Step("Acknowledge the first message",
() => { consumer.IndividualAcknowledge(messages[0].MessageDelivery); });

await scenario.Step("Verify that one outstanding message remains on the queue", async () =>
{
var queueInfo = await session.GetQueueInfoAsync(queueName, testFixture.CancellationToken);
Assert.NotNull(queueInfo);
Assert.Equal(1, queueInfo.MessageCount);
});

await scenario.Step("Acknowledge the second message",
async () => { await consumer.IndividualAcknowledgeAsync(messages[1].MessageDelivery, testFixture.CancellationToken); });

await scenario.Step("Verify that there are no outstanding messages on the queue", async () =>
{
var queueInfo = await session.GetQueueInfoAsync(queueName, testFixture.CancellationToken);
Assert.NotNull(queueInfo);
Assert.Equal(0, queueInfo.MessageCount);
});
}

[Fact]
public async Task Should_acknowledge_messages()
Expand Down Expand Up @@ -179,6 +248,77 @@ await scenario.Step("Verify that there are no outstanding messages on the queue"
Assert.Equal(0, queueInfo.MessageCount);
});
}

[Fact]
public async Task Should_acknowledge_messages_without_waiting_for_confirmation_from_the_broker()
{
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
var scenario = TestScenarioFactory.Default(new XUnitOutputAdapter(testOutputHelper));

await using var connection = await testFixture.CreateConnectionAsync();

var (addressName, queueName) = await scenario.Step("Create address and queue", async () =>
{
var addressName = await testFixture.CreateAddressAsync(RoutingType.Anycast);
var queueName = await testFixture.CreateQueueAsync(addressName, RoutingType.Anycast);
return (addressName, queueName);
});

await using var session = await scenario.Step("Create a session with AutoCommitAcks enabled (default)",
async () => { return await connection.CreateSessionAsync(testFixture.CancellationToken); });

await scenario.Step("Send three messages", async () =>
{
await testFixture.SendMessageAsync(addressName, "msg_1"u8.ToArray());
await testFixture.SendMessageAsync(addressName, "msg_2"u8.ToArray());
await testFixture.SendMessageAsync(addressName, "msg_3"u8.ToArray());
});

await using var consumer = await scenario.Step("Create message consumer", async () =>
{
return await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName,
}, testFixture.CancellationToken);
});

var messages = await scenario.Step("Receive the messages", async () =>
{
return new[]
{
await consumer.ReceiveMessageAsync(testFixture.CancellationToken),
await consumer.ReceiveMessageAsync(testFixture.CancellationToken),
await consumer.ReceiveMessageAsync(testFixture.CancellationToken)
};
});

await scenario.Step("Confirm message count before acknowledgment", async () =>
{
var queueInfo = await session.GetQueueInfoAsync(queueName, testFixture.CancellationToken);
Assert.NotNull(queueInfo);
Assert.Equal(3, queueInfo.MessageCount);
});

await scenario.Step("Acknowledge messages up to the second message",
() => { consumer.Acknowledge(messages[1].MessageDelivery); });

await scenario.Step("Verify that one outstanding message remains on the queue", async () =>
{
var queueInfo = await session.GetQueueInfoAsync(queueName, testFixture.CancellationToken);
Assert.NotNull(queueInfo);
Assert.Equal(1, queueInfo.MessageCount);
});

await scenario.Step("Acknowledge the last remaining message",
() => { consumer.Acknowledge(messages[2].MessageDelivery); });

await scenario.Step("Verify that there are no outstanding messages on the queue", async () =>
{
var queueInfo = await session.GetQueueInfoAsync(queueName, testFixture.CancellationToken);
Assert.NotNull(queueInfo);
Assert.Equal(0, queueInfo.MessageCount);
});
}

[Fact]
public async Task Should_acknowledge_individual_messages_only_post_session_commit_when_AutoCommitAcks_is_disabled()
Expand Down

0 comments on commit cbdc7f6

Please sign in to comment.