Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1: Convert RabbitMQ #871

Open
wants to merge 8 commits into
base: develop-v1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions EventFlow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventFlow.MsSql", "Source\E
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventFlow.MsSql.Tests", "Source\EventFlow.MsSql.Tests\EventFlow.MsSql.Tests.csproj", "{CE19355C-6355-405F-A640-908AE4F83C2C}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "RabbitMQ", "RabbitMQ", "{ADA0FEA0-106C-47B6-AFFF-7A055FD068AC}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventFlow.RabbitMQ", "Source\EventFlow.RabbitMQ\EventFlow.RabbitMQ.csproj", "{E6AB068A-4B32-416D-AEF7-C347942E441F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventFlow.RabbitMQ.Tests", "Source\EventFlow.RabbitMQ.Tests\EventFlow.RabbitMQ.Tests.csproj", "{695DD7E9-5AEB-4CC6-A477-6F9AA5AD2255}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".general", ".general", "{A920B8BC-D459-45F8-9E78-FF209946DB99}"
ProjectSection(SolutionItems) = preProject
MIGRATION_GUIDE.md = MIGRATION_GUIDE.md
README.md = README.md
RELEASE_NOTES.md = RELEASE_NOTES.md
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PostgreSql", "PostgreSql", "{37ABC463-BAF5-4B5E-9A55-2BF12B0C144A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventFlow.PostgreSql", "Source\EventFlow.PostgreSql\EventFlow.PostgreSql.csproj", "{B05E67FF-294D-4871-A5EF-8BA762951476}"
Expand Down Expand Up @@ -83,6 +96,14 @@ Global
{CE19355C-6355-405F-A640-908AE4F83C2C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CE19355C-6355-405F-A640-908AE4F83C2C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CE19355C-6355-405F-A640-908AE4F83C2C}.Release|Any CPU.Build.0 = Release|Any CPU
{E6AB068A-4B32-416D-AEF7-C347942E441F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E6AB068A-4B32-416D-AEF7-C347942E441F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E6AB068A-4B32-416D-AEF7-C347942E441F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E6AB068A-4B32-416D-AEF7-C347942E441F}.Release|Any CPU.Build.0 = Release|Any CPU
{695DD7E9-5AEB-4CC6-A477-6F9AA5AD2255}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{695DD7E9-5AEB-4CC6-A477-6F9AA5AD2255}.Debug|Any CPU.Build.0 = Debug|Any CPU
{695DD7E9-5AEB-4CC6-A477-6F9AA5AD2255}.Release|Any CPU.ActiveCfg = Release|Any CPU
{695DD7E9-5AEB-4CC6-A477-6F9AA5AD2255}.Release|Any CPU.Build.0 = Release|Any CPU
{B05E67FF-294D-4871-A5EF-8BA762951476}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B05E67FF-294D-4871-A5EF-8BA762951476}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B05E67FF-294D-4871-A5EF-8BA762951476}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -112,6 +133,9 @@ Global
{983EFD08-2256-4361-A869-23EB872328CB} = {88359036-4F35-487C-BF2C-4F31C7BC92D8}
{96A39EA3-6772-409E-AF0B-C5A35A1A13CC} = {88359036-4F35-487C-BF2C-4F31C7BC92D8}
{CE19355C-6355-405F-A640-908AE4F83C2C} = {88359036-4F35-487C-BF2C-4F31C7BC92D8}
{ADA0FEA0-106C-47B6-AFFF-7A055FD068AC} = {5EE323DE-E69B-451A-8AC3-22DD6A004FBA}
{E6AB068A-4B32-416D-AEF7-C347942E441F} = {ADA0FEA0-106C-47B6-AFFF-7A055FD068AC}
{695DD7E9-5AEB-4CC6-A477-6F9AA5AD2255} = {ADA0FEA0-106C-47B6-AFFF-7A055FD068AC}
{37ABC463-BAF5-4B5E-9A55-2BF12B0C144A} = {5EE323DE-E69B-451A-8AC3-22DD6A004FBA}
{B05E67FF-294D-4871-A5EF-8BA762951476} = {37ABC463-BAF5-4B5E-9A55-2BF12B0C144A}
{87A29B00-48EB-40BC-B532-7EDC17604BB3} = {37ABC463-BAF5-4B5E-9A55-2BF12B0C144A}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ The following list key characteristics of each version as well as its related br
- 🟢 `EventFlow.MsSql`
- 💀 `EventFlow.Owin`
- 🟢 `EventFlow.PostgreSql`
- 🔴 `EventFlow.RabbitMQ`
- 🟢 `EventFlow.RabbitMQ`
- 🟢 `EventFlow.Sql`
- 🔴 `EventFlow.SQLite`
- 🟢 `EventFlow.TestHelpers`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../Common.props" />
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFrameworks>netcoreapp2.1;netcoreapp3.1</TargetFrameworks>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
<GenerateAssemblyInfo>False</GenerateAssemblyInfo>
<IsPackable>False</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.1" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit" Version="3.13.2" />
<PackageReference Include="NUnit3TestAdapter" Version="3.17.0" />
</ItemGroup>
<ItemGroup>
Expand Down
30 changes: 17 additions & 13 deletions Source/EventFlow.RabbitMQ.Tests/Integration/RabbitMqTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
using EventFlow.Configuration;
using EventFlow.EventStores;
using EventFlow.Extensions;
using EventFlow.Logs;
using EventFlow.RabbitMQ.Extensions;
using EventFlow.RabbitMQ.Integrations;
using EventFlow.TestHelpers;
Expand All @@ -40,6 +38,8 @@
using EventFlow.TestHelpers.Aggregates.Queries;
using EventFlow.TestHelpers.Aggregates.ValueObjects;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using NUnit.Framework;

namespace EventFlow.RabbitMQ.Tests.Integration
Expand All @@ -56,7 +56,7 @@ public void SetUp()
var url = Environment.GetEnvironmentVariable("RABBITMQ_URL");
if (string.IsNullOrEmpty(url))
{
Assert.Inconclusive("The environment variable named 'RABBITMQ_URL' isn't set. Set it to e.g. 'amqp://localhost'");
url = "amqp://localhost";
}

_uri = new Uri(url);
Expand All @@ -74,10 +74,11 @@ public async Task Scenario()
{
var exchange = new Exchange($"eventflow-{Guid.NewGuid():N}");
using (var consumer = new RabbitMqConsumer(_uri, exchange, new[] { "#" }))
using (var resolver = BuildResolver(exchange))
using (var rootServiceProvider = BuildServiceProvider(exchange))
using (var serviceScope = rootServiceProvider.CreateScope())
{
var commandBus = resolver.Resolve<ICommandBus>();
var eventJsonSerializer = resolver.Resolve<IEventJsonSerializer>();
var commandBus = serviceScope.ServiceProvider.GetRequiredService<ICommandBus>();
var eventJsonSerializer = serviceScope.ServiceProvider.GetRequiredService<IEventJsonSerializer>();

var pingId = PingId.New;
await commandBus.PublishAsync(new ThingyPingCommand(ThingyId.New, pingId), _timeout.Token).ConfigureAwait(false);
Expand Down Expand Up @@ -105,9 +106,10 @@ public async Task PublisherPerformance()
const int totalMessageCount = taskCount * messagesPrThread;

using (var consumer = new RabbitMqConsumer(_uri, exchange, new[] { "#" }))
using (var resolver = BuildResolver(exchange, o => o.RegisterServices(sr => sr.Register<ILog, NullLog>())))
using (var rootServiceProvider = BuildServiceProvider(exchange))
using (var serviceScope = rootServiceProvider.CreateScope())
{
var rabbitMqPublisher = resolver.Resolve<IRabbitMqPublisher>();
var rabbitMqPublisher = serviceScope.ServiceProvider.GetRequiredService<IRabbitMqPublisher>();
var tasks = Enumerable.Range(0, taskCount)
.Select(i => Task.Run(() => SendMessagesAsync(rabbitMqPublisher, messagesPrThread, exchange, routingKey, exceptions, _timeout.Token)));

Expand Down Expand Up @@ -147,16 +149,18 @@ private static async Task SendMessagesAsync(
exceptions.Add(e);
}
}

private IRootResolver BuildResolver(Exchange exchange, Func<IEventFlowOptions, IEventFlowOptions> configure = null)

private ServiceProvider BuildServiceProvider(
Exchange exchange,
Func<IEventFlowOptions, IEventFlowOptions> configure = null)
{
configure = configure ?? (e => e);

return configure(EventFlowOptions.New
return configure(EventFlowOptions.New()
.PublishToRabbitMq(RabbitMqConfiguration.With(_uri, false, exchange: exchange.Value))
.AddDefaults(EventFlowTestHelpers.Assembly))
.RegisterServices(sr => sr.Register<IScopedContext, ScopedContext>(Lifetime.Scoped))
.CreateResolver(false);
.RegisterServices(sr => sr.TryAddScoped<IScopedContext, ScopedContext>())
.ServiceCollection.BuildServiceProvider(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Core;
using EventFlow.Logs;
using EventFlow.RabbitMQ.Integrations;
using EventFlow.TestHelpers;
using AutoFixture;
using Microsoft.Extensions.Logging;
using Moq;
using NUnit.Framework;
using RabbitMQ.Client;
Expand All @@ -41,7 +41,7 @@ public class RabbitMqPublisherTests : TestsFor<RabbitMqPublisher>
{
private Mock<IRabbitMqConnectionFactory> _rabbitMqConnectionFactoryMock;
private Mock<IRabbitMqConfiguration> _rabbitMqConfigurationMock;
private Mock<ILog> _logMock;
private Mock<ILogger<TransientFaultHandler<IRabbitMqRetryStrategy>>> _logMock;
private Mock<IModel> _modelMock;
private Mock<IRabbitConnection> _rabbitConnectionMock;

Expand All @@ -50,7 +50,7 @@ public void SetUp()
{
_rabbitMqConnectionFactoryMock = InjectMock<IRabbitMqConnectionFactory>();
_rabbitMqConfigurationMock = InjectMock<IRabbitMqConfiguration>();
_logMock = InjectMock<ILog>();
_logMock = InjectMock<ILogger<TransientFaultHandler<IRabbitMqRetryStrategy>>>();

Fixture.Inject<ITransientFaultHandler<IRabbitMqRetryStrategy>>(new TransientFaultHandler<IRabbitMqRetryStrategy>(
_logMock.Object,
Expand Down
4 changes: 2 additions & 2 deletions Source/EventFlow.RabbitMQ/EventFlow.RabbitMQ.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../Common.props" />
<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<TargetFrameworks>netstandard2.0;netcoreapp2.1;netcoreapp3.1</TargetFrameworks>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
<GenerateAssemblyInfo>True</GenerateAssemblyInfo>
<GeneratePackageOnBuild>False</GeneratePackageOnBuild>
Expand All @@ -26,6 +26,6 @@

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="RabbitMQ.Client" Version="5.0.1" />
<PackageReference Include="RabbitMQ.Client" Version="5.2.0" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using EventFlow.Configuration;
using EventFlow.Extensions;
using EventFlow.RabbitMQ.Integrations;
using EventFlow.Subscribers;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace EventFlow.RabbitMQ.Extensions
{
Expand All @@ -35,14 +36,12 @@ public static IEventFlowOptions PublishToRabbitMq(
{
return eventFlowOptions.RegisterServices(sr =>
{
sr.Register<IRabbitMqConnectionFactory, RabbitMqConnectionFactory>(Lifetime.Singleton);
sr.Register<IRabbitMqMessageFactory, RabbitMqMessageFactory>(Lifetime.Singleton);
sr.Register<IRabbitMqPublisher, RabbitMqPublisher>(Lifetime.Singleton);
sr.Register<IRabbitMqRetryStrategy, RabbitMqRetryStrategy>(Lifetime.Singleton);

sr.Register(rc => configuration, Lifetime.Singleton);

sr.Register<ISubscribeSynchronousToAll, RabbitMqDomainEventPublisher>();
sr.TryAddSingleton<IRabbitMqConnectionFactory, RabbitMqConnectionFactory>();
sr.TryAddSingleton<IRabbitMqMessageFactory, RabbitMqMessageFactory>();
sr.TryAddSingleton<IRabbitMqPublisher, RabbitMqPublisher>();
sr.TryAddSingleton<IRabbitMqRetryStrategy, RabbitMqRetryStrategy>();
sr.TryAddSingleton(rc => configuration);
sr.TryAddTransient<ISubscribeSynchronousToAll, RabbitMqDomainEventPublisher>();
});
}
}
Expand Down
20 changes: 11 additions & 9 deletions Source/EventFlow.RabbitMQ/Integrations/RabbitConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,25 @@
using System.Threading.Tasks;
using EventFlow.Core;
using EventFlow.Extensions;
using EventFlow.Logs;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;

namespace EventFlow.RabbitMQ.Integrations
{
public class RabbitConnection : IRabbitConnection
{
private readonly ILog _log;
private readonly ILogger _logger;
private readonly IConnection _connection;
private readonly AsyncLock _asyncLock;
private readonly ConcurrentBag<IModel> _models;

public RabbitConnection(ILog log, int maxModels, IConnection connection)
public RabbitConnection(
ILogger logger,
int maxModels,
IConnection connection)
{
_logger = logger;
_connection = connection;
_log = log;
_asyncLock = new AsyncLock(maxModels);
_models = new ConcurrentBag<IModel>(Enumerable.Range(0, maxModels).Select(_ => connection.CreateModel()));
}
Expand All @@ -52,8 +55,7 @@ public async Task<int> WithModelAsync(Func<IModel, Task> action, CancellationTok
{
using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false))
{
IModel model;
if (!_models.TryTake(out model))
if (!_models.TryTake(out var model))
{
throw new InvalidOperationException(
"This should NEVER happen! If it does, please report a bug.");
Expand All @@ -74,12 +76,12 @@ public async Task<int> WithModelAsync(Func<IModel, Task> action, CancellationTok

public void Dispose()
{
_logger.LogTrace("Disposing RabbitMQ connection");
foreach (var model in _models)
{
model.DisposeSafe(_log, "Failed to dispose model");
model.DisposeSafe(_logger, "Failed to dispose model");
}
_connection.DisposeSafe(_log, "Failed to dispose connection");
_log.Verbose("Disposing RabbitMQ connection");
_connection.DisposeSafe(_logger, "Failed to dispose connection");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Core;
using EventFlow.Logs;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;

namespace EventFlow.RabbitMQ.Integrations
{
public class RabbitMqConnectionFactory : IRabbitMqConnectionFactory
{
private readonly ILog _log;
private readonly ILogger<RabbitMqConnectionFactory> _logger;
private readonly IRabbitMqConfiguration _configuration;
private readonly AsyncLock _asyncLock = new AsyncLock();
private readonly Dictionary<Uri, ConnectionFactory> _connectionFactories = new Dictionary<Uri, ConnectionFactory>();

public RabbitMqConnectionFactory(
ILog log,
ILogger<RabbitMqConnectionFactory> logger,
IRabbitMqConfiguration configuration)
{
_log = log;
_logger = logger;
_configuration = configuration;
}

Expand All @@ -52,19 +52,20 @@ public async Task<IRabbitConnection> CreateConnectionAsync(Uri uri, Cancellation
var connectionFactory = await CreateConnectionFactoryAsync(uri, cancellationToken).ConfigureAwait(false);
var connection = connectionFactory.CreateConnection();

return new RabbitConnection(_log, _configuration.ModelsPrConnection, connection);
return new RabbitConnection(_logger, _configuration.ModelsPrConnection, connection);
}

private async Task<ConnectionFactory> CreateConnectionFactoryAsync(Uri uri, CancellationToken cancellationToken)
{
using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false))
{
ConnectionFactory connectionFactory;
if (_connectionFactories.TryGetValue(uri, out connectionFactory))
if (_connectionFactories.TryGetValue(uri, out var connectionFactory))
{
return connectionFactory;
}
_log.Verbose("Creating RabbitMQ connection factory to {0}", uri.Host);
_logger.LogTrace(
"Creating RabbitMQ connection factory to {RabbitMqHost}",
uri.Host);

connectionFactory = new ConnectionFactory
{
Expand Down
29 changes: 24 additions & 5 deletions Source/EventFlow.RabbitMQ/Integrations/RabbitMqMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,30 @@ public RabbitMqMessage(
RoutingKey routingKey,
MessageId messageId)
{
if (string.IsNullOrEmpty(message)) throw new ArgumentNullException(nameof(message));
if (headers == null) throw new ArgumentNullException(nameof(headers));
if (exchange == null) throw new ArgumentNullException(nameof(exchange));
if (routingKey == null) throw new ArgumentNullException(nameof(routingKey));
if (messageId == null) throw new ArgumentNullException(nameof(messageId));
if (string.IsNullOrEmpty(message))
{
throw new ArgumentNullException(nameof(message));
}

if (headers == null)
{
throw new ArgumentNullException(nameof(headers));
}

if (exchange == null)
{
throw new ArgumentNullException(nameof(exchange));
}

if (routingKey == null)
{
throw new ArgumentNullException(nameof(routingKey));
}

if (messageId == null)
{
throw new ArgumentNullException(nameof(messageId));
}

Message = message;
Headers = headers;
Expand Down
Loading