Skip to content

Commit

Permalink
Implement fire and forget anonymous send (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret authored Jun 4, 2024
1 parent 958b6dc commit cea7277
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 2 deletions.
11 changes: 11 additions & 0 deletions src/ArtemisNetCoreClient/AnonymousProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ public ValueTask DisposeAsync()
return session.RemoveProducerAsync(ProducerId);
}

public void SendMessage(string address, RoutingType? routingType, Message message)
{
message.Address = address;
if (routingType != null)
{
message.RoutingType = routingType;
}

session.SendMessage(message: message, producerId: ProducerId);
}

public async ValueTask SendMessageAsync(string address, RoutingType? routingType, Message message, CancellationToken cancellationToken = default)
{
message.Address = address;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,35 @@ namespace ActiveMQ.Artemis.Core.Client;
public static class AnonymousProducerExtensions
{
/// <summary>
/// Sends a message to the specified address.
/// Sends a message synchronously to the broker. This method is primarily used for non-durable
/// message delivery since it does not wait for a confirmation from the broker and operates
/// in a fire-and-forget manner.
/// </summary>
/// <remarks>
/// This method should typically be used when message delivery speed is prioritized over reliability.
/// The message will be sent with 'at most once' delivery guarantee, as there's no acknowledgment
/// from the broker that the message has been received or persisted.
/// </remarks>
/// <param name="anonymousProducer">The <see cref="IAnonymousProducer"/> instance that this method extends.</param>
/// <param name="address">The address to which the message should be sent.</param>
/// <param name="message">The message to send.</param>
public static void SendMessage(this IAnonymousProducer anonymousProducer, string address, Message message)
{
anonymousProducer.SendMessage(address, routingType: null, message);
}

/// <summary>
/// Sends a message asynchronously to the broker. This method supports both durable and non-durable message
/// delivery modes, as specified by the message's durable property. It awaits for a confirmation
/// from the broker, ensuring that the message is either stored (for durable messages) or acknowledged
/// (for non-durable messages) before completing.
/// </summary>
/// <remarks>
/// This method should be used when reliability is required, and it supports awaiting the acknowledgment
/// from the broker. The delivery semantics are 'at least once' for durable messages, where the broker
/// confirms the persistence of the message. For non-durable messages, the completion of the task
/// indicates that the broker has received the message.
/// </remarks>
/// <param name="anonymousProducer">The <see cref="IAnonymousProducer"/> instance that this method extends.</param>
/// <param name="address">The address to which the message should be sent.</param>
/// <param name="message">The message to send.</param>
Expand Down
28 changes: 27 additions & 1 deletion src/ArtemisNetCoreClient/IAnonymousProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,34 @@ namespace ActiveMQ.Artemis.Core.Client;
public interface IAnonymousProducer : IAsyncDisposable
{
/// <summary>
/// Sends a message to the specified address.
/// Sends a message synchronously to the broker. This method is primarily used for non-durable
/// message delivery since it does not wait for a confirmation from the broker and operates
/// in a fire-and-forget manner.
/// </summary>
/// <remarks>
/// This method should typically be used when message delivery speed is prioritized over reliability.
/// The message will be sent with 'at most once' delivery guarantee, as there's no acknowledgment
/// from the broker that the message has been received or persisted.
/// </remarks>
/// <param name="address">The address to which the message should be sent.</param>
/// <param name="routingType">
/// The routing type to use when sending the message. Ensures that this message is only routed to queues with matching routing type.
/// </param>
/// <param name="message">The message to send.</param>
void SendMessage(string address, RoutingType? routingType, Message message);

/// <summary>
/// Sends a message asynchronously to the broker. This method supports both durable and non-durable message
/// delivery modes, as specified by the message's durable property. It awaits for a confirmation
/// from the broker, ensuring that the message is either stored (for durable messages) or acknowledged
/// (for non-durable messages) before completing.
/// </summary>
/// <remarks>
/// This method should be used when reliability is required, and it supports awaiting the acknowledgment
/// from the broker. The delivery semantics are 'at least once' for durable messages, where the broker
/// confirms the persistence of the message. For non-durable messages, the completion of the task
/// indicates that the broker has received the message.
/// </remarks>
/// <param name="address">The address to which the message should be sent.</param>
/// <param name="routingType">
/// The routing type to use when sending the message. Ensures that this message is only routed to queues with matching routing type.
Expand Down
48 changes: 48 additions & 0 deletions test/ArtemisNetCoreClient.Tests/AnonymousProducerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,52 @@ await consumer.ReceiveMessageAsync(testFixture.CancellationToken)
});
});
}

[Fact]
public async Task Should_send_message_to_specific_address_in_a_fire_and_forget_manner()
{
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
var scenario = TestScenarioFactory.Default(new XUnitOutputAdapter(testOutputHelper));

await using var connection = await testFixture.CreateConnectionAsync();
await using var session = await connection.CreateSessionAsync();

var (addressName, queueName) = await scenario.Step("Create address and queue", async () =>
{
var addressName = await testFixture.CreateAddressAsync(RoutingType.Anycast);
var queueName = await testFixture.CreateQueueAsync(addressName, RoutingType.Anycast);
return (addressName, queueName);
});

await scenario.Step("Send a message in a fire-and-forget manner", async () =>
{
await using var producer = await session.CreateAnonymousProducerAsync(testFixture.CancellationToken);

// ReSharper disable once MethodHasAsyncOverload
producer.SendMessage(address: addressName, routingType: RoutingType.Anycast, message: new Message
{
Body = "fire_and_forget_msg"u8.ToArray(),
});
});

await scenario.Step("Confirm message count (one message should be available on the queue)", async () =>
{
await RetryUtil.RetryUntil(
func: () => session.GetQueueInfoAsync(queueName, testFixture.CancellationToken),
until: info => info?.MessageCount == 1,
cancellationToken: testFixture.CancellationToken
);
});

await scenario.Step("Verify message payload", async () =>
{
await using var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName
}, testFixture.CancellationToken);
var message = await consumer.ReceiveMessageAsync(testFixture.CancellationToken);
Assert.NotNull(message);
Assert.Equal("fire_and_forget_msg"u8.ToArray(), message.Body.ToArray());
});
}
}

0 comments on commit cea7277

Please sign in to comment.