diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..1d6d4c3 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,363 @@ +root = true + +# All files +[*] +indent_style = space + +# Xml files +[*.xml] +indent_size = 2 + +# C# files +[*.cs] + +#### Core EditorConfig Options #### + +# Indentation and spacing +indent_size = 4 +tab_width = 4 + +# New line preferences +insert_final_newline = false + +#### .NET Coding Conventions #### +[*.{cs,vb}] + +# Organize usings +dotnet_separate_import_directive_groups = false +dotnet_sort_system_directives_first = true +file_header_template = unset + +# this. and Me. preferences +dotnet_style_qualification_for_event = false:silent +dotnet_style_qualification_for_field = false:silent +dotnet_style_qualification_for_method = false:silent +dotnet_style_qualification_for_property = false:silent + +# Language keywords vs BCL types preferences +dotnet_style_predefined_type_for_locals_parameters_members = true:silent +dotnet_style_predefined_type_for_member_access = true:silent + +# Parentheses preferences +dotnet_style_parentheses_in_arithmetic_binary_operators = always_for_clarity:silent +dotnet_style_parentheses_in_other_binary_operators = always_for_clarity:silent +dotnet_style_parentheses_in_other_operators = never_if_unnecessary:silent +dotnet_style_parentheses_in_relational_binary_operators = always_for_clarity:silent + +# Modifier preferences +dotnet_style_require_accessibility_modifiers = for_non_interface_members:silent + +# Expression-level preferences +dotnet_style_coalesce_expression = true:suggestion +dotnet_style_collection_initializer = true:suggestion +dotnet_style_explicit_tuple_names = true:suggestion +dotnet_style_null_propagation = true:suggestion +dotnet_style_object_initializer = true:suggestion +dotnet_style_operator_placement_when_wrapping = beginning_of_line +dotnet_style_prefer_auto_properties = true:suggestion +dotnet_style_prefer_compound_assignment = true:suggestion +dotnet_style_prefer_conditional_expression_over_assignment = true:suggestion +dotnet_style_prefer_conditional_expression_over_return = true:suggestion +dotnet_style_prefer_inferred_anonymous_type_member_names = true:suggestion +dotnet_style_prefer_inferred_tuple_names = true:suggestion +dotnet_style_prefer_is_null_check_over_reference_equality_method = true:suggestion +dotnet_style_prefer_simplified_boolean_expressions = true:suggestion +dotnet_style_prefer_simplified_interpolation = true:suggestion + +# Field preferences +dotnet_style_readonly_field = true:warning + +# Parameter preferences +dotnet_code_quality_unused_parameters = all:suggestion + +# Suppression preferences +dotnet_remove_unnecessary_suppression_exclusions = none + +#### C# Coding Conventions #### +[*.cs] + +# var preferences +csharp_style_var_elsewhere = false:silent +csharp_style_var_for_built_in_types = false:silent +csharp_style_var_when_type_is_apparent = false:silent + +# Expression-bodied members +csharp_style_expression_bodied_accessors = true:silent +csharp_style_expression_bodied_constructors = false:silent +csharp_style_expression_bodied_indexers = true:silent +csharp_style_expression_bodied_lambdas = true:suggestion +csharp_style_expression_bodied_local_functions = false:silent +csharp_style_expression_bodied_methods = false:silent +csharp_style_expression_bodied_operators = false:silent +csharp_style_expression_bodied_properties = true:silent + +# Pattern matching preferences +csharp_style_pattern_matching_over_as_with_null_check = true:suggestion +csharp_style_pattern_matching_over_is_with_cast_check = true:suggestion +csharp_style_prefer_not_pattern = true:suggestion +csharp_style_prefer_pattern_matching = true:silent +csharp_style_prefer_switch_expression = true:suggestion + +# Null-checking preferences +csharp_style_conditional_delegate_call = true:suggestion + +# Modifier preferences +csharp_prefer_static_local_function = true:warning +csharp_preferred_modifier_order = public,private,protected,internal,static,extern,new,virtual,abstract,sealed,override,readonly,unsafe,volatile,async:silent + +# Code-block preferences +csharp_prefer_braces = true:silent +csharp_prefer_simple_using_statement = true:suggestion + +# Expression-level preferences +csharp_prefer_simple_default_expression = true:suggestion +csharp_style_deconstructed_variable_declaration = true:suggestion +csharp_style_inlined_variable_declaration = true:suggestion +csharp_style_pattern_local_over_anonymous_function = true:suggestion +csharp_style_prefer_index_operator = true:suggestion +csharp_style_prefer_range_operator = true:suggestion +csharp_style_throw_expression = true:suggestion +csharp_style_unused_value_assignment_preference = discard_variable:suggestion +csharp_style_unused_value_expression_statement_preference = discard_variable:silent + +# 'using' directive preferences +csharp_using_directive_placement = outside_namespace:silent + +#### C# Formatting Rules #### + +# New line preferences +csharp_new_line_before_catch = true +csharp_new_line_before_else = true +csharp_new_line_before_finally = true +csharp_new_line_before_members_in_anonymous_types = true +csharp_new_line_before_members_in_object_initializers = true +csharp_new_line_before_open_brace = all +csharp_new_line_between_query_expression_clauses = true + +# Indentation preferences +csharp_indent_block_contents = true +csharp_indent_braces = false +csharp_indent_case_contents = true +csharp_indent_case_contents_when_block = true +csharp_indent_labels = one_less_than_current +csharp_indent_switch_labels = true + +# Space preferences +csharp_space_after_cast = false +csharp_space_after_colon_in_inheritance_clause = true +csharp_space_after_comma = true +csharp_space_after_dot = false +csharp_space_after_keywords_in_control_flow_statements = true +csharp_space_after_semicolon_in_for_statement = true +csharp_space_around_binary_operators = before_and_after +csharp_space_around_declaration_statements = false +csharp_space_before_colon_in_inheritance_clause = true +csharp_space_before_comma = false +csharp_space_before_dot = false +csharp_space_before_open_square_brackets = false +csharp_space_before_semicolon_in_for_statement = false +csharp_space_between_empty_square_brackets = false +csharp_space_between_method_call_empty_parameter_list_parentheses = false +csharp_space_between_method_call_name_and_opening_parenthesis = false +csharp_space_between_method_call_parameter_list_parentheses = false +csharp_space_between_method_declaration_empty_parameter_list_parentheses = false +csharp_space_between_method_declaration_name_and_open_parenthesis = false +csharp_space_between_method_declaration_parameter_list_parentheses = false +csharp_space_between_parentheses = false +csharp_space_between_square_brackets = false + +# Wrapping preferences +csharp_preserve_single_line_blocks = true +csharp_preserve_single_line_statements = true + +#### Naming styles #### +[*.{cs,vb}] + +# Naming rules + +dotnet_naming_rule.types_and_namespaces_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.types_and_namespaces_should_be_pascalcase.symbols = types_and_namespaces +dotnet_naming_rule.types_and_namespaces_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.interfaces_should_be_ipascalcase.severity = suggestion +dotnet_naming_rule.interfaces_should_be_ipascalcase.symbols = interfaces +dotnet_naming_rule.interfaces_should_be_ipascalcase.style = ipascalcase + +dotnet_naming_rule.type_parameters_should_be_tpascalcase.severity = suggestion +dotnet_naming_rule.type_parameters_should_be_tpascalcase.symbols = type_parameters +dotnet_naming_rule.type_parameters_should_be_tpascalcase.style = tpascalcase + +dotnet_naming_rule.methods_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.methods_should_be_pascalcase.symbols = methods +dotnet_naming_rule.methods_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.properties_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.properties_should_be_pascalcase.symbols = properties +dotnet_naming_rule.properties_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.events_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.events_should_be_pascalcase.symbols = events +dotnet_naming_rule.events_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.local_variables_should_be_camelcase.severity = suggestion +dotnet_naming_rule.local_variables_should_be_camelcase.symbols = local_variables +dotnet_naming_rule.local_variables_should_be_camelcase.style = camelcase + +dotnet_naming_rule.local_constants_should_be_camelcase.severity = suggestion +dotnet_naming_rule.local_constants_should_be_camelcase.symbols = local_constants +dotnet_naming_rule.local_constants_should_be_camelcase.style = camelcase + +dotnet_naming_rule.parameters_should_be_camelcase.severity = suggestion +dotnet_naming_rule.parameters_should_be_camelcase.symbols = parameters +dotnet_naming_rule.parameters_should_be_camelcase.style = camelcase + +dotnet_naming_rule.public_fields_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.public_fields_should_be_pascalcase.symbols = public_fields +dotnet_naming_rule.public_fields_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.private_fields_should_be__camelcase.severity = suggestion +dotnet_naming_rule.private_fields_should_be__camelcase.symbols = private_fields +dotnet_naming_rule.private_fields_should_be__camelcase.style = _camelcase + +dotnet_naming_rule.private_static_fields_should_be_s_camelcase.severity = suggestion +dotnet_naming_rule.private_static_fields_should_be_s_camelcase.symbols = private_static_fields +dotnet_naming_rule.private_static_fields_should_be_s_camelcase.style = s_camelcase + +dotnet_naming_rule.public_constant_fields_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.public_constant_fields_should_be_pascalcase.symbols = public_constant_fields +dotnet_naming_rule.public_constant_fields_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.private_constant_fields_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.private_constant_fields_should_be_pascalcase.symbols = private_constant_fields +dotnet_naming_rule.private_constant_fields_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.public_static_readonly_fields_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.public_static_readonly_fields_should_be_pascalcase.symbols = public_static_readonly_fields +dotnet_naming_rule.public_static_readonly_fields_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.private_static_readonly_fields_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.private_static_readonly_fields_should_be_pascalcase.symbols = private_static_readonly_fields +dotnet_naming_rule.private_static_readonly_fields_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.enums_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.enums_should_be_pascalcase.symbols = enums +dotnet_naming_rule.enums_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.local_functions_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.local_functions_should_be_pascalcase.symbols = local_functions +dotnet_naming_rule.local_functions_should_be_pascalcase.style = pascalcase + +dotnet_naming_rule.non_field_members_should_be_pascalcase.severity = suggestion +dotnet_naming_rule.non_field_members_should_be_pascalcase.symbols = non_field_members +dotnet_naming_rule.non_field_members_should_be_pascalcase.style = pascalcase + +# Symbol specifications + +dotnet_naming_symbols.interfaces.applicable_kinds = interface +dotnet_naming_symbols.interfaces.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected +dotnet_naming_symbols.interfaces.required_modifiers = + +dotnet_naming_symbols.enums.applicable_kinds = enum +dotnet_naming_symbols.enums.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected +dotnet_naming_symbols.enums.required_modifiers = + +dotnet_naming_symbols.events.applicable_kinds = event +dotnet_naming_symbols.events.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected +dotnet_naming_symbols.events.required_modifiers = + +dotnet_naming_symbols.methods.applicable_kinds = method +dotnet_naming_symbols.methods.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected +dotnet_naming_symbols.methods.required_modifiers = + +dotnet_naming_symbols.properties.applicable_kinds = property +dotnet_naming_symbols.properties.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected +dotnet_naming_symbols.properties.required_modifiers = + +dotnet_naming_symbols.public_fields.applicable_kinds = field +dotnet_naming_symbols.public_fields.applicable_accessibilities = public, internal +dotnet_naming_symbols.public_fields.required_modifiers = + +dotnet_naming_symbols.private_fields.applicable_kinds = field +dotnet_naming_symbols.private_fields.applicable_accessibilities = private, protected, protected_internal, private_protected +dotnet_naming_symbols.private_fields.required_modifiers = + +dotnet_naming_symbols.private_static_fields.applicable_kinds = field +dotnet_naming_symbols.private_static_fields.applicable_accessibilities = private, protected, protected_internal, private_protected +dotnet_naming_symbols.private_static_fields.required_modifiers = static + +dotnet_naming_symbols.types_and_namespaces.applicable_kinds = namespace, class, struct, interface, enum +dotnet_naming_symbols.types_and_namespaces.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected +dotnet_naming_symbols.types_and_namespaces.required_modifiers = + +dotnet_naming_symbols.non_field_members.applicable_kinds = property, event, method +dotnet_naming_symbols.non_field_members.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected +dotnet_naming_symbols.non_field_members.required_modifiers = + +dotnet_naming_symbols.type_parameters.applicable_kinds = namespace +dotnet_naming_symbols.type_parameters.applicable_accessibilities = * +dotnet_naming_symbols.type_parameters.required_modifiers = + +dotnet_naming_symbols.private_constant_fields.applicable_kinds = field +dotnet_naming_symbols.private_constant_fields.applicable_accessibilities = private, protected, protected_internal, private_protected +dotnet_naming_symbols.private_constant_fields.required_modifiers = const + +dotnet_naming_symbols.local_variables.applicable_kinds = local +dotnet_naming_symbols.local_variables.applicable_accessibilities = local +dotnet_naming_symbols.local_variables.required_modifiers = + +dotnet_naming_symbols.local_constants.applicable_kinds = local +dotnet_naming_symbols.local_constants.applicable_accessibilities = local +dotnet_naming_symbols.local_constants.required_modifiers = const + +dotnet_naming_symbols.parameters.applicable_kinds = parameter +dotnet_naming_symbols.parameters.applicable_accessibilities = * +dotnet_naming_symbols.parameters.required_modifiers = + +dotnet_naming_symbols.public_constant_fields.applicable_kinds = field +dotnet_naming_symbols.public_constant_fields.applicable_accessibilities = public, internal +dotnet_naming_symbols.public_constant_fields.required_modifiers = const + +dotnet_naming_symbols.public_static_readonly_fields.applicable_kinds = field +dotnet_naming_symbols.public_static_readonly_fields.applicable_accessibilities = public, internal +dotnet_naming_symbols.public_static_readonly_fields.required_modifiers = readonly, static + +dotnet_naming_symbols.private_static_readonly_fields.applicable_kinds = field +dotnet_naming_symbols.private_static_readonly_fields.applicable_accessibilities = private, protected, protected_internal, private_protected +dotnet_naming_symbols.private_static_readonly_fields.required_modifiers = readonly, static + +dotnet_naming_symbols.local_functions.applicable_kinds = local_function +dotnet_naming_symbols.local_functions.applicable_accessibilities = * +dotnet_naming_symbols.local_functions.required_modifiers = + +# Naming styles + +dotnet_naming_style.pascalcase.required_prefix = +dotnet_naming_style.pascalcase.required_suffix = +dotnet_naming_style.pascalcase.word_separator = +dotnet_naming_style.pascalcase.capitalization = pascal_case + +dotnet_naming_style.ipascalcase.required_prefix = I +dotnet_naming_style.ipascalcase.required_suffix = +dotnet_naming_style.ipascalcase.word_separator = +dotnet_naming_style.ipascalcase.capitalization = pascal_case + +dotnet_naming_style.tpascalcase.required_prefix = T +dotnet_naming_style.tpascalcase.required_suffix = +dotnet_naming_style.tpascalcase.word_separator = +dotnet_naming_style.tpascalcase.capitalization = pascal_case + +dotnet_naming_style._camelcase.required_prefix = _ +dotnet_naming_style._camelcase.required_suffix = +dotnet_naming_style._camelcase.word_separator = +dotnet_naming_style._camelcase.capitalization = camel_case + +dotnet_naming_style.camelcase.required_prefix = +dotnet_naming_style.camelcase.required_suffix = +dotnet_naming_style.camelcase.word_separator = +dotnet_naming_style.camelcase.capitalization = camel_case + +dotnet_naming_style.s_camelcase.required_prefix = s_ +dotnet_naming_style.s_camelcase.required_suffix = +dotnet_naming_style.s_camelcase.word_separator = +dotnet_naming_style.s_camelcase.capitalization = camel_case + diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 648880b..4a283f5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -9,13 +9,13 @@ on: - '*' tags: - 'v[0-9]+.[0-9]+.[0-9]+.*' - pull_request: + pull_request: workflow_dispatch: env: VERSION: 0.0.0 ConnectionStrings__Test: ${{ secrets.TEST_CONNECTION_STRING }} - + jobs: build: runs-on: ubuntu-latest @@ -23,7 +23,7 @@ jobs: - uses: actions/checkout@v4 with: ref: ${{ github.ref }} # This checks out the commit that triggered the workflow run - + - name: Setup .NET uses: actions/setup-dotnet@v4 with: @@ -39,18 +39,33 @@ jobs: - name: Restore dependencies run: dotnet restore + - name: Verify Format + run: dotnet format --verify-no-changes --verbosity diagnostic + - name: Build run: dotnet build --no-restore --configuration Release /p:Version=${{ env.VERSION }} /p:CopyrightYear=$(date +%Y) - name: Test (latest pgmq version) run: Npgmq.Test/scripts/run-tests.sh + - name: Test (pgmq 1.4.5) + run: Npgmq.Test/scripts/run-tests.sh 1.4.5 + + - name: Test (pgmq 1.3.3) + run: Npgmq.Test/scripts/run-tests.sh 1.3.3 + + - name: Test (pgmq 1.2.1) + run: Npgmq.Test/scripts/run-tests.sh 1.2.1 + - name: Test (pgmq 1.1.1) run: Npgmq.Test/scripts/run-tests.sh 1.1.1 - name: Test (pgmq 1.0.0) run: Npgmq.Test/scripts/run-tests.sh 1.0.0 + - name: Test (pgmq 0.33.1) + run: Npgmq.Test/scripts/run-tests.sh 0.33.1 + - name: Test (pgmq 0.31.0) run: Npgmq.Test/scripts/run-tests.sh 0.31.0 @@ -73,7 +88,7 @@ jobs: with: name: build-artifact path: out - + - name: Setup .NET uses: actions/setup-dotnet@v4 with: diff --git a/Npgmq.Example/Npgmq.Example.csproj b/Npgmq.Example/Npgmq.Example.csproj index e9e7233..457e176 100644 --- a/Npgmq.Example/Npgmq.Example.csproj +++ b/Npgmq.Example/Npgmq.Example.csproj @@ -13,8 +13,8 @@ - - + + diff --git a/Npgmq.Example/Program.cs b/Npgmq.Example/Program.cs index d0166fe..49c31d4 100644 --- a/Npgmq.Example/Program.cs +++ b/Npgmq.Example/Program.cs @@ -27,7 +27,7 @@ var msg = await npgmq.ReadAsync("example_queue"); if (msg != null) { - Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); + Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); await npgmq.ArchiveAsync("example_queue", msg.MsgId); } } @@ -59,13 +59,13 @@ var msg = await npgmq.ReadAsync("example_queue"); if (msg != null) { - Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); + Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); await npgmq.ArchiveAsync("example_queue", msg.MsgId); } msg = await npgmq.ReadAsync("example_queue"); if (msg != null) { - Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); + Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); await npgmq.ArchiveAsync("example_queue", msg.MsgId); } } diff --git a/Npgmq.Test/Npgmq.Test.csproj b/Npgmq.Test/Npgmq.Test.csproj index 2b68481..4d6e24f 100644 --- a/Npgmq.Test/Npgmq.Test.csproj +++ b/Npgmq.Test/Npgmq.Test.csproj @@ -12,16 +12,17 @@ - - - - - - - + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive all + diff --git a/Npgmq.Test/NpgmqClientTest.cs b/Npgmq.Test/NpgmqClientTest.cs index 953c64a..3d0e799 100644 --- a/Npgmq.Test/NpgmqClientTest.cs +++ b/Npgmq.Test/NpgmqClientTest.cs @@ -9,16 +9,16 @@ namespace Npgmq.Test; public sealed class NpgmqClientTest : IDisposable { private static readonly string TestQueueName = $"test_{Guid.NewGuid():N}"; - + private readonly string _connectionString; private readonly NpgsqlConnection _connection; private readonly NpgmqClient _sut; private class TestMessage { - public int? Foo { get; set; } - public string? Bar { get; set; } - public DateTimeOffset? Baz { get; set; } + public int? Foo { get; init; } + public string? Bar { get; init; } + public DateTimeOffset? Baz { get; init; } } public NpgmqClientTest() @@ -32,13 +32,13 @@ public NpgmqClientTest() _connection = new NpgsqlConnection(_connectionString); _sut = new NpgmqClient(_connection); } - + public void Dispose() { _connection.Close(); _connection.Dispose(); } - + private async Task ResetTestQueueAsync() { if (await _sut.QueueExistsAsync(TestQueueName)) @@ -48,12 +48,34 @@ private async Task ResetTestQueueAsync() await _sut.CreateQueueAsync(TestQueueName); } + private async Task IsMinPgmqVersion(string minVersion) + { + var version = await _connection.ExecuteScalarAsync("select extversion from pg_extension where extname = 'pgmq';"); + return version is not null && new Version(version) >= new Version(minVersion); + } + + [Fact] + public async Task NpgmqClient_should_work_when_created_using_a_connection_string() + { + // Arrange + await ResetTestQueueAsync(); + var sut = new NpgmqClient(_connectionString); // Don't use _sut here, as we want to test a new instance + + // Act + await sut.SendAsync(TestQueueName, new TestMessage { Foo = 123 }); + var msg = await sut.ReadAsync(TestQueueName); + + // Assert + Assert.NotNull(msg); + msg.Message.ShouldDeepEqual(new TestMessage { Foo = 123 }); + } + [Fact] public async Task ArchiveAsync_should_archive_a_single_message() { // Arrange await ResetTestQueueAsync(); - + // Act var msgId = await _sut.SendAsync(TestQueueName, new TestMessage { @@ -70,13 +92,13 @@ public async Task ArchiveAsync_should_archive_a_single_message() Assert.Equal(1, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.a_{TestQueueName};")); Assert.Equal(msgId, await _connection.ExecuteScalarAsync($"SELECT msg_id FROM pgmq.a_{TestQueueName} LIMIT 1;")); } - + [Fact] public async Task ArchiveAsync_should_return_false_if_message_not_found() { // Arrange await ResetTestQueueAsync(); - + // Act var result = await _sut.ArchiveAsync(TestQueueName, 1); @@ -89,7 +111,7 @@ public async Task ArchiveBatchAsync_should_archive_multiple_messages() { // Arrange await ResetTestQueueAsync(); - + // Act var msgIds = new List { @@ -106,7 +128,7 @@ public async Task ArchiveBatchAsync_should_archive_multiple_messages() Assert.Equal(3, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.a_{TestQueueName};")); Assert.Equal(msgIds.OrderBy(x => x), (await _connection.QueryAsync($"SELECT msg_id FROM pgmq.a_{TestQueueName} ORDER BY msg_id;")).ToList()); } - + [Fact] public async Task CreateQueueAsync_should_create_a_queue() { @@ -115,14 +137,14 @@ public async Task CreateQueueAsync_should_create_a_queue() { await _sut.DropQueueAsync(TestQueueName); } - + // Act await _sut.CreateQueueAsync(TestQueueName); - + // Assert Assert.Equal(1, await _connection.ExecuteScalarAsync("SELECT count(*) FROM pgmq.meta WHERE queue_name = @queueName and is_partitioned = false and is_unlogged = false;", new { queueName = TestQueueName })); } - + [Fact] public async Task CreateUnloggedQueueAsync_should_create_an_unlogged_queue() { @@ -131,10 +153,10 @@ public async Task CreateUnloggedQueueAsync_should_create_an_unlogged_queue() { await _sut.DropQueueAsync(TestQueueName); } - + // Act await _sut.CreateUnloggedQueueAsync(TestQueueName); - + // Assert Assert.Equal(1, await _connection.ExecuteScalarAsync("SELECT count(*) FROM pgmq.meta WHERE queue_name = @queueName and is_partitioned = false and is_unlogged = true;", new { queueName = TestQueueName })); } @@ -144,7 +166,7 @@ public async Task DeleteAsync_should_delete_message() { // Arrange await ResetTestQueueAsync(); - + // Act var msgId = await _sut.SendAsync(TestQueueName, new TestMessage { @@ -160,20 +182,20 @@ public async Task DeleteAsync_should_delete_message() Assert.Equal(0, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName};")); Assert.Equal(0, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.a_{TestQueueName};")); } - + [Fact] public async Task DeleteAsync_should_return_false_if_message_not_found() { // Arrange await ResetTestQueueAsync(); - + // Act var result = await _sut.DeleteAsync(TestQueueName, 1); // Assert Assert.False(result); } - + [Fact] public async Task DeleteBatchAsync_should_delete_multiple_messages() { @@ -186,7 +208,7 @@ public async Task DeleteBatchAsync_should_delete_multiple_messages() await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 3 }) }; Assert.Equal(3, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName};")); - + // Act var results = await _sut.DeleteBatchAsync(TestQueueName, msgIds); @@ -202,7 +224,7 @@ public async Task DropQueueAsync_should_drop_queue() // Arrange await ResetTestQueueAsync(); Assert.Equal(1, await _connection.ExecuteScalarAsync("SELECT count(*) FROM pgmq.meta WHERE queue_name = @queueName;", new { queueName = TestQueueName })); - + // Act await _sut.DropQueueAsync(TestQueueName); @@ -212,22 +234,66 @@ public async Task DropQueueAsync_should_drop_queue() [Fact] public async Task InitAsync_should_initialize_pgmq_extension() + { + try + { + // Arrange + await _connection.ExecuteAsync("DROP EXTENSION IF EXISTS pgmq CASCADE;"); + Assert.Equal(0, await _connection.ExecuteScalarAsync("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';")); + + // Act + await _sut.InitAsync(); + + // Assert + Assert.Equal(1, await _connection.ExecuteScalarAsync("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';")); + + // Act (Calling it again should not throw an exception) + await _sut.InitAsync(); + + // Assert + Assert.Equal(1, await _connection.ExecuteScalarAsync("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';")); + } + finally + { + // Cleanup + await _sut.InitAsync(); + } + } + + [Fact] + public async Task GetPgmqVersionAsync_should_return_pgmq_version() { // Arrange - await _connection.ExecuteAsync("DROP EXTENSION IF EXISTS pgmq CASCADE;"); - Assert.Equal(0, await _connection.ExecuteScalarAsync("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';")); + var expectedVersionString = await _connection.ExecuteScalarAsync("SELECT extversion FROM pg_extension WHERE extname = 'pgmq';"); + var expectedVersion = new Version(expectedVersionString!); // Act - await _sut.InitAsync(); - - // Assert - Assert.Equal(1, await _connection.ExecuteScalarAsync("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';")); - - // Act (Calling it again should not throw an exception) - await _sut.InitAsync(); - + var version = await _sut.GetPgmqVersionAsync(); + // Assert - Assert.Equal(1, await _connection.ExecuteScalarAsync("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';")); + Assert.Equal(expectedVersion, version); + } + + [Fact] + public async Task GetPgmqVersionAsync_should_return_null_if_pgmq_is_not_installed() + { + try + { + // Arrange + await _connection.ExecuteAsync("DROP EXTENSION IF EXISTS pgmq CASCADE;"); + Assert.Equal(0, await _connection.ExecuteScalarAsync("SELECT count(*) FROM pg_extension WHERE extname = 'pgmq';")); + + // Act + var version = await _sut.GetPgmqVersionAsync(); + + // Assert + Assert.Null(version); + } + finally + { + // Cleanup + await _sut.InitAsync(); + } } [Fact] @@ -235,23 +301,24 @@ public async Task ListQueuesAsync_should_return_list_of_queues() { // Arrange await ResetTestQueueAsync(); - + // Act var queues = await _sut.ListQueuesAsync(); // Assert var queue = Assert.Single(queues); Assert.Equal(TestQueueName, queue.QueueName); + Assert.True(queue.CreatedAt < DateTimeOffset.UtcNow); Assert.False(queue.IsPartitioned); Assert.False(queue.IsUnlogged); } - + [Fact] public async Task PollAsync_should_wait_for_message_and_return_it() { // Arrange await ResetTestQueueAsync(); - + // Act var pollTask = _sut.PollAsync(TestQueueName); await Task.Delay(1000); @@ -263,12 +330,13 @@ public async Task PollAsync_should_wait_for_message_and_return_it() Bar = "Test", Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00") }); - + var msg = await pollTask; - + // Assert Assert.NotNull(msg); Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset); Assert.Equal(1, msg.ReadCt); msg.Message.ShouldDeepEqual(new TestMessage { @@ -283,7 +351,7 @@ public async Task PollAsync_should_return_null_if_timeout_occurs_before_a_messag { // Arrange await ResetTestQueueAsync(); - + // Act var pollTask = _sut.PollAsync(TestQueueName, pollTimeoutSeconds: 1); await Task.Delay(1100); @@ -294,7 +362,7 @@ public async Task PollAsync_should_return_null_if_timeout_occurs_before_a_messag Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00") }); var msg = await pollTask; - + // Assert Assert.Null(msg); Assert.Equal(1, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName};")); @@ -305,7 +373,7 @@ public async Task PollBatchAsync_should_poll_for_multiple_messages() { // Arrange await ResetTestQueueAsync(); - + // Act var pollTask = _sut.PollBatchAsync(TestQueueName, limit: 3); @@ -317,12 +385,12 @@ public async Task PollBatchAsync_should_poll_for_multiple_messages() await producer.SendAsync(TestQueueName, new TestMessage { Foo = 3 }); await producer.SendAsync(TestQueueName, new TestMessage { Foo = 4 }); await producer.SendAsync(TestQueueName, new TestMessage { Foo = 5 }); - + // Get the messages received by the poll var messages = await pollTask; - + // Assert - Assert.True(messages.Any()); + Assert.True(messages.Count > 0); Assert.True(messages.Count <= 3); // TODO: Improve this test, keeping in mind that each call to PollBatchAsync is not guaranteed to read the limit } @@ -352,19 +420,19 @@ public async Task PollBatchAsync_should_poll_for_multiple_messages_in_multiple_b var batch2 = await _sut.PollBatchAsync(TestQueueName, limit: 3, pollTimeoutSeconds: 1); // Assert - Assert.True(batch1.Any()); + Assert.True(batch1.Count > 0); Assert.True(batch1.Count <= 3); - Assert.True(batch2.Any()); + Assert.True(batch2.Count > 0); Assert.True(batch2.Count <= 3); Assert.Equal(batch1.Count + batch2.Count, batch1.Select(x => x.MsgId).Union(batch2.Select(x => x.MsgId)).Count()); } - + [Fact] public async Task PopAsync_should_read_and_delete_message() { // Arrange await ResetTestQueueAsync(); - + // Act var msgId = await _sut.SendAsync(TestQueueName, new TestMessage { @@ -379,6 +447,7 @@ public async Task PopAsync_should_read_and_delete_message() Assert.NotNull(msg); Assert.Equal(msgId, msg.MsgId); Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset); Assert.Equal(0, msg.ReadCt); msg.Message.ShouldDeepEqual(new TestMessage { @@ -395,7 +464,7 @@ public async Task PopAsync_should_return_null_if_no_message_is_available() { // Arrange await ResetTestQueueAsync(); - + // Act var msg = await _sut.PopAsync(TestQueueName); @@ -413,7 +482,7 @@ public async Task PurgeQueueAsync_should_delete_all_messages_from_a_queue() await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 2 }); await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 3 }); Assert.Equal(3, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName};")); - + // Act var purgeCount = await _sut.PurgeQueueAsync(TestQueueName); @@ -427,34 +496,34 @@ public async Task QueueExistsAsync_should_return_true_if_queue_exists() { // Arrange await ResetTestQueueAsync(); - + // Act var result = await _sut.QueueExistsAsync(TestQueueName); // Assert Assert.True(result); } - + [Fact] public async Task QueueExistsAsync_should_return_false_if_queue_does_not_exist() { // Arrange await ResetTestQueueAsync(); await _sut.DropQueueAsync(TestQueueName); - + // Act var result = await _sut.QueueExistsAsync(TestQueueName); // Assert Assert.False(result); } - + [Fact] public async Task ReadAsync_should_read_message() { // Arrange await ResetTestQueueAsync(); - + // Act var msgId = await _sut.SendAsync(TestQueueName, new TestMessage { @@ -469,7 +538,9 @@ public async Task ReadAsync_should_read_message() Assert.NotNull(msg); Assert.Equal(msgId, msg.MsgId); Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset); Assert.True(msg.Vt > DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.Vt.Offset); Assert.Equal(1, msg.ReadCt); msg.Message.ShouldDeepEqual(new TestMessage { @@ -484,7 +555,7 @@ public async Task ReadAsync_should_read_string_message() { // Arrange await ResetTestQueueAsync(); - + // Act var msgId = await _sut.SendAsync(TestQueueName, new { @@ -499,7 +570,9 @@ public async Task ReadAsync_should_read_string_message() Assert.NotNull(msg); Assert.Equal(msgId, msg.MsgId); Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset); Assert.True(msg.Vt > DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.Vt.Offset); Assert.Equal(1, msg.ReadCt); msg.Message.ShouldDeepEqual("{\"Bar\": \"Test\", \"Baz\": \"2023-09-01T01:23:45-04:00\", \"Foo\": 123}"); } @@ -509,7 +582,7 @@ public async Task ReadAsync_should_return_null_if_no_message_is_available() { // Arrange await ResetTestQueueAsync(); - + // Act var msg = await _sut.ReadAsync(TestQueueName); @@ -523,7 +596,7 @@ public async Task ReadBatchAsync_should_return_list_of_messages() { // Arrange await ResetTestQueueAsync(); - + // Act var msgIds = new List { @@ -570,7 +643,7 @@ public async Task SendAsync_should_add_message() { // Arrange await ResetTestQueueAsync(); - + // Act var msgId = await _sut.SendAsync(TestQueueName, new TestMessage { @@ -653,11 +726,11 @@ public async Task SendAsync_should_add_string_message() { // Arrange await ResetTestQueueAsync(); - + // Act var message = "{\"Foo\": 123, \"Bar\": \"Test\", \"Baz\": \"2023-09-01T01:23:45-04:00\"}"; - var msgId = await _sut.SendAsync(TestQueueName, message); - + var msgId = await _sut.SendAsync(TestQueueName, message); + // Assert Assert.Equal(1, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName};")); Assert.Equal(msgId, await _connection.ExecuteScalarAsync($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;")); @@ -666,13 +739,13 @@ public async Task SendAsync_should_add_string_message() } [Fact] - public async Task SendDelayAsync_should_add_message_with_future_vt() + public async Task SendAsync_with_delay_should_add_message_with_future_vt() { // Arrange await ResetTestQueueAsync(); - + // Act - var msgId = await _sut.SendDelayAsync(TestQueueName, new TestMessage + var msgId = await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 123, Bar = "Test", @@ -690,7 +763,7 @@ public async Task SendBatchAsync_should_add_multiple_messages() { // Arrange await ResetTestQueueAsync(); - + // Act var msgIds = await _sut.SendBatchAsync(TestQueueName, new List { @@ -704,6 +777,26 @@ public async Task SendBatchAsync_should_add_multiple_messages() Assert.Equal(msgIds.OrderBy(x => x), (await _connection.QueryAsync($"SELECT msg_id FROM pgmq.q_{TestQueueName} ORDER BY msg_id;")).ToList()); } + [Fact] + public async Task SendBatchAsync_with_delay_should_add_multiple_messages_with_future_vt() + { + // Arrange + await ResetTestQueueAsync(); + + // Act + var msgIds = await _sut.SendBatchAsync(TestQueueName, new List + { + new() { Foo = 1 }, + new() { Foo = 2 }, + new() { Foo = 3 } + }, 100); + + // Assert + Assert.Equal(3, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName};")); + Assert.Equal(3, await _connection.ExecuteScalarAsync($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt > CURRENT_TIMESTAMP;")); + Assert.Equal(msgIds.OrderBy(x => x), (await _connection.QueryAsync($"SELECT msg_id FROM pgmq.q_{TestQueueName} ORDER BY msg_id;")).ToList()); + } + [Fact] public async Task SetVtAsync_should_change_vt_for_a_message() { @@ -714,13 +807,108 @@ public async Task SetVtAsync_should_change_vt_for_a_message() Assert.NotNull(message1); Assert.Equal(msgId, message1.MsgId); Assert.Null(await _sut.ReadAsync(TestQueueName)); - + // Act await _sut.SetVtAsync(TestQueueName, msgId, -60); var message2 = await _sut.ReadAsync(TestQueueName); - + // Assert Assert.NotNull(message2); Assert.Equal(msgId, message2.MsgId); } -} + + [SkippableFact] + public async Task GetMetricsAsync_should_return_metrics_for_a_single_queue() + { + Skip.IfNot(await IsMinPgmqVersion("0.33.1"), "PGMQ versions before 0.33.1 have a bug in the total messages calculation."); + + // Arrange + await ResetTestQueueAsync(); + + var metrics1 = await _sut.GetMetricsAsync(TestQueueName); + await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 1 }); + await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 2 }); + var msgId3 = await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 3 }); + var msgId4 = await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 4 }); + await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 5 }); + await _sut.DeleteAsync(TestQueueName, msgId3); + await _sut.ArchiveAsync(TestQueueName, msgId4); + + // Act + var metrics2 = await _sut.GetMetricsAsync(TestQueueName); + await _sut.ReadAsync(TestQueueName); + var metrics3 = await _sut.GetMetricsAsync(TestQueueName); + await _sut.PurgeQueueAsync(TestQueueName); + var metrics4 = await _sut.GetMetricsAsync(TestQueueName); + + // Assert + Assert.Equal(TestQueueName, metrics1.QueueName); + Assert.Equal(0, metrics1.QueueLength); + Assert.Null(metrics1.NewestMessageAge); + Assert.Null(metrics1.OldestMessageAge); + Assert.Equal(0, metrics1.TotalMessages); + Assert.True(metrics1.ScrapeTime < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset); + + Assert.Equal(TestQueueName, metrics2.QueueName); + Assert.Equal(3, metrics2.QueueLength); + Assert.True(metrics2.NewestMessageAge >= 0); + Assert.True(metrics2.OldestMessageAge >= 0); + Assert.Equal(5, metrics2.TotalMessages); + Assert.True(metrics2.ScrapeTime < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset); + + Assert.Equal(TestQueueName, metrics3.QueueName); + Assert.Equal(3, metrics3.QueueLength); + Assert.True(metrics3.NewestMessageAge >= 0); + Assert.True(metrics3.OldestMessageAge >= 0); + Assert.Equal(5, metrics3.TotalMessages); + Assert.True(metrics3.ScrapeTime < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset); + + Assert.Equal(TestQueueName, metrics4.QueueName); + Assert.Equal(0, metrics4.QueueLength); + Assert.Null(metrics1.NewestMessageAge); + Assert.Null(metrics1.OldestMessageAge); + Assert.Equal(5, metrics4.TotalMessages); + Assert.True(metrics4.ScrapeTime < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset); + } + + [Fact] + public async Task GetMetricsAsync_should_return_metrics_for_all_queues() + { + // Create some queues just for testing this function. + var testMetricsQueueName1 = TestQueueName + "_m1"; + var testMetricsQueueName2 = TestQueueName + "_m2"; + var testMetricsQueueName3 = TestQueueName + "_m3"; + try + { + // Arrange + await _sut.CreateQueueAsync(testMetricsQueueName1); + await _sut.CreateQueueAsync(testMetricsQueueName2); + await _sut.CreateQueueAsync(testMetricsQueueName3); + + await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 1 }); + await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 2 }); + await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 3 }); + await _sut.SendAsync(testMetricsQueueName2, new TestMessage { Foo = 4 }); + await _sut.SendAsync(testMetricsQueueName2, new TestMessage { Foo = 5 }); + + // Act + var allMetrics = await _sut.GetMetricsAsync(); + + // Assert + Assert.Equal(3, allMetrics.Single(x => x.QueueName == testMetricsQueueName1).QueueLength); + Assert.Equal(2, allMetrics.Single(x => x.QueueName == testMetricsQueueName2).QueueLength); + Assert.Equal(0, allMetrics.Single(x => x.QueueName == testMetricsQueueName3).QueueLength); + } + finally + { + // Cleanup + try { await _sut.DropQueueAsync(testMetricsQueueName1); } catch { /* ignored */ } + try { await _sut.DropQueueAsync(testMetricsQueueName2); } catch { /* ignored */ } + try { await _sut.DropQueueAsync(testMetricsQueueName3); } catch { /* ignored */ } + } + } +} \ No newline at end of file diff --git a/Npgmq.sln b/Npgmq.sln index 5f0d93a..6c9b945 100644 --- a/Npgmq.sln +++ b/Npgmq.sln @@ -9,6 +9,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution LICENSE = LICENSE README.md = README.md global.json = global.json + .editorconfig = .editorconfig + .gitignore = .gitignore EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{8C37002D-05C6-4B1F-B4FC-C2F45C5E5328}" diff --git a/Npgmq/INpgmqClient.cs b/Npgmq/INpgmqClient.cs index 9229949..4421bb8 100644 --- a/Npgmq/INpgmqClient.cs +++ b/Npgmq/INpgmqClient.cs @@ -32,7 +32,7 @@ public interface INpgmqClient /// /// The queue name. Task CreateUnloggedQueueAsync(string queueName); - + /// /// Delete a message. /// @@ -59,13 +59,22 @@ public interface INpgmqClient /// Create pgmq extension, if it does not exist. /// Task InitAsync(); - + + /// + /// Gets the version of the pgmq extension installed in the database. + /// + /// + /// This method will return null if the pgmq extension is not installed. + /// + /// A object representing the version of the pgmq extension. + Task GetPgmqVersionAsync(); + /// /// List queues. /// /// The list of queues. Task> ListQueuesAsync(); - + /// /// Poll a queue for a message. /// @@ -76,7 +85,7 @@ public interface INpgmqClient /// The message type. /// The message read, or null if no message was read. Task?> PollAsync(string queue, int vt = NpgmqClient.DefaultVt, int pollTimeoutSeconds = NpgmqClient.DefaultPollTimeoutSeconds, int pollIntervalMilliseconds = NpgmqClient.DefaultPollIntervalMilliseconds) where T : class; - + /// /// Poll a queue for multiple messages. /// @@ -96,7 +105,7 @@ public interface INpgmqClient /// The message type. /// The message read, or null if no message was read. Task?> PopAsync(string queueName) where T : class; - + /// /// Purge a queue. /// @@ -110,7 +119,7 @@ public interface INpgmqClient /// The queue name. /// True if the queue exists, false otherwise. Task QueueExistsAsync(string queueName); - + /// /// Read a message from a queue. /// @@ -129,7 +138,7 @@ public interface INpgmqClient /// The message type. /// The messages read. Task>> ReadBatchAsync(string queue, int vt = NpgmqClient.DefaultVt, int limit = NpgmqClient.DefaultReadBatchLimit) where T : class; - + /// /// Send a message to a queue. /// @@ -138,15 +147,26 @@ public interface INpgmqClient /// The message type. /// The ID of the sent message. Task SendAsync(string queueName, T message) where T : class; - + + /// + /// Send a message to a queue, visible after a specified number of seconds. + /// + /// The queue name. + /// The message to send. + /// Number of seconds until the message becomes visible. + /// The message type. + /// The ID of the sent message. + Task SendAsync(string queueName, T message, int delay) where T : class; + /// /// Send a message to a queue with a delayed vt. /// /// The queue name. /// The message to send. - /// The delay, in seconds. + /// Number of seconds until the message becomes visible. /// The message type. /// The ID of the sent message. + [Obsolete("Use SendAsync instead.")] Task SendDelayAsync(string queueName, T message, int delay) where T : class; /// @@ -158,6 +178,16 @@ public interface INpgmqClient /// The IDs of the sent messages. Task> SendBatchAsync(string queueName, IEnumerable messages) where T : class; + /// + /// Send multiple messages to a queue, visible after a specified number of seconds. + /// + /// The queue name. + /// The messages to send. + /// Number of seconds until the message becomes visible. + /// The message type. + /// The IDs of the sent messages. + Task> SendBatchAsync(string queueName, IEnumerable messages, int delay) where T : class; + /// /// Adjust the Vt of an existing message. /// @@ -165,4 +195,17 @@ public interface INpgmqClient /// The message ID. /// The number of seconds to be added to the current Vt. Task SetVtAsync(string queueName, long msgId, int vtOffset); + + /// + /// Get metrics for all queues. + /// + /// A list of + Task> GetMetricsAsync(); + + /// + /// Get metrics for a specific queue. + /// + /// The queue name. + /// An + Task GetMetricsAsync(string queueName); } \ No newline at end of file diff --git a/Npgmq/Npgmq.csproj b/Npgmq/Npgmq.csproj index 188c60a..70458f2 100644 --- a/Npgmq/Npgmq.csproj +++ b/Npgmq/Npgmq.csproj @@ -14,11 +14,11 @@ Npgmq provides a .NET client for Postgres Message Queue (PGMQ) https://github.com/brianpursley/Npgmq PGMQ - netstandard2.1;net6.0;net7.0;net8.0 + net6.0;net7.0;net8.0 - + diff --git a/Npgmq/NpgmqClient.cs b/Npgmq/NpgmqClient.cs index 673d276..85a1d91 100644 --- a/Npgmq/NpgmqClient.cs +++ b/Npgmq/NpgmqClient.cs @@ -1,4 +1,5 @@ -using System.Data.Common; +using System.Data; +using System.Data.Common; using System.Text.Json; using Npgsql; using NpgsqlTypes; @@ -16,7 +17,7 @@ public class NpgmqClient : INpgmqClient private readonly NpgmqCommandFactory _commandFactory; /// - /// Create a new . + /// Create a new using a connection string. /// /// The connection string. public NpgmqClient(string connectionString) @@ -25,7 +26,7 @@ public NpgmqClient(string connectionString) } /// - /// Create a new . + /// Create a new using an existing . /// /// The connection to use. public NpgmqClient(NpgsqlConnection connection) @@ -43,7 +44,7 @@ public async Task ArchiveAsync(string queueName, long msgId) cmd.Parameters.AddWithValue("@queue_name", queueName); cmd.Parameters.AddWithValue("@msg_id", msgId); var result = await cmd.ExecuteScalarAsync().ConfigureAwait(false); - return Convert.ToBoolean(result); + return result is not null && Convert.ToBoolean(result); } } catch (Exception ex) @@ -60,7 +61,7 @@ public async Task> ArchiveBatchAsync(string queueName, IEnumerable DeleteAsync(string queueName, long msgId) cmd.Parameters.AddWithValue("@queue_name", queueName); cmd.Parameters.AddWithValue("@msg_id", msgId); var result = await cmd.ExecuteScalarAsync().ConfigureAwait(false); - return Convert.ToBoolean(result!); + return result is not null && Convert.ToBoolean(result); } } catch (Exception ex) @@ -192,6 +193,28 @@ public async Task InitAsync() } } + public async Task GetPgmqVersionAsync() + { + try + { + var cmd = await _commandFactory.CreateAsync("SELECT extversion FROM pg_extension WHERE extname = 'pgmq';").ConfigureAwait(false); + await using (cmd.ConfigureAwait(false)) + { + var result = await cmd.ExecuteScalarAsync().ConfigureAwait(false); + return result switch + { + string versionString => new Version(versionString), + null => null, + _ => throw new NpgmqException($"Unexpected result type: {result.GetType().Name}") + }; + } + } + catch (Exception ex) + { + throw new NpgmqException("Failed to get PGMQ version.", ex); + } + } + public async Task> ListQueuesAsync() { try @@ -355,27 +378,18 @@ public async Task>> ReadBatchAsync(string queueName, int } } - public async Task SendAsync(string queueName, T message) where T : class - { - try - { - return await SendDelayAsync(queueName, message, 0).ConfigureAwait(false); - } - catch (Exception ex) - { - throw new NpgmqException($"Failed to send message to queue {queueName}.", ex); - } - } + public Task SendAsync(string queueName, T message) where T : class => + SendAsync(queueName, message, 0); - public async Task SendDelayAsync(string queueName, T message, int delay) where T : class + public async Task SendAsync(string queueName, T message, int delay) where T : class { try { - var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.send(@queue_name, @message, @delay);").ConfigureAwait(false); + var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.send(@queue_name, @msg, @delay);").ConfigureAwait(false); await using (cmd.ConfigureAwait(false)) { cmd.Parameters.AddWithValue("@queue_name", queueName); - cmd.Parameters.AddWithValue("@message", NpgsqlDbType.Jsonb, SerializeMessage(message)); + cmd.Parameters.AddWithValue("@msg", NpgsqlDbType.Jsonb, SerializeMessage(message)); cmd.Parameters.AddWithValue("@delay", delay); var result = await cmd.ExecuteScalarAsync().ConfigureAwait(false); return Convert.ToInt64(result!); @@ -387,15 +401,23 @@ public async Task SendDelayAsync(string queueName, T message, int delay } } - public async Task> SendBatchAsync(string queueName, IEnumerable messages) where T : class + [Obsolete("Use SendAsync instead.")] + public Task SendDelayAsync(string queueName, T message, int delay) where T : class => + SendAsync(queueName, message, delay); + + public Task> SendBatchAsync(string queueName, IEnumerable messages) where T : class => + SendBatchAsync(queueName, messages, 0); + + public async Task> SendBatchAsync(string queueName, IEnumerable messages, int delay) where T : class { try { - var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.send_batch(@queue_name, @messages);").ConfigureAwait(false); + var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.send_batch(@queue_name, @msgs, @delay);").ConfigureAwait(false); await using (cmd.ConfigureAwait(false)) { cmd.Parameters.AddWithValue("@queue_name", queueName); - cmd.Parameters.AddWithValue("@messages", NpgsqlDbType.Array | NpgsqlDbType.Jsonb, messages.Select(SerializeMessage).ToArray()); + cmd.Parameters.AddWithValue("@msgs", NpgsqlDbType.Array | NpgsqlDbType.Jsonb, messages.Select(SerializeMessage).ToArray()); + cmd.Parameters.AddWithValue("@delay", delay); var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false); await using (reader.ConfigureAwait(false)) { @@ -433,6 +455,71 @@ public async Task SetVtAsync(string queueName, long msgId, int vtOffset) } } + public async Task> GetMetricsAsync() + { + try + { + var cmd = await _commandFactory.CreateAsync("SELECT queue_name, queue_length, newest_msg_age_sec, oldest_msg_age_sec, total_messages, scrape_time FROM pgmq.metrics_all();").ConfigureAwait(false); + await using (cmd.ConfigureAwait(false)) + { + var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false); + await using (reader.ConfigureAwait(false)) + { + return await ReadMetricsAsync(reader).ConfigureAwait(false); + } + } + } + catch (Exception ex) + { + throw new NpgmqException("Failed to get metrics.", ex); + } + } + + public async Task GetMetricsAsync(string queueName) + { + try + { + var cmd = await _commandFactory.CreateAsync("SELECT queue_name, queue_length, newest_msg_age_sec, oldest_msg_age_sec, total_messages, scrape_time FROM pgmq.metrics(@queue_name);").ConfigureAwait(false); + await using (cmd.ConfigureAwait(false)) + { + cmd.Parameters.AddWithValue("@queue_name", queueName); + var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SingleRow).ConfigureAwait(false); + await using (reader.ConfigureAwait(false)) + { + var results = await ReadMetricsAsync(reader).ConfigureAwait(false); + return results.Count switch + { + 1 => results.Single(), + 0 => throw new NpgmqException("No data returned."), + _ => throw new NpgmqException("Multiple results returned.") + }; + } + } + } + catch (Exception ex) + { + throw new NpgmqException($"Failed to get metrics for queue {queueName}.", ex); + } + } + + private static async Task> ReadMetricsAsync(DbDataReader reader) + { + var results = new List(); + while (await reader.ReadAsync().ConfigureAwait(false)) + { + results.Add(new NpgmqMetricsResult + { + QueueName = reader.GetString(0), + QueueLength = reader.GetInt64(1), + NewestMessageAge = await reader.IsDBNullAsync(2) ? null : reader.GetInt32(2), + OldestMessageAge = await reader.IsDBNullAsync(3) ? null : reader.GetInt32(3), + TotalMessages = reader.GetInt64(4), + ScrapeTime = reader.GetDateTime(5) + }); + } + return results; + } + private static async Task>> ReadMessagesAsync(DbDataReader reader) where T : class { var result = new List>(); @@ -444,15 +531,15 @@ private static async Task>> ReadMessagesAsync(DbDataRead ReadCt = reader.GetInt32(1), EnqueuedAt = reader.GetDateTime(2), Vt = reader.GetDateTime(3), - Message = DeserializeMessage(reader.GetString(4)) + Message = await reader.IsDBNullAsync(4) ? null : DeserializeMessage(reader.GetString(4)) }); } return result; } - private static string SerializeMessage(T message) where T : class => - typeof(T) == typeof(string) ? message as string ?? "" : JsonSerializer.Serialize(message); + private static string SerializeMessage(T message) where T : class => + typeof(T) == typeof(string) ? message as string ?? "" : JsonSerializer.Serialize(message); private static T? DeserializeMessage(string message) where T : class => typeof(T) == typeof(string) ? (T?)(object?)message : JsonSerializer.Deserialize(message); -} +} \ No newline at end of file diff --git a/Npgmq/NpgmqCommand.cs b/Npgmq/NpgmqCommand.cs index 08d286c..5542a38 100644 --- a/Npgmq/NpgmqCommand.cs +++ b/Npgmq/NpgmqCommand.cs @@ -14,8 +14,8 @@ public override async ValueTask DisposeAsync() { await Connection.CloseAsync().ConfigureAwait(false); } - + await Connection.DisposeAsync().ConfigureAwait(false); } } -} +} \ No newline at end of file diff --git a/Npgmq/NpgmqCommandFactory.cs b/Npgmq/NpgmqCommandFactory.cs index 49d385d..3ea5b17 100644 --- a/Npgmq/NpgmqCommandFactory.cs +++ b/Npgmq/NpgmqCommandFactory.cs @@ -25,7 +25,7 @@ public async Task CreateAsync(string commandText) { await connection.OpenAsync().ConfigureAwait(false); } - + return new NpgmqCommand(commandText, connection, _connection == null); } -} +} \ No newline at end of file diff --git a/Npgmq/NpgmqException.cs b/Npgmq/NpgmqException.cs index 14e1086..52a1113 100644 --- a/Npgmq/NpgmqException.cs +++ b/Npgmq/NpgmqException.cs @@ -21,4 +21,4 @@ public NpgmqException(string message) : base(message) public NpgmqException(string message, Exception innerException) : base(message, innerException) { } -} +} \ No newline at end of file diff --git a/Npgmq/NpgmqMessage.cs b/Npgmq/NpgmqMessage.cs index c81ade1..3d2efec 100644 --- a/Npgmq/NpgmqMessage.cs +++ b/Npgmq/NpgmqMessage.cs @@ -10,12 +10,12 @@ public class NpgmqMessage /// Unique identifier for the message. /// public long MsgId { get; set; } - + /// /// The number of times the message has been read. Increments on read. /// public int ReadCt { get; set; } - + /// /// Timestamp at which the message was sent to the queue. /// @@ -25,7 +25,7 @@ public class NpgmqMessage /// Timestamp at which the message will be available for reading. /// public DateTimeOffset Vt { get; set; } - + /// /// The message value. /// diff --git a/Npgmq/NpgmqMetricsResult.cs b/Npgmq/NpgmqMetricsResult.cs new file mode 100644 index 0000000..4546216 --- /dev/null +++ b/Npgmq/NpgmqMetricsResult.cs @@ -0,0 +1,37 @@ +namespace Npgmq; + +/// +/// Metrics data for a queue. +/// +public class NpgmqMetricsResult +{ + /// + /// Name of the queue. + /// + public string QueueName { get; set; } = null!; + + /// + /// Number of messages in the queue. + /// + public long QueueLength { get; set; } + + /// + /// Age, in seconds, of the newest message in the queue, or null if the queue is empty. + /// + public int? NewestMessageAge { get; set; } + + /// + /// Age, in seconds, of the oldest message in the queue, or null if the queue is empty. + /// + public int? OldestMessageAge { get; set; } + + /// + /// Total number of messages that have been in the queue. + /// + public long TotalMessages { get; set; } + + /// + /// When the metrics were scraped. + /// + public DateTimeOffset ScrapeTime { get; set; } +} \ No newline at end of file diff --git a/Npgmq/NpgmqQueue.cs b/Npgmq/NpgmqQueue.cs index 1e00ffe..68b7a09 100644 --- a/Npgmq/NpgmqQueue.cs +++ b/Npgmq/NpgmqQueue.cs @@ -9,17 +9,17 @@ public class NpgmqQueue /// The name of the queue. /// public string QueueName { get; set; } = null!; - + /// /// Timestamp at which the queue was created. /// public DateTimeOffset CreatedAt { get; set; } - + /// /// Whether the queue is partitioned. /// public bool IsPartitioned { get; set; } - + /// /// Whether the queue is unlogged. /// diff --git a/README.md b/README.md index f530e55..5a31742 100644 --- a/README.md +++ b/README.md @@ -2,14 +2,18 @@ A .NET client for [Postgres Message Queue](https://github.com/tembo-io/pgmq) (PGMQ). +[![Build](https://github.com/brianpursley/Npgmq/actions/workflows/build.yml/badge.svg)](https://github.com/brianpursley/Npgmq/actions/workflows/build.yml) +[![Nuget](https://img.shields.io/nuget/v/Npgmq)](https://www.nuget.org/packages/Npgmq/) +![License](https://img.shields.io/github/license/brianpursley/Npgmq) + ## Compatibility * pgmq >= 0.31.0 ## Installation -To install the package via Nuget, run the following command: +To install the package via [Nuget](https://www.nuget.org/packages/Npgmq/), run the following command: -```bash +```shell dotnet add package Npgmq ``` @@ -74,6 +78,12 @@ var npgmq = new NpgmqClient(myConnection); ## Database Connection -Npgmq uses Npgsql internally to connect to the database. +Npgmq uses [Npgsql](https://www.npgsql.org/) internally to connect to the database. + +### Using a Connection String + +If you pass an [Npgsql connection string](https://www.npgsql.org/doc/connection-string-parameters.html) to the `NpgmqClient` constructor, it will use this connection string to create an [`NpgsqlConnection`](https://www.npgsql.org/doc/api/Npgsql.NpgsqlConnection.html) object internally, and the connection lifetime will be managed by NpgmqClient. + +### Using a Connection Object -The connection string passed to the `NpgmqClient` constructor should be an [Npgsql connection string](https://www.npgsql.org/doc/connection-string-parameters.html). +If you pass an [`NpgsqlConnection`](https://www.npgsql.org/doc/api/Npgsql.NpgsqlConnection.html) object to the `NpgmqClient` constructor, it will use this connection instead of creating its own.