Skip to content

Commit

Permalink
- Add additional pgmq versions to build workflow
Browse files Browse the repository at this point in the history
- Add GetPgmqVersionAsync() to return the version of the PGMQ extension installed in the database
- Deprecated SendDelayAsync(), replaced by SendAsync() overload that takes delay as an arg
- Added SendBatchAsync() overload that takes a delay
- Added GetMetricsAsync() to retrieve metrics for a single queue or all queues.
- Updated Npgsql dependency from 8.0.3 to 8.0.6
- Updated various test dependencies
- Updated README with badges and nuget link
  • Loading branch information
brianpursley committed Dec 19, 2024
1 parent d86dfc7 commit bd63764
Show file tree
Hide file tree
Showing 10 changed files with 428 additions and 57 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,24 @@ jobs:
- name: Test (latest pgmq version)
run: Npgmq.Test/scripts/run-tests.sh

- name: Test (pgmq 1.4.5)
run: Npgmq.Test/scripts/run-tests.sh 1.4.5

- name: Test (pgmq 1.3.3)
run: Npgmq.Test/scripts/run-tests.sh 1.3.3

- name: Test (pgmq 1.2.1)
run: Npgmq.Test/scripts/run-tests.sh 1.2.1

- name: Test (pgmq 1.1.1)
run: Npgmq.Test/scripts/run-tests.sh 1.1.1

- name: Test (pgmq 1.0.0)
run: Npgmq.Test/scripts/run-tests.sh 1.0.0

- name: Test (pgmq 0.33.1)
run: Npgmq.Test/scripts/run-tests.sh 0.33.1

- name: Test (pgmq 0.31.0)
run: Npgmq.Test/scripts/run-tests.sh 0.31.0

Expand Down
4 changes: 2 additions & 2 deletions Npgmq.Example/Npgmq.Example.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="9.0.0" />
</ItemGroup>

</Project>
6 changes: 3 additions & 3 deletions Npgmq.Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
}
Expand Down Expand Up @@ -59,13 +59,13 @@
var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
}
Expand Down
15 changes: 8 additions & 7 deletions Npgmq.Test/Npgmq.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.21" />
<PackageReference Include="DeepEqual" Version="4.2.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="8.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="xunit" Version="2.5.3" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.3">
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="DeepEqual" Version="5.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="9.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="xunit" Version="2.9.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Xunit.SkippableFact" Version="1.5.23" />
</ItemGroup>

<ItemGroup>
Expand Down
219 changes: 203 additions & 16 deletions Npgmq.Test/NpgmqClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public sealed class NpgmqClientTest : IDisposable

private class TestMessage
{
public int? Foo { get; set; }
public string? Bar { get; set; }
public DateTimeOffset? Baz { get; set; }
public int? Foo { get; init; }
public string? Bar { get; init; }
public DateTimeOffset? Baz { get; init; }
}

public NpgmqClientTest()
Expand Down Expand Up @@ -48,6 +48,28 @@ private async Task ResetTestQueueAsync()
await _sut.CreateQueueAsync(TestQueueName);
}

private async Task<bool> IsMinPgmqVersion(string minVersion)
{
var version = await _connection.ExecuteScalarAsync<string>("select extversion from pg_extension where extname = 'pgmq';");
return version is not null && new Version(version) >= new Version(minVersion);
}

[Fact]
public async Task NpgmqClient_should_work_when_created_using_a_connection_string()
{
// Arrange
await ResetTestQueueAsync();
var sut = new NpgmqClient(_connectionString); // Don't use _sut here, as we want to test a new instance

// Act
await sut.SendAsync(TestQueueName, new TestMessage { Foo = 123 });
var msg = await sut.ReadAsync<TestMessage>(TestQueueName);

// Assert
Assert.NotNull(msg);
msg.Message.ShouldDeepEqual(new TestMessage { Foo = 123 });
}

[Fact]
public async Task ArchiveAsync_should_archive_a_single_message()
{
Expand Down Expand Up @@ -213,21 +235,65 @@ public async Task DropQueueAsync_should_drop_queue()
[Fact]
public async Task InitAsync_should_initialize_pgmq_extension()
{
// Arrange
await _connection.ExecuteAsync("DROP EXTENSION IF EXISTS pgmq CASCADE;");
Assert.Equal(0, await _connection.ExecuteScalarAsync<int>("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';"));
try
{
// Arrange
await _connection.ExecuteAsync("DROP EXTENSION IF EXISTS pgmq CASCADE;");
Assert.Equal(0, await _connection.ExecuteScalarAsync<int>("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';"));

// Act
await _sut.InitAsync();

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<int>("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';"));
// Act
await _sut.InitAsync();

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<int>("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';"));

// Act (Calling it again should not throw an exception)
await _sut.InitAsync();

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<int>("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';"));
}
finally
{
// Cleanup
await _sut.InitAsync();
}
}

[Fact]
public async Task GetPgmqVersionAsync_should_return_pgmq_version()
{
// Arrange
var expectedVersionString = await _connection.ExecuteScalarAsync<string>("SELECT extversion FROM pg_extension WHERE extname = 'pgmq';");
var expectedVersion = new Version(expectedVersionString!);

// Act (Calling it again should not throw an exception)
await _sut.InitAsync();
// Act
var version = await _sut.GetPgmqVersionAsync();

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<int>("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';"));
Assert.Equal(expectedVersion, version);
}

[Fact]
public async Task GetPgmqVersionAsync_should_return_null_if_pgmq_is_not_installed()
{
try
{
// Arrange
await _connection.ExecuteAsync("DROP EXTENSION IF EXISTS pgmq CASCADE;");
Assert.Equal(0, await _connection.ExecuteScalarAsync<int>("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';"));

// Act
var version = await _sut.GetPgmqVersionAsync();

// Assert
Assert.Null(version);
}
finally
{
// Cleanup
await _sut.InitAsync();
}
}

[Fact]
Expand Down Expand Up @@ -269,6 +335,7 @@ public async Task PollAsync_should_wait_for_message_and_return_it()
// Assert
Assert.NotNull(msg);
Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset);
Assert.Equal(1, msg.ReadCt);
msg.Message.ShouldDeepEqual(new TestMessage
{
Expand Down Expand Up @@ -379,6 +446,7 @@ public async Task PopAsync_should_read_and_delete_message()
Assert.NotNull(msg);
Assert.Equal(msgId, msg.MsgId);
Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset);
Assert.Equal(0, msg.ReadCt);
msg.Message.ShouldDeepEqual(new TestMessage
{
Expand Down Expand Up @@ -469,7 +537,9 @@ public async Task ReadAsync_should_read_message()
Assert.NotNull(msg);
Assert.Equal(msgId, msg.MsgId);
Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset);
Assert.True(msg.Vt > DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.Vt.Offset);
Assert.Equal(1, msg.ReadCt);
msg.Message.ShouldDeepEqual(new TestMessage
{
Expand Down Expand Up @@ -499,7 +569,9 @@ public async Task ReadAsync_should_read_string_message()
Assert.NotNull(msg);
Assert.Equal(msgId, msg.MsgId);
Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset);
Assert.True(msg.Vt > DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.Vt.Offset);
Assert.Equal(1, msg.ReadCt);
msg.Message.ShouldDeepEqual("{\"Bar\": \"Test\", \"Baz\": \"2023-09-01T01:23:45-04:00\", \"Foo\": 123}");
}
Expand Down Expand Up @@ -666,13 +738,13 @@ public async Task SendAsync_should_add_string_message()
}

[Fact]
public async Task SendDelayAsync_should_add_message_with_future_vt()
public async Task SendAsync_with_delay_should_add_message_with_future_vt()
{
// Arrange
await ResetTestQueueAsync();

// Act
var msgId = await _sut.SendDelayAsync(TestQueueName, new TestMessage
var msgId = await _sut.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Expand Down Expand Up @@ -704,6 +776,26 @@ public async Task SendBatchAsync_should_add_multiple_messages()
Assert.Equal(msgIds.OrderBy(x => x), (await _connection.QueryAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} ORDER BY msg_id;")).ToList());
}

[Fact]
public async Task SendBatchAsync_with_delay_should_add_multiple_messages_with_future_vt()
{
// Arrange
await ResetTestQueueAsync();

// Act
var msgIds = await _sut.SendBatchAsync(TestQueueName, new List<TestMessage>
{
new() { Foo = 1 },
new() { Foo = 2 },
new() { Foo = 3 }
}, 100);

// Assert
Assert.Equal(3, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(3, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt > CURRENT_TIMESTAMP;"));
Assert.Equal(msgIds.OrderBy(x => x), (await _connection.QueryAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} ORDER BY msg_id;")).ToList());
}

[Fact]
public async Task SetVtAsync_should_change_vt_for_a_message()
{
Expand All @@ -723,4 +815,99 @@ public async Task SetVtAsync_should_change_vt_for_a_message()
Assert.NotNull(message2);
Assert.Equal(msgId, message2.MsgId);
}

[SkippableFact]
public async Task GetMetricsAsync_should_return_metrics_for_a_single_queue()
{
Skip.IfNot(await IsMinPgmqVersion("0.33.1"), "PGMQ versions before 0.33.1 have a bug in the total messages calculation.");

// Arrange
await ResetTestQueueAsync();

var metrics1 = await _sut.GetMetricsAsync(TestQueueName);
await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 1 });
await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 2 });
var msgId3 = await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 3 });
var msgId4 = await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 4 });
await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 5 });
await _sut.DeleteAsync(TestQueueName, msgId3);
await _sut.ArchiveAsync(TestQueueName, msgId4);

// Act
var metrics2 = await _sut.GetMetricsAsync(TestQueueName);
await _sut.ReadAsync<string>(TestQueueName);
var metrics3 = await _sut.GetMetricsAsync(TestQueueName);
await _sut.PurgeQueueAsync(TestQueueName);
var metrics4 = await _sut.GetMetricsAsync(TestQueueName);

// Assert
Assert.Equal(TestQueueName, metrics1.QueueName);
Assert.Equal(0, metrics1.QueueLength);
Assert.Null(metrics1.NewestMessageAge);
Assert.Null(metrics1.OldestMessageAge);
Assert.Equal(0, metrics1.TotalMessages);
Assert.True(metrics1.ScrapeTime < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset);

Assert.Equal(TestQueueName, metrics2.QueueName);
Assert.Equal(3, metrics2.QueueLength);
Assert.True(metrics2.NewestMessageAge >= 0);
Assert.True(metrics2.OldestMessageAge >= 0);
Assert.Equal(5, metrics2.TotalMessages);
Assert.True(metrics2.ScrapeTime < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset);

Assert.Equal(TestQueueName, metrics3.QueueName);
Assert.Equal(3, metrics3.QueueLength);
Assert.True(metrics3.NewestMessageAge >= 0);
Assert.True(metrics3.OldestMessageAge >= 0);
Assert.Equal(5, metrics3.TotalMessages);
Assert.True(metrics3.ScrapeTime < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset);

Assert.Equal(TestQueueName, metrics4.QueueName);
Assert.Equal(0, metrics4.QueueLength);
Assert.Null(metrics1.NewestMessageAge);
Assert.Null(metrics1.OldestMessageAge);
Assert.Equal(5, metrics4.TotalMessages);
Assert.True(metrics4.ScrapeTime < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset);
}

[Fact]
public async Task GetMetricsAsync_should_return_metrics_for_all_queues()
{
// Create some queues just for testing this function.
var testMetricsQueueName1 = TestQueueName + "_m1";
var testMetricsQueueName2 = TestQueueName + "_m2";
var testMetricsQueueName3 = TestQueueName + "_m3";
try
{
// Arrange
await _sut.CreateQueueAsync(testMetricsQueueName1);
await _sut.CreateQueueAsync(testMetricsQueueName2);
await _sut.CreateQueueAsync(testMetricsQueueName3);

await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 1 });
await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 2 });
await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 3 });
await _sut.SendAsync(testMetricsQueueName2, new TestMessage { Foo = 4 });
await _sut.SendAsync(testMetricsQueueName2, new TestMessage { Foo = 5 });

// Act
var allMetrics = await _sut.GetMetricsAsync();

// Assert
Assert.Equal(3, allMetrics.Single(x => x.QueueName == testMetricsQueueName1).QueueLength);
Assert.Equal(2, allMetrics.Single(x => x.QueueName == testMetricsQueueName2).QueueLength);
Assert.Equal(0, allMetrics.Single(x => x.QueueName == testMetricsQueueName3).QueueLength);
}
finally
{
// Cleanup
try { await _sut.DropQueueAsync(testMetricsQueueName1); } catch { /* ignored */ }
try { await _sut.DropQueueAsync(testMetricsQueueName2); } catch { /* ignored */ }
try { await _sut.DropQueueAsync(testMetricsQueueName3); } catch { /* ignored */ }
}
}
}
Loading

0 comments on commit bd63764

Please sign in to comment.