From bf26e84d55aad2bf3b281a56d732908f31fea9c0 Mon Sep 17 00:00:00 2001 From: Thomas Hilzendegen Date: Thu, 13 Jun 2024 10:25:08 +0200 Subject: [PATCH] fix(protocols): consume server queues on start-up --- .../RelayServerBuilderExtensions.cs | 13 +++-- .../ServerTransport.cs | 51 ++++++++++++------- ...ure.Relay.Server.Protocols.RabbitMq.csproj | 1 + 3 files changed, 43 insertions(+), 22 deletions(-) diff --git a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/RelayServerBuilderExtensions.cs b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/RelayServerBuilderExtensions.cs index 5b4410c9a..183184c27 100644 --- a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/RelayServerBuilderExtensions.cs +++ b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/RelayServerBuilderExtensions.cs @@ -1,5 +1,6 @@ using System; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Options; using RabbitMQ.Client; using Thinktecture.Relay.Acknowledgement; @@ -44,14 +45,18 @@ public static class RelayServerBuilderExtensions if (useServerRouting) { - builder.Services - .TryAddSingleton, ServerTransport>(); + builder.Services.AddSingleton>(); + builder.Services.AddHostedService(provider + => provider.GetRequiredService>()); + + builder.Services.AddSingleton>(provider + => provider.GetRequiredService>()); } if (useTenantRouting) { - builder.Services.TryAddSingleton, TenantTransport>(); - builder.Services.TryAddSingleton>(); + builder.Services.AddSingleton, TenantTransport>(); + builder.Services.AddSingleton>(); } builder.Services.TryAddSingleton(provider => diff --git a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ServerTransport.cs b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ServerTransport.cs index db35727ef..e74f2592d 100644 --- a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ServerTransport.cs +++ b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ServerTransport.cs @@ -1,6 +1,8 @@ using System; using System.Text.Json; +using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; @@ -12,19 +14,22 @@ namespace Thinktecture.Relay.Server.Protocols.RabbitMq; /// -public partial class ServerTransport : IServerTransport, IDisposable +public partial class ServerTransport : IServerTransport, + IHostedService, IDisposable where TResponse : ITargetResponse where TAcknowledge : IAcknowledgeRequest { private readonly IModel _acknowledgeConsumeModel; - private readonly DisposableConsumer _acknowledgeConsumer; private readonly IAcknowledgeCoordinator _acknowledgeCoordinator; private readonly IModel _acknowledgeDispatchModel; private readonly ILogger _logger; private readonly IModel _responseConsumeModel; - private readonly DisposableConsumer _responseConsumer; private readonly IResponseCoordinator _responseCoordinator; private readonly IModel _responseDispatchModel; + private readonly Guid _originId; + + private DisposableConsumer? _responseConsumer; + private DisposableConsumer? _acknowledgeConsumer; /// public int? BinarySizeThreshold { get; } @@ -55,16 +60,10 @@ public ServerTransport(ILogger> logger, _responseDispatchModel = modelFactory.Create("response dispatcher"); _acknowledgeDispatchModel = modelFactory.Create("acknowledge dispatcher"); - _responseConsumeModel = modelFactory.Create("response handler"); - _responseConsumer = new DisposableConsumer(_logger, _responseConsumeModel, - $"{Constants.ResponseQueuePrefix} {relayServerContext.OriginId}"); - _responseConsumer.Consume(ResponseConsumerReceivedAsync); - _acknowledgeConsumeModel = modelFactory.Create("acknowledge handler"); - _acknowledgeConsumer = new DisposableConsumer(_logger, _acknowledgeConsumeModel, - $"{Constants.AcknowledgeQueuePrefix} {relayServerContext.OriginId}"); - _acknowledgeConsumer.Consume(AcknowledgeConsumerReceivedAsync); + + _originId = relayServerContext.OriginId; } /// @@ -73,8 +72,8 @@ public void Dispose() _responseDispatchModel.Dispose(); _acknowledgeDispatchModel.Dispose(); - _responseConsumer.Dispose(); - _acknowledgeConsumer.Dispose(); + _responseConsumer?.Dispose(); + _acknowledgeConsumer?.Dispose(); _responseConsumeModel.Dispose(); _acknowledgeConsumeModel.Dispose(); @@ -84,9 +83,7 @@ public void Dispose() public async Task DispatchResponseAsync(TResponse response) { await _responseDispatchModel.PublishJsonAsync($"{Constants.ResponseQueuePrefix} {response.RequestOriginId}", - response, - durable: false, - persistent: false); + response, durable: false, persistent: false); Log.DispatchedResponse(_logger, response.RequestId, response.RequestOriginId); } @@ -95,8 +92,8 @@ public async Task DispatchAcknowledgeAsync(TAcknowledge request) { Log.DispatchingAcknowledge(_logger, request); - await _acknowledgeDispatchModel.PublishJsonAsync($"{Constants.AcknowledgeQueuePrefix} {request.OriginId}", request, - durable: false, persistent: false); + await _acknowledgeDispatchModel.PublishJsonAsync($"{Constants.AcknowledgeQueuePrefix} {request.OriginId}", + request, durable: false, persistent: false); Log.DispatchedAcknowledge(_logger, request.RequestId, request.OriginId); } @@ -116,4 +113,22 @@ private async Task AcknowledgeConsumerReceivedAsync(BasicDeliverEventArgs @event Log.AcknowledgeConsumed(_logger, request.RequestId, @event.RoutingKey, @event.ConsumerTag); await _acknowledgeCoordinator.ProcessAcknowledgeAsync(request); } + + /// + public Task StartAsync(CancellationToken cancellationToken) + { + _responseConsumer = new DisposableConsumer(_logger, _responseConsumeModel, + $"{Constants.ResponseQueuePrefix} {_originId}"); + _responseConsumer.Consume(ResponseConsumerReceivedAsync); + + _acknowledgeConsumer = new DisposableConsumer(_logger, _acknowledgeConsumeModel, + $"{Constants.AcknowledgeQueuePrefix} {_originId}"); + _acknowledgeConsumer.Consume(AcknowledgeConsumerReceivedAsync); + + return Task.CompletedTask; + } + + /// + public Task StopAsync(CancellationToken cancellationToken) + => Task.CompletedTask; } diff --git a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/Thinktecture.Relay.Server.Protocols.RabbitMq.csproj b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/Thinktecture.Relay.Server.Protocols.RabbitMq.csproj index d60502dc0..08e27e2cd 100644 --- a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/Thinktecture.Relay.Server.Protocols.RabbitMq.csproj +++ b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/Thinktecture.Relay.Server.Protocols.RabbitMq.csproj @@ -7,6 +7,7 @@ +