diff --git a/Npgmq.Test/NpgmqClientTest.cs b/Npgmq.Test/NpgmqClientTest.cs index 953c64a..7b1d450 100644 --- a/Npgmq.Test/NpgmqClientTest.cs +++ b/Npgmq.Test/NpgmqClientTest.cs @@ -1,3 +1,4 @@ +using System.Diagnostics.CodeAnalysis; using System.Text.Json; using Dapper; using DeepEqual.Syntax; @@ -14,11 +15,12 @@ public sealed class NpgmqClientTest : IDisposable private readonly NpgsqlConnection _connection; private readonly NpgmqClient _sut; + [SuppressMessage("ReSharper", "UnusedAutoPropertyAccessor.Local")] private class TestMessage { - public int? Foo { get; set; } - public string? Bar { get; set; } - public DateTimeOffset? Baz { get; set; } + public int? Foo { get; init; } + public string? Bar { get; init; } + public DateTimeOffset? Baz { get; init; } } public NpgmqClientTest() @@ -666,13 +668,13 @@ public async Task SendAsync_should_add_string_message() } [Fact] - public async Task SendDelayAsync_should_add_message_with_future_vt() + public async Task SendAsync_with_int_delay_should_add_message_with_future_vt() { // Arrange await ResetTestQueueAsync(); // Act - var msgId = await _sut.SendDelayAsync(TestQueueName, new TestMessage + var msgId = await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 123, Bar = "Test", @@ -685,6 +687,27 @@ public async Task SendDelayAsync_should_add_message_with_future_vt() Assert.Equal(msgId, await _connection.ExecuteScalarAsync($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;")); } + [Fact] + public async Task SendAsync_with_timestamp_delay_should_add_message_with_a_specified_vt() + { + // Arrange + await ResetTestQueueAsync(); + var delay = DateTimeOffset.UtcNow.AddHours(1); + + // Act + var msgId = await _sut.SendAsync(TestQueueName, new TestMessage + { + Foo = 123, + Bar = "Test", + Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00") + }, delay); + + // Assert + Assert.Equal(1, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName};")); + Assert.Equal(1, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt = @expected_vt;", new { expected_vt = delay })); + Assert.Equal(msgId, await _connection.ExecuteScalarAsync($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;")); + } + [Fact] public async Task SendBatchAsync_should_add_multiple_messages() { @@ -704,6 +727,47 @@ public async Task SendBatchAsync_should_add_multiple_messages() Assert.Equal(msgIds.OrderBy(x => x), (await _connection.QueryAsync($"SELECT msg_id FROM pgmq.q_{TestQueueName} ORDER BY msg_id;")).ToList()); } + [Fact] + public async Task SendBatchAsync_with_int_delay_should_add_multiple_messages_with_future_vt() + { + // Arrange + await ResetTestQueueAsync(); + + // Act + var msgIds = await _sut.SendBatchAsync(TestQueueName, new List + { + new() { Foo = 1 }, + new() { Foo = 2 }, + new() { Foo = 3 } + }, 100); + + // Assert + Assert.Equal(3, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName};")); + Assert.Equal(3, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt > CURRENT_TIMESTAMP;")); + Assert.Equal(msgIds.OrderBy(x => x), (await _connection.QueryAsync($"SELECT msg_id FROM pgmq.q_{TestQueueName} ORDER BY msg_id;")).ToList()); + } + + [Fact] + public async Task SendBatchAsync_with_timestamp_delay_should_add_multiple_messages_with_a_specified_vt() + { + // Arrange + await ResetTestQueueAsync(); + var delay = DateTimeOffset.UtcNow.AddHours(1); + + // Act + var msgIds = await _sut.SendBatchAsync(TestQueueName, new List + { + new() { Foo = 1 }, + new() { Foo = 2 }, + new() { Foo = 3 } + }, delay); + + // Assert + Assert.Equal(3, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName};")); + Assert.Equal(3, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt = @expected_vt;", new { expected_vt = delay })); + Assert.Equal(msgIds.OrderBy(x => x), (await _connection.QueryAsync($"SELECT msg_id FROM pgmq.q_{TestQueueName} ORDER BY msg_id;")).ToList()); + } + [Fact] public async Task SetVtAsync_should_change_vt_for_a_message() { diff --git a/Npgmq/INpgmqClient.cs b/Npgmq/INpgmqClient.cs index 9229949..a00d5b0 100644 --- a/Npgmq/INpgmqClient.cs +++ b/Npgmq/INpgmqClient.cs @@ -139,14 +139,35 @@ public interface INpgmqClient /// The ID of the sent message. Task SendAsync(string queueName, T message) where T : class; + /// + /// Send a message to a queue, visible after a specified number of seconds. + /// + /// The queue name. + /// The message to send. + /// Number of seconds until the message becomes visible. + /// The message type. + /// The ID of the sent message. + Task SendAsync(string queueName, T message, int delay) where T : class; + + /// + /// Send a message to a queue, visible after a specified date/time. + /// + /// The queue name. + /// The message to send. + /// Date/Time at which the message becomes visible. + /// The message type. + /// The ID of the sent message. + Task SendAsync(string queueName, T message, DateTimeOffset delay) where T : class; + /// /// Send a message to a queue with a delayed vt. /// /// The queue name. /// The message to send. - /// The delay, in seconds. + /// Number of seconds until the message becomes visible. /// The message type. /// The ID of the sent message. + [Obsolete("Use SendAsync instead.")] Task SendDelayAsync(string queueName, T message, int delay) where T : class; /// @@ -158,6 +179,26 @@ public interface INpgmqClient /// The IDs of the sent messages. Task> SendBatchAsync(string queueName, IEnumerable messages) where T : class; + /// + /// Send multiple messages to a queue, visible after a specified number of seconds. + /// + /// The queue name. + /// The messages to send. + /// Number of seconds until the message becomes visible. + /// The message type. + /// The IDs of the sent messages. + Task> SendBatchAsync(string queueName, IEnumerable messages, int delay) where T : class; + + /// + /// Send multiple messages to a queue, visible after a specified date/time. + /// + /// The queue name. + /// The messages to send. + /// Date/Time at which the message becomes visible. + /// The message type. + /// The IDs of the sent messages. + Task> SendBatchAsync(string queueName, IEnumerable messages, DateTimeOffset delay) where T : class; + /// /// Adjust the Vt of an existing message. /// diff --git a/Npgmq/NpgmqClient.cs b/Npgmq/NpgmqClient.cs index 673d276..ea048bf 100644 --- a/Npgmq/NpgmqClient.cs +++ b/Npgmq/NpgmqClient.cs @@ -355,28 +355,37 @@ public async Task>> ReadBatchAsync(string queueName, int } } - public async Task SendAsync(string queueName, T message) where T : class - { - try - { - return await SendDelayAsync(queueName, message, 0).ConfigureAwait(false); - } - catch (Exception ex) - { - throw new NpgmqException($"Failed to send message to queue {queueName}.", ex); - } - } + public Task SendAsync(string queueName, T message) where T : class => + SendAsync(queueName, message, null, null); + + public Task SendAsync(string queueName, T message, int delay) where T : class => + SendAsync(queueName, message, delay, null); - public async Task SendDelayAsync(string queueName, T message, int delay) where T : class + public Task SendAsync(string queueName, T message, DateTimeOffset delay) where T : class => + SendAsync(queueName, message, null, delay); + + private async Task SendAsync(string queueName, T message, int? delaySeconds, DateTimeOffset? delayTimestamp) where T : class { try { - var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.send(@queue_name, @message, @delay);").ConfigureAwait(false); + if (delaySeconds.HasValue && delayTimestamp.HasValue) + { + throw new ArgumentException("Only one of delaySeconds and delayTimestamp can be set."); + } + + var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.send(@queue_name, @msg, @delay);").ConfigureAwait(false); await using (cmd.ConfigureAwait(false)) { cmd.Parameters.AddWithValue("@queue_name", queueName); - cmd.Parameters.AddWithValue("@message", NpgsqlDbType.Jsonb, SerializeMessage(message)); - cmd.Parameters.AddWithValue("@delay", delay); + cmd.Parameters.AddWithValue("@msg", NpgsqlDbType.Jsonb, SerializeMessage(message)); + if (delayTimestamp.HasValue) + { + cmd.Parameters.AddWithValue("@delay", delayTimestamp.Value); + } + else + { + cmd.Parameters.AddWithValue("@delay", delaySeconds ?? 0); + } var result = await cmd.ExecuteScalarAsync().ConfigureAwait(false); return Convert.ToInt64(result!); } @@ -386,16 +395,42 @@ public async Task SendDelayAsync(string queueName, T message, int delay throw new NpgmqException($"Failed to send message to queue {queueName}.", ex); } } + + [Obsolete("Use SendAsync instead.")] + public Task SendDelayAsync(string queueName, T message, int delay) where T : class => + SendAsync(queueName, message, delay, null); - public async Task> SendBatchAsync(string queueName, IEnumerable messages) where T : class + public Task> SendBatchAsync(string queueName, IEnumerable messages) where T : class => + SendBatchAsync(queueName, messages, null, null); + + public Task> SendBatchAsync(string queueName, IEnumerable messages, int delay) where T : class => + SendBatchAsync(queueName, messages, delay, null); + + public Task> SendBatchAsync(string queueName, IEnumerable messages, DateTimeOffset delay) where T : class => + SendBatchAsync(queueName, messages, null, delay); + + private async Task> SendBatchAsync(string queueName, IEnumerable messages, int? delaySeconds, DateTimeOffset? delayTimestamp) where T : class { try { - var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.send_batch(@queue_name, @messages);").ConfigureAwait(false); + if (delaySeconds.HasValue && delayTimestamp.HasValue) + { + throw new ArgumentException("Only one of delaySeconds and delayTimestamp can be set."); + } + + var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.send_batch(@queue_name, @msgs, @delay);").ConfigureAwait(false); await using (cmd.ConfigureAwait(false)) { cmd.Parameters.AddWithValue("@queue_name", queueName); - cmd.Parameters.AddWithValue("@messages", NpgsqlDbType.Array | NpgsqlDbType.Jsonb, messages.Select(SerializeMessage).ToArray()); + cmd.Parameters.AddWithValue("@msgs", NpgsqlDbType.Array | NpgsqlDbType.Jsonb, messages.Select(SerializeMessage).ToArray()); + if (delayTimestamp.HasValue) + { + cmd.Parameters.AddWithValue("@delay", delayTimestamp.Value); + } + else + { + cmd.Parameters.AddWithValue("@delay", delaySeconds ?? 0); + } var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false); await using (reader.ConfigureAwait(false)) {