From 236c9a35766ccf9e1df3aa48362e4d34d47d2a61 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Thu, 30 Jun 2016 20:15:55 +0200 Subject: [PATCH 01/29] First step in making read model updates handle optimistic concurrency exceptions --- .../EventFlow/Core/ITransientFaultHandler.cs | 10 +++++-- .../EventFlow/Core/TransientFaultHandler.cs | 12 ++++++++ .../ReadStores/InMemory/InMemoryReadStore.cs | 2 +- .../MultipleAggregateReadStoreManager.cs | 10 +++++-- .../EventFlow/ReadStores/ReadStoreManager.cs | 30 ++++++++++++------- .../SingleAggregateReadStoreManager.cs | 10 +++++-- 6 files changed, 54 insertions(+), 20 deletions(-) diff --git a/Source/EventFlow/Core/ITransientFaultHandler.cs b/Source/EventFlow/Core/ITransientFaultHandler.cs index 4f38443b5..bc3baf96b 100644 --- a/Source/EventFlow/Core/ITransientFaultHandler.cs +++ b/Source/EventFlow/Core/ITransientFaultHandler.cs @@ -20,7 +20,8 @@ // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -// +// + using System; using System.Threading; using System.Threading.Tasks; @@ -32,9 +33,14 @@ public interface ITransientFaultHandler { void ConfigureRetryStrategy(Action configureStrategy); + Task TryAsync( + Func action, + Label label, + CancellationToken cancellationToken); + Task TryAsync( Func> action, Label label, CancellationToken cancellationToken); } -} +} \ No newline at end of file diff --git a/Source/EventFlow/Core/TransientFaultHandler.cs b/Source/EventFlow/Core/TransientFaultHandler.cs index 7bf440771..d8ebf82e4 100644 --- a/Source/EventFlow/Core/TransientFaultHandler.cs +++ b/Source/EventFlow/Core/TransientFaultHandler.cs @@ -54,6 +54,18 @@ public void ConfigureRetryStrategy(Action configureStrategy) configureStrategy(_retryStrategy); } + public Task TryAsync(Func action, Label label, CancellationToken cancellationToken) + { + return TryAsync( + async c => + { + await action(c).ConfigureAwait(false); + return 0; + }, + label, + cancellationToken); + } + public async Task TryAsync(Func> action, Label label, CancellationToken cancellationToken) { if (_retryStrategy == null) diff --git a/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs b/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs index fe6584329..6e20a162f 100644 --- a/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs +++ b/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs @@ -70,7 +70,7 @@ public async Task> FindAsync( } } - public async override Task DeleteAllAsync( + public override async Task DeleteAllAsync( CancellationToken cancellationToken) { using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) diff --git a/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs b/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs index 495149f35..47a4753cd 100644 --- a/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs @@ -20,13 +20,16 @@ // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -// +// + using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Configuration; +using EventFlow.Core; +using EventFlow.Core.RetryStrategies; using EventFlow.Logs; namespace EventFlow.ReadStores @@ -45,8 +48,9 @@ public MultipleAggregateReadStoreManager( TReadStore readModelStore, IReadModelDomainEventApplier readModelDomainEventApplier, TReadModelLocator readModelLocator, - IReadModelFactory readModelFactory) - : base(log, resolver, readModelStore, readModelDomainEventApplier, readModelFactory) + IReadModelFactory readModelFactory, + ITransientFaultHandler transientFaultHandler) + : base(log, resolver, readModelStore, readModelDomainEventApplier, readModelFactory, transientFaultHandler) { _readModelLocator = readModelLocator; } diff --git a/Source/EventFlow/ReadStores/ReadStoreManager.cs b/Source/EventFlow/ReadStores/ReadStoreManager.cs index 1dc243c6a..dc4e82e1f 100644 --- a/Source/EventFlow/ReadStores/ReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/ReadStoreManager.cs @@ -20,7 +20,8 @@ // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -// +// + using System; using System.Collections.Generic; using System.Linq; @@ -28,6 +29,8 @@ using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Configuration; +using EventFlow.Core; +using EventFlow.Core.RetryStrategies; using EventFlow.Extensions; using EventFlow.Logs; @@ -48,6 +51,7 @@ public abstract class ReadStoreManager : IReadStore protected TReadModelStore ReadModelStore { get; } protected IReadModelDomainEventApplier ReadModelDomainEventApplier { get; } protected IReadModelFactory ReadModelFactory { get; } + protected ITransientFaultHandler TransientFaultHandler { get; } protected ISet GetAggregateTypes() { return AggregateTypes; } protected ISet GetDomainEventTypes() { return AggregateEventTypes; } @@ -73,16 +77,18 @@ protected ReadStoreManager( IResolver resolver, TReadModelStore readModelStore, IReadModelDomainEventApplier readModelDomainEventApplier, - IReadModelFactory readModelFactory) + IReadModelFactory readModelFactory, + ITransientFaultHandler transientFaultHandler) { Log = log; Resolver = resolver; ReadModelStore = readModelStore; ReadModelDomainEventApplier = readModelDomainEventApplier; ReadModelFactory = readModelFactory; + TransientFaultHandler = transientFaultHandler; } - public async Task UpdateReadStoresAsync( + public Task UpdateReadStoresAsync( IReadOnlyCollection domainEvents, CancellationToken cancellationToken) { @@ -96,7 +102,7 @@ public async Task UpdateReadStoresAsync( ReadModelType.PrettyPrint(), string.Join(", ", domainEvents.Select(e => e.EventType.PrettyPrint())) )); - return; + return Task.FromResult(0); } Log.Verbose(() => string.Format( @@ -115,15 +121,17 @@ public async Task UpdateReadStoresAsync( typeof(TReadModel).PrettyPrint(), typeof(TReadModelStore).PrettyPrint(), string.Join(", ", relevantDomainEvents.Select(e => e.ToString())))); - return; + return Task.FromResult(0); } - await ReadModelStore.UpdateAsync( - readModelUpdates, - readModelContext, - UpdateAsync, - cancellationToken) - .ConfigureAwait(false); + return TransientFaultHandler.TryAsync( + c => ReadModelStore.UpdateAsync( + readModelUpdates, + readModelContext, + UpdateAsync, + c), + Label.Named("ss"), + cancellationToken); } protected abstract IReadOnlyCollection BuildReadModelUpdates( diff --git a/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs b/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs index 9ed9d6baa..4a40ec8ef 100644 --- a/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs @@ -20,7 +20,8 @@ // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -// +// + using System; using System.Collections.Generic; using System.Linq; @@ -28,6 +29,8 @@ using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Configuration; +using EventFlow.Core; +using EventFlow.Core.RetryStrategies; using EventFlow.Logs; namespace EventFlow.ReadStores @@ -41,8 +44,9 @@ public SingleAggregateReadStoreManager( IResolver resolver, TReadModelStore readModelStore, IReadModelDomainEventApplier readModelDomainEventApplier, - IReadModelFactory readModelFactory) - : base(log, resolver, readModelStore, readModelDomainEventApplier, readModelFactory) + IReadModelFactory readModelFactory, + ITransientFaultHandler transientFaultHandler) + : base(log, resolver, readModelStore, readModelDomainEventApplier, readModelFactory, transientFaultHandler) { } From 91551bf6d1b151d900d9bcdf23be25499cc3d545 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 24 Aug 2016 16:14:03 +0200 Subject: [PATCH 02/29] Fix a few tests --- .../UnitTests/ReadStores/ReadStoreManagerTestSuite.cs | 9 +++++++++ Source/EventFlow/ReadStores/ReadStoreManager.cs | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTestSuite.cs b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTestSuite.cs index 0ed503ca6..9bb536260 100644 --- a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTestSuite.cs +++ b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTestSuite.cs @@ -27,6 +27,10 @@ using System.Threading; using System.Threading.Tasks; using EventFlow.Aggregates; +using EventFlow.Configuration; +using EventFlow.Core; +using EventFlow.Core.RetryStrategies; +using EventFlow.Logs; using EventFlow.ReadStores; using EventFlow.TestHelpers; using EventFlow.TestHelpers.Aggregates.Events; @@ -43,6 +47,11 @@ public abstract class ReadStoreManagerTestSuite : TestsFor [SetUp] public void SetUpReadStoreManagerTestSuite() { + Inject>( + new TransientFaultHandler( + Mock(), + new OptimisticConcurrencyRetryStrategy(new EventFlowConfiguration()))); + ReadModelStoreMock = InjectMock>(); } diff --git a/Source/EventFlow/ReadStores/ReadStoreManager.cs b/Source/EventFlow/ReadStores/ReadStoreManager.cs index dc4e82e1f..2e9222f20 100644 --- a/Source/EventFlow/ReadStores/ReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/ReadStoreManager.cs @@ -130,7 +130,7 @@ public Task UpdateReadStoresAsync( readModelContext, UpdateAsync, c), - Label.Named("ss"), + Label.Named("updatereadmodel"), cancellationToken); } From f4410cfd7861e58baf55dbbe06161ee2844c372f Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Mon, 19 Feb 2018 19:59:06 +0100 Subject: [PATCH 03/29] First spike at implementing optimistic concurrency control in MSSQL read models --- .../ReadStores/MssqlReadModelStore.cs | 114 +++++++++++------- .../ReadModels/ReadModelSqlGenerator.cs | 32 ++++- .../EventFlow.TestHelpers/MsSql/MsSqlHelpz.cs | 8 +- .../Suites/TestSuiteForReadModelStore.cs | 89 ++++++++++++++ .../ReadStores/InMemory/InMemoryReadStore.cs | 6 +- .../MultipleAggregateReadStoreManager.cs | 7 +- .../EventFlow/ReadStores/ReadStoreManager.cs | 26 ++-- .../SingleAggregateReadStoreManager.cs | 7 +- 8 files changed, 210 insertions(+), 79 deletions(-) diff --git a/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs b/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs index a662a2ad5..c236f0436 100644 --- a/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs +++ b/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs @@ -27,8 +27,11 @@ using System.Reflection; using System.Threading; using System.Threading.Tasks; +using Dapper; using EventFlow.Aggregates; using EventFlow.Core; +using EventFlow.Core.RetryStrategies; +using EventFlow.Exceptions; using EventFlow.Extensions; using EventFlow.Logs; using EventFlow.MsSql.ReadStores.Attributes; @@ -47,8 +50,10 @@ public class MssqlReadModelStore : private readonly IMsSqlConnection _connection; private readonly IReadModelSqlGenerator _readModelSqlGenerator; private readonly IReadModelFactory _readModelFactory; + private readonly ITransientFaultHandler _transientFaultHandler; private static readonly Func GetVersion; private static readonly Action SetVersion; + private static readonly string ReadModelNameLoverCase = typeof(TReadModel).Name.ToLowerInvariant(); static MssqlReadModelStore() { @@ -78,12 +83,14 @@ public MssqlReadModelStore( ILog log, IMsSqlConnection connection, IReadModelSqlGenerator readModelSqlGenerator, - IReadModelFactory readModelFactory) + IReadModelFactory readModelFactory, + ITransientFaultHandler transientFaultHandler) : base(log) { _connection = connection; _readModelSqlGenerator = readModelSqlGenerator; _readModelFactory = readModelFactory; + _transientFaultHandler = transientFaultHandler; } public override async Task UpdateAsync( @@ -94,62 +101,88 @@ public override async Task UpdateAsync( { foreach (var readModelUpdate in readModelUpdates) { - IMssqlReadModel mssqlReadModel; + await _transientFaultHandler.TryAsync( + c => UpdateReadModelAsync(readModelContext, updateReadModel, c, readModelUpdate), + Label.Named($"mssql-read-model-update-{ReadModelNameLoverCase}"), + cancellationToken) + .ConfigureAwait(false); + } + } - var readModelNameLowerCased = typeof(TReadModel).Name.ToLowerInvariant(); - var readModelEnvelope = await GetAsync(readModelUpdate.ReadModelId, cancellationToken).ConfigureAwait(false); - var readModel = readModelEnvelope.ReadModel; - var isNew = readModel == null; + private async Task UpdateReadModelAsync( + IReadModelContext readModelContext, + Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + CancellationToken cancellationToken, + ReadModelUpdate readModelUpdate) + { + IMssqlReadModel mssqlReadModel; - if (readModel == null) + var readModelEnvelope = await GetAsync(readModelUpdate.ReadModelId, cancellationToken).ConfigureAwait(false); + var readModel = readModelEnvelope.ReadModel; + var isNew = readModel == null; + + if (readModel == null) + { + readModel = await _readModelFactory.CreateAsync(readModelUpdate.ReadModelId, cancellationToken) + .ConfigureAwait(false); + mssqlReadModel = readModel as IMssqlReadModel; + if (mssqlReadModel != null) { - readModel = await _readModelFactory.CreateAsync(readModelUpdate.ReadModelId, cancellationToken).ConfigureAwait(false); - mssqlReadModel = readModel as IMssqlReadModel; - if (mssqlReadModel != null) - { - mssqlReadModel.AggregateId = readModelUpdate.ReadModelId; - mssqlReadModel.CreateTime = readModelUpdate.DomainEvents.First().Timestamp; - } - readModelEnvelope = ReadModelEnvelope.With(readModelUpdate.ReadModelId, readModel); + mssqlReadModel.AggregateId = readModelUpdate.ReadModelId; + mssqlReadModel.CreateTime = readModelUpdate.DomainEvents.First().Timestamp; } + readModelEnvelope = ReadModelEnvelope.With(readModelUpdate.ReadModelId, readModel); + } - readModelEnvelope = await updateReadModel( + var originalVersion = readModelEnvelope.Version; + readModelEnvelope = await updateReadModel( readModelContext, readModelUpdate.DomainEvents, readModelEnvelope, cancellationToken) - .ConfigureAwait(false); + .ConfigureAwait(false); - mssqlReadModel = readModel as IMssqlReadModel; - if (mssqlReadModel != null) - { - mssqlReadModel.UpdatedTime = DateTimeOffset.Now; - mssqlReadModel.LastAggregateSequenceNumber = (int)readModelEnvelope.Version.GetValueOrDefault(); - } - else - { - SetVersion(readModel, (int?) readModelEnvelope.Version); - } + mssqlReadModel = readModel as IMssqlReadModel; + if (mssqlReadModel != null) + { + mssqlReadModel.UpdatedTime = DateTimeOffset.Now; + mssqlReadModel.LastAggregateSequenceNumber = (int) readModelEnvelope.Version.GetValueOrDefault(); + } + else + { + SetVersion(readModel, (int?) readModelEnvelope.Version); + } - var sql = isNew - ? _readModelSqlGenerator.CreateInsertSql() - : _readModelSqlGenerator.CreateUpdateSql(); + var sql = isNew + ? _readModelSqlGenerator.CreateInsertSql() + : _readModelSqlGenerator.CreateUpdateSql(); - await _connection.ExecuteAsync( - Label.Named("mssql-store-read-model", readModelNameLowerCased), - cancellationToken, - sql, - readModel).ConfigureAwait(false); + var dynamicParameters = new DynamicParameters(readModel); + if (originalVersion.HasValue) + { + dynamicParameters.Add("_PREVIOUS_VERSION", (int)originalVersion.Value); + } + + var rowsAffected = await _connection.ExecuteAsync( + Label.Named("mssql-store-read-model", ReadModelNameLoverCase), + cancellationToken, + sql, + dynamicParameters).ConfigureAwait(false); + Log.Verbose(() => $"Updated MSSQL read model {typeof(TReadModel).PrettyPrint()} with ID '{readModelUpdate.ReadModelId}' to version '{readModelEnvelope.Version}'"); + + if (rowsAffected != 1) + { + throw new OptimisticConcurrencyException( + $"Read model '{readModelEnvelope.ReadModelId}' ({typeof(TReadModel).PrettyPrint()}) is already updated"); } } public override async Task> GetAsync(string id, CancellationToken cancellationToken) { var readModelType = typeof(TReadModel); - var readModelNameLowerCased = readModelType.Name.ToLowerInvariant(); var selectSql = _readModelSqlGenerator.CreateSelectSql(); var readModels = await _connection.QueryAsync( - Label.Named(string.Format("mssql-fetch-read-model-{0}", readModelNameLowerCased)), + Label.Named("mssql-fetch-read-model", ReadModelNameLoverCase), cancellationToken, selectSql, new { EventFlowReadModelId = id }) @@ -165,7 +198,7 @@ public override async Task> GetAsync(string id, Ca var readModelVersion = GetVersion(readModel); - Log.Verbose(() => $"Foud MSSQL read model '{readModelType.PrettyPrint()}' with ID '{readModelVersion}'"); + Log.Verbose(() => $"Found MSSQL read model '{readModelType.PrettyPrint()}' with ID '{id}' and version '{readModelVersion}'"); return readModelVersion.HasValue ? ReadModelEnvelope.With(id, readModel, readModelVersion.Value) @@ -177,10 +210,9 @@ public override async Task DeleteAsync( CancellationToken cancellationToken) { var sql = _readModelSqlGenerator.CreateDeleteSql(); - var readModelName = typeof(TReadModel).Name; var rowsAffected = await _connection.ExecuteAsync( - Label.Named("mssql-delete-read-model", readModelName), + Label.Named("mssql-delete-read-model", ReadModelNameLoverCase), cancellationToken, sql, new { EventFlowReadModelId = id }) @@ -188,7 +220,7 @@ public override async Task DeleteAsync( if (rowsAffected != 0) { - Log.Verbose($"Deleted read model '{id}' of type '{readModelName}'"); + Log.Verbose($"Deleted read model '{id}' of type '{ReadModelNameLoverCase}'"); } } diff --git a/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs b/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs index 4abc79053..2eed663e3 100644 --- a/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs +++ b/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs @@ -38,6 +38,7 @@ public class ReadModelSqlGenerator : IReadModelSqlGenerator private static readonly ConcurrentDictionary TableNames = new ConcurrentDictionary(); private static readonly ConcurrentDictionary> PropertyInfos = new ConcurrentDictionary>(); private static readonly ConcurrentDictionary IdentityColumns = new ConcurrentDictionary(); + private static readonly ConcurrentDictionary VersionColumns = new ConcurrentDictionary(); private readonly Dictionary _insertSqls = new Dictionary(); private readonly Dictionary _purgeSqls = new Dictionary(); private readonly Dictionary _deleteSqls = new Dictionary(); @@ -102,18 +103,24 @@ public string CreateUpdateSql() where TReadModel : IReadModel { var readModelType = typeof(TReadModel); - string sql; - if (_updateSqls.TryGetValue(readModelType, out sql)) + if (_updateSqls.TryGetValue(readModelType, out var sql)) { return sql; } var identityColumn = GetIdentityColumn(); + var versionColumn = GetVersionColumn(); + var versionCheck = string.IsNullOrEmpty(versionColumn) + ? string.Empty + : $"AND {versionColumn} = @_PREVIOUS_VERSION"; + sql = string.Format( - "UPDATE {0} SET {1} WHERE {2} = @{2}", + "UPDATE {0} SET {1} WHERE {2} = @{2} {3}", GetTableName(), string.Join(", ", GetUpdateColumns().Select(c => string.Format("{0} = @{0}", c))), - identityColumn); + identityColumn, + versionCheck); + _updateSqls[readModelType] = sql; return sql; @@ -172,6 +179,23 @@ private string GetIdentityColumn() }); } + private string GetVersionColumn() + { + return VersionColumns.GetOrAdd( + typeof(TReadModel), + t => + { + var propertyInfo = GetPropertyInfos(t).SingleOrDefault(pi => pi.GetCustomAttributes().Any(a => a is SqlReadModelVersionColumnAttribute)); + if (propertyInfo != null) + { + return propertyInfo.Name; + } + return GetPropertyInfos(t).Any(n => n.Name == "LastAggregateSequenceNumber") + ? "LastAggregateSequenceNumber" + : string.Empty; + }); + } + protected IReadOnlyCollection GetPropertyInfos(Type readModelType) { return PropertyInfos.GetOrAdd( diff --git a/Source/EventFlow.TestHelpers/MsSql/MsSqlHelpz.cs b/Source/EventFlow.TestHelpers/MsSql/MsSqlHelpz.cs index af712853d..b1f035cbf 100644 --- a/Source/EventFlow.TestHelpers/MsSql/MsSqlHelpz.cs +++ b/Source/EventFlow.TestHelpers/MsSql/MsSqlHelpz.cs @@ -44,16 +44,16 @@ public static MsSqlConnectionString CreateConnectionString(string label) var databaseName = $"{label}_{DateTime.Now:yyyy-MM-dd-HH-mm}_{Guid.NewGuid():N}"; var connectionstringParts = new List - { - $"Database={databaseName}" - }; + { + $"Database={databaseName}" + }; var environmentServer = Environment.GetEnvironmentVariable("HELPZ_MSSQL_SERVER"); var environmentPassword = Environment.GetEnvironmentVariable("HELPZ_MSSQL_PASS"); var envrionmentUsername = Environment.GetEnvironmentVariable("HELPZ_MSSQL_USER"); connectionstringParts.Add(string.IsNullOrEmpty(environmentServer) - ? @"Server=localhost\SQLEXPRESS" + ? @"Server=." : $"Server={environmentServer}"); connectionstringParts.Add(string.IsNullOrEmpty(envrionmentUsername) ? @"Integrated Security=True" diff --git a/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs b/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs index e37a02119..8d939eac7 100644 --- a/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs +++ b/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs @@ -26,6 +26,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.Logs; +using EventFlow.ReadStores; using EventFlow.TestHelpers.Aggregates; using EventFlow.TestHelpers.Aggregates.Commands; using EventFlow.TestHelpers.Aggregates.Entities; @@ -215,6 +218,44 @@ public async Task PopulateCreatesReadModels() readModel.PingsReceived.Should().Be(2); } + [Test, Timeout(10000)] + public async Task T() + { + // Arrange + var id = ThingyId.New; + var waitState = new WaitState(); + await PublishPingCommandsAsync(id, 1).ConfigureAwait(false); + await PublishPingCommandsAsync(id, 1).ConfigureAwait(false); + + // Arramge + _semaphoreSlims[id.Value] = waitState; + var delayedPublishTask = Task.Run(() => PublishPingCommandsAsync(id, 2)); + waitState.AutoResetEvent1.WaitOne(); + _semaphoreSlims.Remove(id.Value); + await PublishPingCommandsAsync(id, 2).ConfigureAwait(false); + waitState.AutoResetEvent2.Set(); + await delayedPublishTask.ConfigureAwait(false); + + Console.WriteLine(""); + } + + private class WaitState + { + public AutoResetEvent AutoResetEvent1 { get; } = new AutoResetEvent(false); + public AutoResetEvent AutoResetEvent2 { get; } = new AutoResetEvent(false); + } + + private readonly Dictionary _semaphoreSlims = new Dictionary(); + + protected override IEventFlowOptions Options(IEventFlowOptions eventFlowOptions) + { + _semaphoreSlims.Clear(); + + return base.Options(eventFlowOptions) + .RegisterServices(sr => sr.Decorate( + (r, dea) => new DelayingReadModelDomainEventApplier(dea, _semaphoreSlims, r.Resolver.Resolve()))); + } + private async Task> CreateAndPublishThingyMessagesAsync(ThingyId thingyId, int count) { var thingyMessages = Fixture.CreateMany(count).ToList(); @@ -223,5 +264,53 @@ private async Task> CreateAndPublishThingyMes } protected abstract Type ReadModelType { get; } + + private class DelayingReadModelDomainEventApplier : IReadModelDomainEventApplier + { + private readonly IReadModelDomainEventApplier _readModelDomainEventApplier; + private readonly IReadOnlyDictionary _waitStates; + private readonly ILog _log; + + public DelayingReadModelDomainEventApplier( + IReadModelDomainEventApplier readModelDomainEventApplier, + IReadOnlyDictionary waitStates, + ILog log) + { + _readModelDomainEventApplier = readModelDomainEventApplier; + _waitStates = waitStates; + _log = log; + } + + public async Task UpdateReadModelAsync( + TReadModel readModel, + IReadOnlyCollection domainEvents, + IReadModelContext readModelContext, + CancellationToken cancellationToken) + where TReadModel : IReadModel + { + _waitStates.TryGetValue(domainEvents.First().GetIdentity().Value, out var waitState); + + if (waitState != null) + { + _log.Information("Waiting for access to read model"); + waitState.AutoResetEvent1.Set(); + waitState.AutoResetEvent2.WaitOne(); + } + + try + { + return await _readModelDomainEventApplier.UpdateReadModelAsync( + readModel, + domainEvents, + readModelContext, + cancellationToken) + .ConfigureAwait(false); + } + finally + { + + } + } + } } } \ No newline at end of file diff --git a/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs b/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs index 90da046a8..976b22663 100644 --- a/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs +++ b/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs @@ -50,8 +50,7 @@ public override async Task> GetAsync( { using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) { - ReadModelEnvelope readModelEnvelope; - return _readModels.TryGetValue(id, out readModelEnvelope) + return _readModels.TryGetValue(id, out var readModelEnvelope) ? readModelEnvelope : ReadModelEnvelope.Empty(id); } @@ -99,8 +98,7 @@ public override async Task UpdateAsync( { foreach (var readModelUpdate in readModelUpdates) { - ReadModelEnvelope readModelEnvelope; - if (!_readModels.TryGetValue(readModelUpdate.ReadModelId, out readModelEnvelope)) + if (!_readModels.TryGetValue(readModelUpdate.ReadModelId, out var readModelEnvelope)) { readModelEnvelope = ReadModelEnvelope.Empty(readModelUpdate.ReadModelId); } diff --git a/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs b/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs index bf8e6890a..7d9463e37 100644 --- a/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs @@ -28,8 +28,6 @@ using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Configuration; -using EventFlow.Core; -using EventFlow.Core.RetryStrategies; using EventFlow.Logs; namespace EventFlow.ReadStores @@ -48,9 +46,8 @@ public MultipleAggregateReadStoreManager( TReadStore readModelStore, IReadModelDomainEventApplier readModelDomainEventApplier, TReadModelLocator readModelLocator, - IReadModelFactory readModelFactory, - ITransientFaultHandler transientFaultHandler) - : base(log, resolver, readModelStore, readModelDomainEventApplier, readModelFactory, transientFaultHandler) + IReadModelFactory readModelFactory) + : base(log, resolver, readModelStore, readModelDomainEventApplier, readModelFactory) { _readModelLocator = readModelLocator; } diff --git a/Source/EventFlow/ReadStores/ReadStoreManager.cs b/Source/EventFlow/ReadStores/ReadStoreManager.cs index 3ff816ab9..cda71eecd 100644 --- a/Source/EventFlow/ReadStores/ReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/ReadStoreManager.cs @@ -31,7 +31,6 @@ using EventFlow.Aggregates; using EventFlow.Configuration; using EventFlow.Core; -using EventFlow.Core.RetryStrategies; using EventFlow.Extensions; using EventFlow.Logs; @@ -52,7 +51,6 @@ public abstract class ReadStoreManager : IReadStore protected TReadModelStore ReadModelStore { get; } protected IReadModelDomainEventApplier ReadModelDomainEventApplier { get; } protected IReadModelFactory ReadModelFactory { get; } - protected ITransientFaultHandler TransientFaultHandler { get; } protected ISet GetAggregateTypes() => AggregateTypes; protected ISet GetDomainEventTypes() => AggregateEventTypes; @@ -81,18 +79,16 @@ protected ReadStoreManager( IResolver resolver, TReadModelStore readModelStore, IReadModelDomainEventApplier readModelDomainEventApplier, - IReadModelFactory readModelFactory, - ITransientFaultHandler transientFaultHandler) + IReadModelFactory readModelFactory) { Log = log; Resolver = resolver; ReadModelStore = readModelStore; ReadModelDomainEventApplier = readModelDomainEventApplier; ReadModelFactory = readModelFactory; - TransientFaultHandler = transientFaultHandler; } - public Task UpdateReadStoresAsync( + public async Task UpdateReadStoresAsync( IReadOnlyCollection domainEvents, CancellationToken cancellationToken) { @@ -106,7 +102,7 @@ public Task UpdateReadStoresAsync( StaticReadModelType.PrettyPrint(), string.Join(", ", domainEvents.Select(e => e.EventType.PrettyPrint())) )); - return Task.FromResult(0); + return; } Log.Verbose(() => string.Format( @@ -125,17 +121,15 @@ public Task UpdateReadStoresAsync( typeof(TReadModel).PrettyPrint(), typeof(TReadModelStore).PrettyPrint(), string.Join(", ", relevantDomainEvents.Select(e => e.ToString())))); - return Task.FromResult(0); + return; } - return TransientFaultHandler.TryAsync( - c => ReadModelStore.UpdateAsync( - readModelUpdates, - readModelContext, - UpdateAsync, - c), - Label.Named("updatereadmodel"), - cancellationToken); + await ReadModelStore.UpdateAsync( + readModelUpdates, + readModelContext, + UpdateAsync, + cancellationToken) + .ConfigureAwait(false); } protected abstract IReadOnlyCollection BuildReadModelUpdates( diff --git a/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs b/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs index 66d913d7a..0c73d9d14 100644 --- a/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs @@ -28,8 +28,6 @@ using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Configuration; -using EventFlow.Core; -using EventFlow.Core.RetryStrategies; using EventFlow.Logs; namespace EventFlow.ReadStores @@ -43,9 +41,8 @@ public SingleAggregateReadStoreManager( IResolver resolver, TReadModelStore readModelStore, IReadModelDomainEventApplier readModelDomainEventApplier, - IReadModelFactory readModelFactory, - ITransientFaultHandler transientFaultHandler) - : base(log, resolver, readModelStore, readModelDomainEventApplier, readModelFactory, transientFaultHandler) + IReadModelFactory readModelFactory) + : base(log, resolver, readModelStore, readModelDomainEventApplier, readModelFactory) { } From 83ab730002b819092bf7a25e12ed205c2fc98a13 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Mon, 19 Feb 2018 20:08:21 +0100 Subject: [PATCH 04/29] Cleanup tests --- .../Suites/TestSuiteForReadModelStore.cs | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs b/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs index 8d939eac7..a3c63436b 100644 --- a/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs +++ b/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs @@ -219,41 +219,50 @@ public async Task PopulateCreatesReadModels() } [Test, Timeout(10000)] - public async Task T() + public async Task OptimisticConcurrencyCheck() { + // Simulates a state in which two read models have been loaded to memory + // and each is updated independently. The read store should detect the + // concurrent update, reload the read model and apply the updates once + // again. + // A decorated DelayingReadModelDomainEventApplier is used to introduce + // a controlled delay and a set of AutoResetEvent is used to ensure + // that the read store is in the desired state before continuing + // Arrange var id = ThingyId.New; var waitState = new WaitState(); await PublishPingCommandsAsync(id, 1).ConfigureAwait(false); - await PublishPingCommandsAsync(id, 1).ConfigureAwait(false); - // Arramge - _semaphoreSlims[id.Value] = waitState; - var delayedPublishTask = Task.Run(() => PublishPingCommandsAsync(id, 2)); - waitState.AutoResetEvent1.WaitOne(); - _semaphoreSlims.Remove(id.Value); - await PublishPingCommandsAsync(id, 2).ConfigureAwait(false); - waitState.AutoResetEvent2.Set(); + // Arrange + _waitStates[id.Value] = waitState; + var delayedPublishTask = Task.Run(() => PublishPingCommandsAsync(id, 1)); + waitState.ReadStoreReady.WaitOne(); + _waitStates.Remove(id.Value); + await PublishPingCommandsAsync(id, 1).ConfigureAwait(false); + waitState.ReadStoreContinue.Set(); await delayedPublishTask.ConfigureAwait(false); - Console.WriteLine(""); + // Assert + var readModel = await QueryProcessor.ProcessAsync(new ThingyGetQuery(id)).ConfigureAwait(false); + readModel.PingsReceived.Should().Be(3); } private class WaitState { - public AutoResetEvent AutoResetEvent1 { get; } = new AutoResetEvent(false); - public AutoResetEvent AutoResetEvent2 { get; } = new AutoResetEvent(false); + public AutoResetEvent ReadStoreReady { get; } = new AutoResetEvent(false); + public AutoResetEvent ReadStoreContinue { get; } = new AutoResetEvent(false); } - private readonly Dictionary _semaphoreSlims = new Dictionary(); + private readonly Dictionary _waitStates = new Dictionary(); protected override IEventFlowOptions Options(IEventFlowOptions eventFlowOptions) { - _semaphoreSlims.Clear(); + _waitStates.Clear(); return base.Options(eventFlowOptions) .RegisterServices(sr => sr.Decorate( - (r, dea) => new DelayingReadModelDomainEventApplier(dea, _semaphoreSlims, r.Resolver.Resolve()))); + (r, dea) => new DelayingReadModelDomainEventApplier(dea, _waitStates, r.Resolver.Resolve()))); } private async Task> CreateAndPublishThingyMessagesAsync(ThingyId thingyId, int count) @@ -293,23 +302,16 @@ public async Task UpdateReadModelAsync( if (waitState != null) { _log.Information("Waiting for access to read model"); - waitState.AutoResetEvent1.Set(); - waitState.AutoResetEvent2.WaitOne(); - } - - try - { - return await _readModelDomainEventApplier.UpdateReadModelAsync( - readModel, - domainEvents, - readModelContext, - cancellationToken) - .ConfigureAwait(false); + waitState.ReadStoreReady.Set(); + waitState.ReadStoreContinue.WaitOne(); } - finally - { - } + return await _readModelDomainEventApplier.UpdateReadModelAsync( + readModel, + domainEvents, + readModelContext, + cancellationToken) + .ConfigureAwait(false); } } } From f04d0895a136a94313330369ed1167c1b0c253de Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Thu, 1 Mar 2018 19:53:45 +0100 Subject: [PATCH 05/29] Concurrency checks doens't make any sense for in-memory read models --- .../Suites/TestSuiteForReadModelStore.cs | 2 +- .../ReadStores/InMemoryReadModelStoreTests.cs | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs b/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs index a3c63436b..219c7bbb4 100644 --- a/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs +++ b/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs @@ -219,7 +219,7 @@ public async Task PopulateCreatesReadModels() } [Test, Timeout(10000)] - public async Task OptimisticConcurrencyCheck() + public virtual async Task OptimisticConcurrencyCheck() { // Simulates a state in which two read models have been loaded to memory // and each is updated independently. The read store should detect the diff --git a/Source/EventFlow.Tests/IntegrationTests/ReadStores/InMemoryReadModelStoreTests.cs b/Source/EventFlow.Tests/IntegrationTests/ReadStores/InMemoryReadModelStoreTests.cs index f389cd87c..171a0b7dc 100644 --- a/Source/EventFlow.Tests/IntegrationTests/ReadStores/InMemoryReadModelStoreTests.cs +++ b/Source/EventFlow.Tests/IntegrationTests/ReadStores/InMemoryReadModelStoreTests.cs @@ -22,6 +22,7 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; +using System.Threading.Tasks; using EventFlow.Configuration; using EventFlow.Extensions; using EventFlow.TestHelpers; @@ -52,5 +53,12 @@ protected override IRootResolver CreateRootResolver(IEventFlowOptions eventFlowO return resolver; } + + [Test] + public override Task OptimisticConcurrencyCheck() + { + // The in-memory uses a global lock on all read models making concurrency impossible + return Task.FromResult(0); + } } } \ No newline at end of file From 1c740adc087705bdc47d042fa63ac97336953e2b Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Thu, 1 Mar 2018 20:01:19 +0100 Subject: [PATCH 06/29] SQLite optimistic concurrency support for read models --- .../ReadStores/MssqlReadModelStore.cs | 4 +- .../ReadStores/SQLiteReadModelStore.cs | 7 +- .../ReadModels/SqlReadModelStore.cs | 86 +++++++++++++------ 3 files changed, 67 insertions(+), 30 deletions(-) diff --git a/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs b/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs index c236f0436..45e55e200 100644 --- a/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs +++ b/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs @@ -168,13 +168,13 @@ private async Task UpdateReadModelAsync( cancellationToken, sql, dynamicParameters).ConfigureAwait(false); - Log.Verbose(() => $"Updated MSSQL read model {typeof(TReadModel).PrettyPrint()} with ID '{readModelUpdate.ReadModelId}' to version '{readModelEnvelope.Version}'"); - if (rowsAffected != 1) { throw new OptimisticConcurrencyException( $"Read model '{readModelEnvelope.ReadModelId}' ({typeof(TReadModel).PrettyPrint()}) is already updated"); } + + Log.Verbose(() => $"Updated MSSQL read model {typeof(TReadModel).PrettyPrint()} with ID '{readModelUpdate.ReadModelId}' to version '{readModelEnvelope.Version}'"); } public override async Task> GetAsync(string id, CancellationToken cancellationToken) diff --git a/Source/EventFlow.SQLite/ReadStores/SQLiteReadModelStore.cs b/Source/EventFlow.SQLite/ReadStores/SQLiteReadModelStore.cs index 4230dc3a2..251909d6a 100644 --- a/Source/EventFlow.SQLite/ReadStores/SQLiteReadModelStore.cs +++ b/Source/EventFlow.SQLite/ReadStores/SQLiteReadModelStore.cs @@ -21,6 +21,8 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using EventFlow.Core; +using EventFlow.Core.RetryStrategies; using EventFlow.Logs; using EventFlow.ReadStores; using EventFlow.Sql.ReadModels; @@ -35,8 +37,9 @@ public SQLiteReadModelStore( ILog log, ISQLiteConnection connection, IReadModelSqlGenerator readModelSqlGenerator, - IReadModelFactory readModelFactory) - : base(log, connection, readModelSqlGenerator, readModelFactory) + IReadModelFactory readModelFactory, + ITransientFaultHandler transientFaultHandler) + : base(log, connection, readModelSqlGenerator, readModelFactory, transientFaultHandler) { } } diff --git a/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs b/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs index 2db301e53..473ba9ebf 100644 --- a/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs +++ b/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs @@ -27,8 +27,11 @@ using System.Reflection; using System.Threading; using System.Threading.Tasks; +using Dapper; using EventFlow.Aggregates; using EventFlow.Core; +using EventFlow.Core.RetryStrategies; +using EventFlow.Exceptions; using EventFlow.Extensions; using EventFlow.Logs; using EventFlow.ReadStores; @@ -46,6 +49,7 @@ public abstract class SqlReadModelStore : private readonly TSqlConnection _connection; private readonly IReadModelSqlGenerator _readModelSqlGenerator; private readonly IReadModelFactory _readModelFactory; + private readonly ITransientFaultHandler _transientFaultHandler; private static readonly Func GetVersion; private static readonly Action SetVersion; private static readonly Func GetIdentity; @@ -87,12 +91,14 @@ protected SqlReadModelStore( ILog log, TSqlConnection connection, IReadModelSqlGenerator readModelSqlGenerator, - IReadModelFactory readModelFactory) + IReadModelFactory readModelFactory, + ITransientFaultHandler transientFaultHandler) : base(log) { _connection = connection; _readModelSqlGenerator = readModelSqlGenerator; _readModelFactory = readModelFactory; + _transientFaultHandler = transientFaultHandler; } public override async Task UpdateAsync( @@ -103,37 +109,65 @@ public override async Task UpdateAsync( { foreach (var readModelUpdate in readModelUpdates) { - var readModelNameLowerCased = typeof(TReadModel).Name.ToLowerInvariant(); - var readModelEnvelope = await GetAsync(readModelUpdate.ReadModelId, cancellationToken).ConfigureAwait(false); - var readModel = readModelEnvelope.ReadModel; - var isNew = readModel == null; - - if (readModel == null) - { - readModel = await _readModelFactory.CreateAsync(readModelUpdate.ReadModelId, cancellationToken).ConfigureAwait(false); - readModelEnvelope = ReadModelEnvelope.With(readModelUpdate.ReadModelId, readModel); - } - - readModelEnvelope = await updateReadModel( - readModelContext, - readModelUpdate.DomainEvents, - readModelEnvelope, + await _transientFaultHandler.TryAsync( + c => UpdateReadModelAsync(readModelContext, updateReadModel, c, readModelUpdate), + Label.Named($"sqlite-read-model-update"), cancellationToken) .ConfigureAwait(false); + } + } + + private async Task UpdateReadModelAsync( + IReadModelContext readModelContext, + Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + CancellationToken cancellationToken, + ReadModelUpdate readModelUpdate) + { + var readModelNameLowerCased = typeof(TReadModel).Name.ToLowerInvariant(); + var readModelEnvelope = await GetAsync(readModelUpdate.ReadModelId, cancellationToken).ConfigureAwait(false); + var readModel = readModelEnvelope.ReadModel; + var isNew = readModel == null; + + if (readModel == null) + { + readModel = await _readModelFactory.CreateAsync(readModelUpdate.ReadModelId, cancellationToken).ConfigureAwait(false); + readModelEnvelope = ReadModelEnvelope.With(readModelUpdate.ReadModelId, readModel); + } - SetVersion(readModel, (int?) readModelEnvelope.Version); - SetIdentity(readModel, readModelEnvelope.ReadModelId); + var originalVersion = readModelEnvelope.Version; + readModelEnvelope = await updateReadModel( + readModelContext, + readModelUpdate.DomainEvents, + readModelEnvelope, + cancellationToken) + .ConfigureAwait(false); + + SetVersion(readModel, (int?) readModelEnvelope.Version); + SetIdentity(readModel, readModelEnvelope.ReadModelId); - var sql = isNew - ? _readModelSqlGenerator.CreateInsertSql() - : _readModelSqlGenerator.CreateUpdateSql(); + var sql = isNew + ? _readModelSqlGenerator.CreateInsertSql() + : _readModelSqlGenerator.CreateUpdateSql(); - await _connection.ExecuteAsync( - Label.Named("sql-store-read-model", readModelNameLowerCased), - cancellationToken, - sql, - readModel).ConfigureAwait(false); + var dynamicParameters = new DynamicParameters(readModel); + if (originalVersion.HasValue) + { + dynamicParameters.Add("_PREVIOUS_VERSION", (int)originalVersion.Value); + } + + var rowsAffected = await _connection.ExecuteAsync( + Label.Named("sql-store-read-model", readModelNameLowerCased), + cancellationToken, + sql, + dynamicParameters) + .ConfigureAwait(false); + if (rowsAffected != 1) + { + throw new OptimisticConcurrencyException( + $"Read model '{readModelEnvelope.ReadModelId}' ({typeof(TReadModel).PrettyPrint()}) is already updated"); } + + Log.Verbose(() => $"Updated MSSQL read model {typeof(TReadModel).PrettyPrint()} with ID '{readModelUpdate.ReadModelId}' to version '{readModelEnvelope.Version}'"); } public override async Task> GetAsync(string id, CancellationToken cancellationToken) From 05724c33e3243764d3c77470cef28c34cdb5d529 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Thu, 1 Mar 2018 20:05:22 +0100 Subject: [PATCH 07/29] Validate that SQL for versioned read models are corect --- .../ReadModels/ReadModelSqlGeneratorTests.cs | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs b/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs index e21ad98fb..1c48ccc38 100644 --- a/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs +++ b/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs @@ -46,15 +46,25 @@ public void CreateInsertSql_ProducesCorrectSql() } [Test] - public void CreateUpdateSql_ProducesCorrectSql() + public void CreateUpdateSql_WithoutVersion_ProducesCorrectSql() { // Act - var sql = Sut.CreateUpdateSql(); + var sql = Sut.CreateUpdateSql().Trim(); // Assert sql.Should().Be("UPDATE [ReadModel-TestAttributes] SET UpdatedTime = @UpdatedTime WHERE Id = @Id"); } + [Test] + public void CreateUpdateSql_WithVersion_ProducesCorrectSql() + { + // Act + var sql = Sut.CreateUpdateSql().Trim(); + + // Assert + sql.Should().Be("UPDATE [ReadModel-TestVersionedAttributes] SET FancyVersion = @FancyVersion WHERE CoolId = @CoolId AND FancyVersion = @_PREVIOUS_VERSION"); + } + [Test] public void CreateSelectSql_ProducesCorrectSql() { @@ -86,6 +96,15 @@ public class TestAttributesReadModel : IReadModel public string Secret { get; set; } } + public class TestVersionedAttributesReadModel : IReadModel + { + [SqlReadModelIdentityColumn] + public string CoolId { get; set; } + + [SqlReadModelVersionColumn] + public string FancyVersion { get; set; } + } + [Table("Fancy")] public class TestTableAttributeReadModel : IReadModel { From 6e360792ec4ca71d3518b58b572150676c341432 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 23 May 2018 21:39:32 +0200 Subject: [PATCH 08/29] Version is now 0.60 --- RELEASE_NOTES.md | 6 +++++- appveyor.yml | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 23ef97e2b..35d4fd332 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,4 +1,8 @@ -### New in 0.59 (not released yet) +### New in 0.60 (not released yet) + +* _Nothing yet_ + +### New in 0.59.3396 (released 2018-05-23) * Fix: Commands are now correctly published when no events are emitted from a saga after handling a domain event diff --git a/appveyor.yml b/appveyor.yml index 897cc73e4..011d7d1d4 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,7 +1,7 @@ init: - git config --global core.autocrlf input -version: 0.59.{build} +version: 0.60.{build} skip_tags: true From 4420e2eb3a026b9badc900006d807f5d41416c56 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Mon, 28 May 2018 20:24:17 +0200 Subject: [PATCH 09/29] Fix #238 and better formatting for articles --- README.md | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 59aa86fa1..821d8a45a 100644 --- a/README.md +++ b/README.md @@ -294,26 +294,31 @@ section lists some of them. If you have a link with a relevant article, please share it by creating an issue with the link. * **Domain-Driven Design** - - [Domain-Driven Design Reference](https://domainlanguage.com/ddd/reference/) by Eric Evans + * [Domain-Driven Design Reference](https://domainlanguage.com/ddd/reference/) + by Eric Evans + * [DDD Decoded - Bounded Contexts Explained](http://blog.sapiensworks.com/post/2016/08/12/DDD-Bounded-Contexts-Explained) + * [Going "Events-First" for Microservices with Event Storming and DDD](http://www.russmiles.com/essais/going-events-first-for-microservices-with-event-storming-and-ddd) * **General CQRS+ES** - - [CQRS Journey by Microsoft](https://msdn.microsoft.com/en-us/library/jj554200.aspx) - published by Microsoft - - [An In-Depth Look At CQRS](http://blog.sapiensworks.com/post/2015/09/01/In-Depth-CQRS/) - by Mike Mogosanu - - [CQRS, Task Based UIs, Event Sourcing agh!](http://codebetter.com/gregyoung/2010/02/16/cqrs-task-based-uis-event-sourcing-agh/) - by Greg Young - - [Busting some CQRS myths](https://lostechies.com/jimmybogard/2012/08/22/busting-some-cqrs-myths/) - by Jimmy Bogard - - [CQRS applied](https://lostechies.com/gabrielschenker/2015/04/12/cqrs-applied/) - by Gabriel Schenker + * [CQRS Journey by Microsoft](https://msdn.microsoft.com/en-us/library/jj554200.aspx) + published by Microsoft + * [An In-Depth Look At CQRS](http://blog.sapiensworks.com/post/2015/09/01/In-Depth-CQRS/) + by Mike Mogosanu + * [CQRS, Task Based UIs, Event Sourcing agh!](http://codebetter.com/gregyoung/2010/02/16/cqrs-task-based-uis-event-sourcing-agh/) + by Greg Young + * [Busting some CQRS myths](https://lostechies.com/jimmybogard/2012/08/22/busting-some-cqrs-myths/) + by Jimmy Bogard + * [CQRS applied](https://lostechies.com/gabrielschenker/2015/04/12/cqrs-applied/) + by Gabriel Schenker + * [DDD Decoded - Entities and Value Objects Explained](http://blog.sapiensworks.com/post/2016/07/29/DDD-Entities-Value-Objects-Explained) * **Eventual consistency** - - [How To Ensure Idempotency In An Eventual Consistent DDD/CQRS Application](http://blog.sapiensworks.com/post/2015/08/26/How-To-Ensure-Idempotency) + * [How To Ensure Idempotency In An Eventual Consistent DDD/CQRS Application](http://blog.sapiensworks.com/post/2015/08/26/How-To-Ensure-Idempotency) by Mike Mogosanu + * [DDD Decoded - Don't Fear Eventual Consistency](http://blog.sapiensworks.com/post/2016/07/23/DDD-Eventual-Consistency) * **Why _not_ to implement "unit of work" in DDD** - - [Unit Of Work is the new Singleton](http://blog.sapiensworks.com/post/2014/06/04/Unit-Of-Work-is-the-new-Singleton.aspx) - by Mike Mogosanu - - [The Unit of Work and Transactions In Domain Driven Design](http://blog.sapiensworks.com/post/2015/09/02/DDD-and-UoW/) - by Mike Mogosanu + * [Unit Of Work is the new Singleton](http://blog.sapiensworks.com/post/2014/06/04/Unit-Of-Work-is-the-new-Singleton.aspx) + by Mike Mogosanu + * [The Unit of Work and Transactions In Domain Driven Design](http://blog.sapiensworks.com/post/2015/09/02/DDD-and-UoW/) + by Mike Mogosanu ### Integration tests From 945cea103e264c4252fd4bf300292aa5a073f931 Mon Sep 17 00:00:00 2001 From: Alexander Zeitler Date: Wed, 6 Jun 2018 10:17:59 +0200 Subject: [PATCH 10/29] Change ExampleAggrenate to ExampleAggregate in Complete Example --- README.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 821d8a45a..4647ff13d 100644 --- a/README.md +++ b/README.md @@ -179,12 +179,12 @@ public async Task Example() ```csharp // The aggregate root -public class ExampleAggrenate : AggregateRoot, +public class ExampleAggregate : AggregateRoot, IEmit { private int? _magicNumber; - public ExampleAggrenate(ExampleId id) : base(id) { } + public ExampleAggregate(ExampleId id) : base(id) { } // Method invoked by our command public void SetMagicNumer(int magicNumber) @@ -215,7 +215,7 @@ public class ExampleId : Identity ```csharp // A basic event containing some information -public class ExampleEvent : AggregateEvent +public class ExampleEvent : AggregateEvent { public ExampleEvent(int magicNumber) { @@ -228,7 +228,7 @@ public class ExampleEvent : AggregateEvent ```csharp // Command for update magic number -public class ExampleCommand : Command +public class ExampleCommand : Command { public ExampleCommand( ExampleId aggregateId, @@ -245,10 +245,10 @@ public class ExampleCommand : Command ```csharp // Command handler for our command public class ExampleCommandHandler - : CommandHandler + : CommandHandler { public override Task ExecuteAsync( - ExampleAggrenate aggregate, + ExampleAggregate aggregate, ExampleCommand command, CancellationToken cancellationToken) { @@ -261,13 +261,13 @@ public class ExampleCommandHandler ```csharp // Read model for our aggregate public class ExampleReadModel : IReadModel, - IAmReadModelFor + IAmReadModelFor { public int MagicNumber { get; private set; } public void Apply( IReadModelContext context, - IDomainEvent domainEvent) + IDomainEvent domainEvent) { MagicNumber = domainEvent.AggregateEvent.MagicNumber; } From 84b0d827dffb2b0ed0d251815461d97cb28b6a46 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 6 Jun 2018 19:49:37 +0200 Subject: [PATCH 11/29] Validate arguments --- Source/EventFlow/Aggregates/AggregateRoot.cs | 25 +++++++++++++------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/Source/EventFlow/Aggregates/AggregateRoot.cs b/Source/EventFlow/Aggregates/AggregateRoot.cs index 0d3995f85..1696e396e 100644 --- a/Source/EventFlow/Aggregates/AggregateRoot.cs +++ b/Source/EventFlow/Aggregates/AggregateRoot.cs @@ -78,10 +78,7 @@ public bool HasSourceId(ISourceId sourceId) protected virtual void Emit(TEvent aggregateEvent, IMetadata metadata = null) where TEvent : IAggregateEvent { - if (aggregateEvent == null) - { - throw new ArgumentNullException(nameof(aggregateEvent)); - } + if (aggregateEvent == null) throw new ArgumentNullException(nameof(aggregateEvent)); var aggregateSequenceNumber = Version + 1; var eventId = EventId.NewDeterministic( @@ -113,6 +110,8 @@ public virtual async Task LoadAsync( ISnapshotStore snapshotStore, CancellationToken cancellationToken) { + if (eventStore == null) throw new ArgumentNullException(nameof(eventStore)); + var domainEvents = await eventStore.LoadEventsAsync(Id, cancellationToken).ConfigureAwait(false); ApplyEvents(domainEvents); @@ -124,6 +123,8 @@ public virtual async Task> CommitAsync( ISourceId sourceId, CancellationToken cancellationToken) { + if (eventStore == null) throw new ArgumentNullException(nameof(eventStore)); + var domainEvents = await eventStore.StoreAsync( Id, _uncommittedEvents, @@ -136,6 +137,8 @@ public virtual async Task> CommitAsync( public void ApplyEvents(IReadOnlyCollection domainEvents) { + if (domainEvents == null) throw new ArgumentNullException(nameof(domainEvents)); + if (!domainEvents.Any()) { return; @@ -156,6 +159,8 @@ public IIdentity GetIdentity() public void ApplyEvents(IEnumerable aggregateEvents) { + if (aggregateEvents == null) throw new ArgumentNullException(nameof(aggregateEvents)); + if (Version > 0) { throw new InvalidOperationException($"Aggregate '{GetType().PrettyPrint()}' with ID '{Id}' already has events"); @@ -163,8 +168,7 @@ public void ApplyEvents(IEnumerable aggregateEvents) foreach (var aggregateEvent in aggregateEvents) { - var e = aggregateEvent as IAggregateEvent; - if (e == null) + if (!(aggregateEvent is IAggregateEvent e)) { throw new ArgumentException($"Aggregate event of type '{aggregateEvent.GetType()}' does not belong with aggregate '{this}',"); } @@ -175,6 +179,8 @@ public void ApplyEvents(IEnumerable aggregateEvents) protected virtual void ApplyEvent(IAggregateEvent aggregateEvent) { + if (aggregateEvent == null) throw new ArgumentNullException(nameof(aggregateEvent)); + var eventType = aggregateEvent.GetType(); if (_eventHandlers.ContainsKey(eventType)) { @@ -186,8 +192,7 @@ protected virtual void ApplyEvent(IAggregateEvent aggrega } else { - Action applyMethod; - if (!ApplyMethods.TryGetValue(eventType, out applyMethod)) + if (!ApplyMethods.TryGetValue(eventType, out var applyMethod)) { throw new NotImplementedException( $"Aggregate '{Name}' does have an 'Apply' method that takes aggregate event '{eventType.PrettyPrint()}' as argument"); @@ -203,6 +208,8 @@ protected virtual void ApplyEvent(IAggregateEvent aggrega protected void Register(Action handler) where TAggregateEvent : IAggregateEvent { + if (handler == null) throw new ArgumentNullException(nameof(handler)); + var eventType = typeof(TAggregateEvent); if (_eventHandlers.ContainsKey(eventType)) { @@ -215,6 +222,8 @@ protected void Register(Action handler) protected void Register(IEventApplier eventApplier) { + if (eventApplier == null) throw new ArgumentNullException(nameof(eventApplier)); + _eventAppliers.Add(eventApplier); } From 599363f11ede1aa8c6bf7594e2223f3fce834f4e Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 6 Jun 2018 20:12:38 +0200 Subject: [PATCH 12/29] Remove the 'for test' as of #469 --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4647ff13d..1f596449b 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ to the documentation. * In-memory - only for test * Files - only for test * Microsoft SQL Server - * EventStore - only for test (for now) [home page](https://geteventstore.com/) + * EventStore - [home page](https://geteventstore.com/) * [**Subscribers:**](http://docs.geteventflow.net/Subscribers.html) Listeners that act on specific domain events. Useful if an specific action needs to be triggered after a domain event has been committed. From 5368e7f05947be45337abd021c042631395b66bf Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 6 Jun 2018 20:21:03 +0200 Subject: [PATCH 13/29] Cleanup and use correct event ID --- .../EventStoreEventPersistence.cs | 2 +- Source/EventFlow/Aggregates/IEventId.cs | 4 +- Source/EventFlow/Aggregates/IMetadata.cs | 1 + Source/EventFlow/Aggregates/Metadata.cs | 46 ++++++++----------- Source/EventFlow/Core/MetadataContainer.cs | 6 +-- 5 files changed, 27 insertions(+), 32 deletions(-) diff --git a/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs b/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs index fec4e4214..cd3191d94 100644 --- a/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs +++ b/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs @@ -122,7 +122,7 @@ public async Task> CommitEventsAsync( var eventDatas = serializedEvents .Select(e => { - var guid = Guid.Parse(e.Metadata["guid"]); + var guid = e.Metadata.EventId.GetGuid(); var eventType = string.Format("{0}.{1}.{2}", e.Metadata[MetadataKeys.AggregateName], e.Metadata.EventName, e.Metadata.EventVersion); var data = Encoding.UTF8.GetBytes(e.SerializedData); var meta = Encoding.UTF8.GetBytes(e.SerializedMetadata); diff --git a/Source/EventFlow/Aggregates/IEventId.cs b/Source/EventFlow/Aggregates/IEventId.cs index f1657feb3..8141f3fa0 100644 --- a/Source/EventFlow/Aggregates/IEventId.cs +++ b/Source/EventFlow/Aggregates/IEventId.cs @@ -21,11 +21,13 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; using EventFlow.Core; namespace EventFlow.Aggregates { public interface IEventId : ISourceId { + Guid GetGuid(); } -} \ No newline at end of file +} diff --git a/Source/EventFlow/Aggregates/IMetadata.cs b/Source/EventFlow/Aggregates/IMetadata.cs index afc3c4b16..94f89742f 100644 --- a/Source/EventFlow/Aggregates/IMetadata.cs +++ b/Source/EventFlow/Aggregates/IMetadata.cs @@ -37,6 +37,7 @@ public interface IMetadata : IMetadataContainer long TimestampEpoch { get; } int AggregateSequenceNumber { get; } string AggregateId { get; } + string AggregateName { get; set; } IMetadata CloneWith(params KeyValuePair[] keyValuePairs); IMetadata CloneWith(IEnumerable> keyValuePairs); diff --git a/Source/EventFlow/Aggregates/Metadata.cs b/Source/EventFlow/Aggregates/Metadata.cs index b41b1dc97..d20cc7567 100644 --- a/Source/EventFlow/Aggregates/Metadata.cs +++ b/Source/EventFlow/Aggregates/Metadata.cs @@ -49,71 +49,65 @@ public static IMetadata With(IDictionary keyValuePairs) return new Metadata(keyValuePairs); } + [JsonIgnore] public ISourceId SourceId { - get { return GetMetadataValue(MetadataKeys.SourceId, v => new SourceId(v)); } - set { Add(MetadataKeys.SourceId, value.Value); } + get => GetMetadataValue(MetadataKeys.SourceId, v => new SourceId(v)); + set => Add(MetadataKeys.SourceId, value.Value); } [JsonIgnore] public string EventName { - get { return GetMetadataValue(MetadataKeys.EventName); } - set { Add(MetadataKeys.EventName, value); } + get => GetMetadataValue(MetadataKeys.EventName); + set => Add(MetadataKeys.EventName, value); } [JsonIgnore] public int EventVersion { - get { return GetMetadataValue(MetadataKeys.EventVersion, int.Parse); } - set { Add(MetadataKeys.EventVersion, value.ToString()); } + get => GetMetadataValue(MetadataKeys.EventVersion, int.Parse); + set => Add(MetadataKeys.EventVersion, value.ToString()); } [JsonIgnore] public DateTimeOffset Timestamp { - get { return GetMetadataValue(MetadataKeys.Timestamp, DateTimeOffset.Parse); } - set { Add(MetadataKeys.Timestamp, value.ToString("O")); } + get => GetMetadataValue(MetadataKeys.Timestamp, DateTimeOffset.Parse); + set => Add(MetadataKeys.Timestamp, value.ToString("O")); } [JsonIgnore] - public long TimestampEpoch - { - get - { - string timestampEpoch; - return TryGetValue(MetadataKeys.TimestampEpoch, out timestampEpoch) - ? long.Parse(timestampEpoch) - : Timestamp.ToUnixTime(); - } - } + public long TimestampEpoch => TryGetValue(MetadataKeys.TimestampEpoch, out var timestampEpoch) + ? long.Parse(timestampEpoch) + : Timestamp.ToUnixTime(); [JsonIgnore] public int AggregateSequenceNumber { - get { return GetMetadataValue(MetadataKeys.AggregateSequenceNumber, int.Parse); } - set { Add(MetadataKeys.AggregateSequenceNumber, value.ToString()); } + get => GetMetadataValue(MetadataKeys.AggregateSequenceNumber, int.Parse); + set => Add(MetadataKeys.AggregateSequenceNumber, value.ToString()); } [JsonIgnore] public string AggregateId { - get { return GetMetadataValue(MetadataKeys.AggregateId); } - set { Add(MetadataKeys.AggregateId, value); } + get => GetMetadataValue(MetadataKeys.AggregateId); + set => Add(MetadataKeys.AggregateId, value); } [JsonIgnore] public IEventId EventId { - get { return GetMetadataValue(MetadataKeys.EventId, Aggregates.EventId.With); } - set { Add(MetadataKeys.EventId, value.Value); } + get => GetMetadataValue(MetadataKeys.EventId, Aggregates.EventId.With); + set => Add(MetadataKeys.EventId, value.Value); } [JsonIgnore] public string AggregateName { - get { return GetMetadataValue(MetadataKeys.AggregateName); } - set { Add(MetadataKeys.AggregateName, value); } + get => GetMetadataValue(MetadataKeys.AggregateName); + set => Add(MetadataKeys.AggregateName, value); } public Metadata() diff --git a/Source/EventFlow/Core/MetadataContainer.cs b/Source/EventFlow/Core/MetadataContainer.cs index a2c664416..34e8c6657 100644 --- a/Source/EventFlow/Core/MetadataContainer.cs +++ b/Source/EventFlow/Core/MetadataContainer.cs @@ -74,9 +74,7 @@ public string GetMetadataValue(string key) public T GetMetadataValue(string key, Func converter) { - string value; - - if (!TryGetValue(key, out value)) + if (!TryGetValue(key, out var value)) { throw new MetadataKeyNotFoundException(key); } @@ -91,4 +89,4 @@ public T GetMetadataValue(string key, Func converter) } } } -} \ No newline at end of file +} From 655fd91cc81c3614f1f06d42cefa7dcc3315124f Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 6 Jun 2018 20:24:04 +0200 Subject: [PATCH 14/29] Remove transaction --- .../EventStoreEventPersistence.cs | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs b/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs index cd3191d94..e77f33866 100644 --- a/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs +++ b/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs @@ -132,17 +132,18 @@ public async Task> CommitEventsAsync( try { - using (var transaction = await _connection.StartTransactionAsync(id.Value, expectedVersion).ConfigureAwait(false)) - { - await transaction.WriteAsync(eventDatas).ConfigureAwait(false); - var writeResult = await transaction.CommitAsync().ConfigureAwait(false); - _log.Verbose( - "Wrote entity {0} with version {1} ({2},{3})", - id, - writeResult.NextExpectedVersion - 1, - writeResult.LogPosition.CommitPosition, - writeResult.LogPosition.PreparePosition); - } + var writeResult = await _connection.AppendToStreamAsync( + id.Value, + expectedVersion, + eventDatas) + .ConfigureAwait(false); + + _log.Verbose( + "Wrote entity {0} with version {1} ({2},{3})", + id, + writeResult.NextExpectedVersion - 1, + writeResult.LogPosition.CommitPosition, + writeResult.LogPosition.PreparePosition); } catch (WrongExpectedVersionException e) { From 55fcfc3f7527dd4d5ea5536e39e98e56d5b9af32 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 6 Jun 2018 21:25:35 +0200 Subject: [PATCH 15/29] Don't use EventId --- .../EventStoreEventPersistence.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs b/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs index e77f33866..489cafcde 100644 --- a/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs +++ b/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs @@ -122,7 +122,10 @@ public async Task> CommitEventsAsync( var eventDatas = serializedEvents .Select(e => { - var guid = e.Metadata.EventId.GetGuid(); + // While it might be tempting to use e.Metadata.EventId here, we can't + // as EventStore won't detect optimistic concurrency exceptions then + var guid = Guid.NewGuid(); + var eventType = string.Format("{0}.{1}.{2}", e.Metadata[MetadataKeys.AggregateName], e.Metadata.EventName, e.Metadata.EventVersion); var data = Encoding.UTF8.GetBytes(e.SerializedData); var meta = Encoding.UTF8.GetBytes(e.SerializedMetadata); From 9bf83f48af11bfafe1b27e132337f4b57f2ae524 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 6 Jun 2018 21:30:04 +0200 Subject: [PATCH 16/29] Updated README --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 35d4fd332..8cd5b8414 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,6 @@ ### New in 0.60 (not released yet) -* _Nothing yet_ +* Minor: Removed unnecessary transaction in EventStore persistance ### New in 0.59.3396 (released 2018-05-23) From 814c8b340cf69ac2c4136c2c7d13a5c460d7f9b0 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Wed, 6 Jun 2018 23:20:46 +0200 Subject: [PATCH 17/29] Remove set; from IMetadata --- Source/EventFlow/Aggregates/IMetadata.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/EventFlow/Aggregates/IMetadata.cs b/Source/EventFlow/Aggregates/IMetadata.cs index 94f89742f..3e7e385a5 100644 --- a/Source/EventFlow/Aggregates/IMetadata.cs +++ b/Source/EventFlow/Aggregates/IMetadata.cs @@ -37,9 +37,9 @@ public interface IMetadata : IMetadataContainer long TimestampEpoch { get; } int AggregateSequenceNumber { get; } string AggregateId { get; } - string AggregateName { get; set; } + string AggregateName { get; } IMetadata CloneWith(params KeyValuePair[] keyValuePairs); IMetadata CloneWith(IEnumerable> keyValuePairs); } -} \ No newline at end of file +} From f39cb7f113d00b0c9f3c8f10901e52a970033220 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Thu, 7 Jun 2018 19:52:25 +0200 Subject: [PATCH 18/29] Added .NET standard support to EventStore --- RELEASE_NOTES.md | 1 + .../EventFlow.EventStores.EventStore.csproj | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 8cd5b8414..82a9e20f8 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,6 @@ ### New in 0.60 (not released yet) +* New: Added .NET standard support for EventStore * Minor: Removed unnecessary transaction in EventStore persistance ### New in 0.59.3396 (released 2018-05-23) diff --git a/Source/EventFlow.EventStores.EventStore/EventFlow.EventStores.EventStore.csproj b/Source/EventFlow.EventStores.EventStore/EventFlow.EventStores.EventStore.csproj index e63684bf7..b04ba887e 100644 --- a/Source/EventFlow.EventStores.EventStore/EventFlow.EventStores.EventStore.csproj +++ b/Source/EventFlow.EventStores.EventStore/EventFlow.EventStores.EventStore.csproj @@ -1,7 +1,7 @@  - net462 + net462;netstandard2.0 True True False @@ -23,9 +23,12 @@ - + + + + From 4e7e52671d83786dacfc14747b689252afc3e35e Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Thu, 7 Jun 2018 23:15:11 +0200 Subject: [PATCH 19/29] Implement optimistic concurrency check for Elasticsearch --- .../ReadStores/ElasticsearchReadModelStore.cs | 87 ++++++++++++------- 1 file changed, 57 insertions(+), 30 deletions(-) diff --git a/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs b/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs index 6a368db72..9fdfeec27 100644 --- a/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs +++ b/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs @@ -23,12 +23,15 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; using Elasticsearch.Net; using EventFlow.Aggregates; +using EventFlow.Core; +using EventFlow.Core.RetryStrategies; +using EventFlow.Elasticsearch.ValueObjects; +using EventFlow.Exceptions; using EventFlow.Extensions; using EventFlow.Logs; using EventFlow.ReadStores; @@ -43,15 +46,18 @@ public class ElasticsearchReadModelStore : private readonly ILog _log; private readonly IElasticClient _elasticClient; private readonly IReadModelDescriptionProvider _readModelDescriptionProvider; + private readonly ITransientFaultHandler _transientFaultHandler; public ElasticsearchReadModelStore( ILog log, IElasticClient elasticClient, - IReadModelDescriptionProvider readModelDescriptionProvider) + IReadModelDescriptionProvider readModelDescriptionProvider, + ITransientFaultHandler transientFaultHandler) { _log = log; _elasticClient = elasticClient; _readModelDescriptionProvider = readModelDescriptionProvider; + _transientFaultHandler = transientFaultHandler; } public async Task> GetAsync( @@ -119,44 +125,65 @@ public async Task UpdateAsync( { var readModelDescription = _readModelDescriptionProvider.GetReadModelDescription(); - _log.Verbose(() => - { - var readModelIds = readModelUpdates - .Select(u => u.ReadModelId) - .Distinct() - .OrderBy(i => i) - .ToList(); - return $"Updating read models of type '{typeof(TReadModel).PrettyPrint()}' with IDs '{string.Join(", ", readModelIds)}' in index '{readModelDescription.IndexName}'"; - }); - foreach (var readModelUpdate in readModelUpdates) { - var response = await _elasticClient.GetAsync( - readModelUpdate.ReadModelId, - d => d - .RequestConfiguration(c => c - .AllowedStatusCodes((int)HttpStatusCode.NotFound)) - .Index(readModelDescription.IndexName.Value), - cancellationToken) + await _transientFaultHandler.TryAsync( + c => UpdateReadModelAsync(readModelDescription, readModelUpdate, readModelContext, updateReadModel, c), + Label.Named("elasticsearch-read-model-update"), + cancellationToken) .ConfigureAwait(false); + } + } + + private async Task UpdateReadModelAsync( + ReadModelDescription readModelDescription, + ReadModelUpdate readModelUpdate, + IReadModelContext readModelContext, + Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + CancellationToken cancellationToken) + { + var response = await _elasticClient.GetAsync( + readModelUpdate.ReadModelId, + d => d + .RequestConfiguration(c => c + .AllowedStatusCodes((int)HttpStatusCode.NotFound)) + .Index(readModelDescription.IndexName.Value), + cancellationToken) + .ConfigureAwait(false); - var readModelEnvelope = response.Found - ? ReadModelEnvelope.With(readModelUpdate.ReadModelId, response.Source, response.Version) - : ReadModelEnvelope.Empty(readModelUpdate.ReadModelId); + var readModelEnvelope = response.Found + ? ReadModelEnvelope.With(readModelUpdate.ReadModelId, response.Source, response.Version) + : ReadModelEnvelope.Empty(readModelUpdate.ReadModelId); - readModelEnvelope = await updateReadModel(readModelContext, readModelUpdate.DomainEvents, readModelEnvelope, cancellationToken).ConfigureAwait(false); + readModelEnvelope = await updateReadModel( + readModelContext, + readModelUpdate.DomainEvents, + readModelEnvelope, + cancellationToken) + .ConfigureAwait(false); + try + { await _elasticClient.IndexAsync( readModelEnvelope.ReadModel, - d => d - .RequestConfiguration(c => c) - .Id(readModelUpdate.ReadModelId) - .Index(readModelDescription.IndexName.Value) - .Version(readModelEnvelope.Version.GetValueOrDefault()) - .VersionType(VersionType.ExternalGte), - cancellationToken) + d => + { + d = d + .RequestConfiguration(c => c) + .Id(readModelUpdate.ReadModelId) + .Index(readModelDescription.IndexName.Value); + d = response.Found + ? d.Version(response.Version) + : d.OpType(OpType.Create); + return d; + }, + cancellationToken) .ConfigureAwait(false); } + catch (ElasticsearchClientException e) when (e.Response?.HttpStatusCode == (int)HttpStatusCode.Conflict) + { + throw new OptimisticConcurrencyException(e.Message); + } } } } \ No newline at end of file From 4f6de84fce30d6033c46bbddcd5f5012fa9c84ec Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Sat, 9 Jun 2018 11:51:13 +0200 Subject: [PATCH 20/29] Minor cleanups --- .../ReadStores/ElasticsearchReadModelStore.cs | 7 +++++-- Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs | 2 +- Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs b/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs index 9fdfeec27..f2c3abeb5 100644 --- a/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs +++ b/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs @@ -180,9 +180,12 @@ await _elasticClient.IndexAsync( cancellationToken) .ConfigureAwait(false); } - catch (ElasticsearchClientException e) when (e.Response?.HttpStatusCode == (int)HttpStatusCode.Conflict) + catch (ElasticsearchClientException e) + when (e.Response?.HttpStatusCode == (int)HttpStatusCode.Conflict) { - throw new OptimisticConcurrencyException(e.Message); + throw new OptimisticConcurrencyException( + $"Read model '{readModelUpdate.ReadModelId}' updated by another", + e); } } } diff --git a/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs b/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs index 45e55e200..33d8cebaa 100644 --- a/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs +++ b/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs @@ -171,7 +171,7 @@ private async Task UpdateReadModelAsync( if (rowsAffected != 1) { throw new OptimisticConcurrencyException( - $"Read model '{readModelEnvelope.ReadModelId}' ({typeof(TReadModel).PrettyPrint()}) is already updated"); + $"Read model '{readModelEnvelope.ReadModelId}' updated by another"); } Log.Verbose(() => $"Updated MSSQL read model {typeof(TReadModel).PrettyPrint()} with ID '{readModelUpdate.ReadModelId}' to version '{readModelEnvelope.Version}'"); diff --git a/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs b/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs index f7c0e1b3c..596608154 100644 --- a/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs +++ b/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs @@ -164,10 +164,10 @@ private async Task UpdateReadModelAsync( if (rowsAffected != 1) { throw new OptimisticConcurrencyException( - $"Read model '{readModelEnvelope.ReadModelId}' ({typeof(TReadModel).PrettyPrint()}) is already updated"); + $"Read model '{readModelEnvelope.ReadModelId}' updated by another"); } - Log.Verbose(() => $"Updated MSSQL read model {typeof(TReadModel).PrettyPrint()} with ID '{readModelUpdate.ReadModelId}' to version '{readModelEnvelope.Version}'"); + Log.Verbose(() => $"Updated SQL read model {typeof(TReadModel).PrettyPrint()} with ID '{readModelUpdate.ReadModelId}' to version '{readModelEnvelope.Version}'"); } public override async Task> GetAsync(string id, CancellationToken cancellationToken) From 3b29ab463bf47ca9d291ab9a605455101c1ce29b Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Sat, 9 Jun 2018 11:53:12 +0200 Subject: [PATCH 21/29] Updated release notes --- RELEASE_NOTES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 8cd5b8414..ebef5cad7 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,7 @@ ### New in 0.60 (not released yet) +* New: Implemented optimistic concurrency checks for MSSQL, SQLite and + Elasticsearch read models * Minor: Removed unnecessary transaction in EventStore persistance ### New in 0.59.3396 (released 2018-05-23) From 2bc31ad2763f6ed34a3cb426b13a62eaffc73489 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Sat, 9 Jun 2018 12:27:46 +0200 Subject: [PATCH 22/29] Cleanup --- .../MultipleAggregateReadStoreManager.cs | 15 ++++++++++++--- .../ReadStores/SingleAggregateReadStoreManager.cs | 5 ++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs b/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs index 77d6e489e..557dfb177 100644 --- a/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs @@ -71,8 +71,17 @@ protected override async Task> UpdateAsync( CancellationToken cancellationToken) { var readModel = readModelEnvelope.ReadModel ?? await ReadModelFactory.CreateAsync(readModelEnvelope.ReadModelId, cancellationToken).ConfigureAwait(false); - await ReadModelDomainEventApplier.UpdateReadModelAsync(readModel, domainEvents, readModelContext, cancellationToken).ConfigureAwait(false); - return ReadModelEnvelope.With(readModelEnvelope.ReadModelId, readModel); + await ReadModelDomainEventApplier.UpdateReadModelAsync( + readModel, + domainEvents, + readModelContext, + cancellationToken) + .ConfigureAwait(false); + return ReadModelEnvelope.With( + readModelEnvelope.ReadModelId, + readModel, + readModelEnvelope.Version.GetValueOrDefault() + 1 // the best we can do + ); } } -} \ No newline at end of file +} diff --git a/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs b/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs index d5e284df6..c93dee212 100644 --- a/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs @@ -21,6 +21,7 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -64,7 +65,9 @@ protected override async Task> UpdateAsync( await ReadModelDomainEventApplier.UpdateReadModelAsync(readModel, domainEvents, readModelContext, cancellationToken).ConfigureAwait(false); - var readModelVersion = domainEvents.Max(e => e.AggregateSequenceNumber); + var readModelVersion = Math.Max( + domainEvents.Max(e => e.AggregateSequenceNumber), + readModelEnvelope.Version.GetValueOrDefault()); return ReadModelEnvelope.With(readModelEnvelope.ReadModelId, readModel, readModelVersion); } From 8900b9455f4d004d44d1be3bd21ec0898f8804c1 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Sat, 9 Jun 2018 12:43:47 +0200 Subject: [PATCH 23/29] Use version from envelope --- .../ReadStores/ElasticsearchReadModelStore.cs | 2 +- .../Suites/TestSuiteForReadModelStore.cs | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs b/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs index f2c3abeb5..ee90ad7b4 100644 --- a/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs +++ b/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs @@ -173,7 +173,7 @@ await _elasticClient.IndexAsync( .Id(readModelUpdate.ReadModelId) .Index(readModelDescription.IndexName.Value); d = response.Found - ? d.Version(response.Version) + ? d.VersionType(VersionType.ExternalGte).Version(readModelEnvelope.Version.GetValueOrDefault()) : d.OpType(OpType.Create); return d; }, diff --git a/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs b/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs index 219c7bbb4..c0a47058e 100644 --- a/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs +++ b/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs @@ -218,6 +218,27 @@ public async Task PopulateCreatesReadModels() readModel.PingsReceived.Should().Be(2); } + [Test] + public async Task MultipleUpdatesAreHandledCorrect() + { + // Arrange + var id = ThingyId.New; + var pingIds = new List + { + await PublishPingCommandAsync(id).ConfigureAwait(false) + }; + + for (var i = 0; i < 5; i++) + { + // Act + pingIds.Add(await PublishPingCommandAsync(id).ConfigureAwait(false)); + + // Assert + var readModel = await QueryProcessor.ProcessAsync(new ThingyGetQuery(id)).ConfigureAwait(false); + readModel.PingsReceived.Should().Be(pingIds.Count); + } + } + [Test, Timeout(10000)] public virtual async Task OptimisticConcurrencyCheck() { From e8b8df284d1060a0c6262f5377aca763d603b0ac Mon Sep 17 00:00:00 2001 From: idubnori Date: Tue, 12 Jun 2018 22:12:43 +0900 Subject: [PATCH 24/29] Fixed bug that returns http response status 200 even if exception occurs. Added tests. --- .../IntegrationTests/Site/SiteTests.cs | 11 +++++++++++ .../Middlewares/CommandPublishMiddleware.cs | 2 +- .../IntegrationTests/Site/SiteTests.cs | 11 +++++++++++ .../Middlewares/CommandPublishMiddleware.cs | 2 +- 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/Source/EventFlow.AspNetCore.Tests/IntegrationTests/Site/SiteTests.cs b/Source/EventFlow.AspNetCore.Tests/IntegrationTests/Site/SiteTests.cs index 50d13e98d..3ca422ef1 100644 --- a/Source/EventFlow.AspNetCore.Tests/IntegrationTests/Site/SiteTests.cs +++ b/Source/EventFlow.AspNetCore.Tests/IntegrationTests/Site/SiteTests.cs @@ -21,12 +21,14 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; using System.Net.Http; using System.Text; using System.Threading.Tasks; using EventFlow.Logs; using EventFlow.TestHelpers; using EventFlow.TestHelpers.Aggregates.Commands; +using FluentAssertions; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.TestHost; using Newtonsoft.Json; @@ -74,6 +76,15 @@ public async Task PublishCommand() await PostAsync("commands/ThingyPing/1", pingCommand).ConfigureAwait(false); } + [Test] + public void PublishCommand_WithNull_ThrowsException() + { + // Arrange + Act + Action action = () => Task.WaitAll(PostAsync("commands/ThingyPing/1", null)); + + action.ShouldThrow("because of command is null."); + } + private async Task GetAsync(string url) { // Act diff --git a/Source/EventFlow.AspNetCore/Middlewares/CommandPublishMiddleware.cs b/Source/EventFlow.AspNetCore/Middlewares/CommandPublishMiddleware.cs index bc9a1c9f4..37c424128 100644 --- a/Source/EventFlow.AspNetCore/Middlewares/CommandPublishMiddleware.cs +++ b/Source/EventFlow.AspNetCore/Middlewares/CommandPublishMiddleware.cs @@ -124,8 +124,8 @@ await WriteAsync( private async Task WriteAsync(object obj, HttpStatusCode statusCode, HttpContext context) { var json = _jsonSerializer.Serialize(obj); - await context.Response.WriteAsync(json).ConfigureAwait(false); context.Response.StatusCode = (int) statusCode; + await context.Response.WriteAsync(json).ConfigureAwait(false); } private Task WriteErrorAsync(string errorMessage, HttpStatusCode statusCode, HttpContext context) diff --git a/Source/EventFlow.Owin.Tests/IntegrationTests/Site/SiteTests.cs b/Source/EventFlow.Owin.Tests/IntegrationTests/Site/SiteTests.cs index 9b9b738c8..d1d6e3a40 100644 --- a/Source/EventFlow.Owin.Tests/IntegrationTests/Site/SiteTests.cs +++ b/Source/EventFlow.Owin.Tests/IntegrationTests/Site/SiteTests.cs @@ -22,9 +22,11 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; +using System.Net.Http; using System.Threading.Tasks; using EventFlow.TestHelpers; using EventFlow.TestHelpers.Aggregates.Commands; +using FluentAssertions; using Microsoft.Owin.Hosting; using NUnit.Framework; @@ -68,6 +70,15 @@ public async Task PublishCommand() await PostAsync("commands/ThingyPing/1", pingCommand).ConfigureAwait(false); } + [Test] + public void PublishCommand_WithNull_ThrowsException() + { + // Arrange + Act + Action action = () => Task.WaitAll(PostAsync("commands/ThingyPing/1", null)); + + action.ShouldThrow("because of command is null."); + } + private Task GetAsync(string url) { var uri = new Uri(_uri, url); diff --git a/Source/EventFlow.Owin/Middlewares/CommandPublishMiddleware.cs b/Source/EventFlow.Owin/Middlewares/CommandPublishMiddleware.cs index b2a983774..7ee885364 100644 --- a/Source/EventFlow.Owin/Middlewares/CommandPublishMiddleware.cs +++ b/Source/EventFlow.Owin/Middlewares/CommandPublishMiddleware.cs @@ -123,8 +123,8 @@ await WriteAsync( private async Task WriteAsync(object obj, HttpStatusCode statusCode, IOwinContext owinContext) { var json = _jsonSerializer.Serialize(obj); - await owinContext.Response.WriteAsync(json).ConfigureAwait(false); owinContext.Response.StatusCode = (int) statusCode; + await owinContext.Response.WriteAsync(json).ConfigureAwait(false); } private Task WriteErrorAsync(string errorMessage, HttpStatusCode statusCode, IOwinContext owinContext) From 327f7c45fa6c09d76ecf5b09919bb90b77d47a2a Mon Sep 17 00:00:00 2001 From: "Ebersoll, Frank" Date: Thu, 14 Jun 2018 19:19:57 +0200 Subject: [PATCH 25/29] IReadModelContext: Mark for deletion --- .../ElasticsearchThingyReadModel.cs | 8 +++++- .../ReadStores/ElasticsearchReadModelStore.cs | 15 ++++++++--- .../ReadModels/MsSqlThingyReadModel.cs | 8 +++++- .../ReadStores/MssqlReadModelStore.cs | 19 +++++++++----- .../ReadModels/SQliteThingyReadModel.cs | 8 +++++- .../ReadModels/SqlReadModelStore.cs | 16 +++++++++--- .../Commands/ThingyDeleteCommand.cs | 26 +++++++++++++++++++ .../Aggregates/Events/ThingyDeletedEvent.cs | 10 +++++++ .../Aggregates/ThingyAggregate.cs | 6 +++++ .../Suites/TestSuiteForReadModelStore.cs | 23 ++++++++++++++++ .../ReadModels/InMemoryThingyReadModel.cs | 8 +++++- .../MultipleAggregateReadStoreManagerTests.cs | 2 +- .../ReadStores/ReadStoreManagerTestSuite.cs | 2 +- .../EventFlow/ReadStores/IReadModelContext.cs | 2 ++ .../EventFlow/ReadStores/IReadModelStore.cs | 8 +++--- .../ReadStores/InMemory/InMemoryReadStore.cs | 19 ++++++++++---- .../EventFlow/ReadStores/ReadModelContext.cs | 14 +++++++--- Source/EventFlow/ReadStores/ReadModelStore.cs | 8 +++--- .../EventFlow/ReadStores/ReadStoreManager.cs | 5 ++-- 19 files changed, 168 insertions(+), 39 deletions(-) create mode 100644 Source/EventFlow.TestHelpers/Aggregates/Commands/ThingyDeleteCommand.cs create mode 100644 Source/EventFlow.TestHelpers/Aggregates/Events/ThingyDeletedEvent.cs diff --git a/Source/EventFlow.Elasticsearch.Tests/IntegrationTests/ReadModels/ElasticsearchThingyReadModel.cs b/Source/EventFlow.Elasticsearch.Tests/IntegrationTests/ReadModels/ElasticsearchThingyReadModel.cs index a763b999b..d7747fc27 100644 --- a/Source/EventFlow.Elasticsearch.Tests/IntegrationTests/ReadModels/ElasticsearchThingyReadModel.cs +++ b/Source/EventFlow.Elasticsearch.Tests/IntegrationTests/ReadModels/ElasticsearchThingyReadModel.cs @@ -32,7 +32,8 @@ namespace EventFlow.Elasticsearch.Tests.IntegrationTests.ReadModels [ElasticsearchType(IdProperty = "Id", Name = "thingy")] public class ElasticsearchThingyReadModel : IReadModel, IAmReadModelFor, - IAmReadModelFor + IAmReadModelFor, + IAmReadModelFor { [Keyword( Index = true)] @@ -61,6 +62,11 @@ public void Apply(IReadModelContext context, IDomainEvent domainEvent) + { + context.MarkForDeletion(); + } + public Thingy ToThingy() { return new Thingy( diff --git a/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs b/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs index ee90ad7b4..a1e610cc7 100644 --- a/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs +++ b/Source/EventFlow.Elasticsearch/ReadStores/ElasticsearchReadModelStore.cs @@ -117,13 +117,14 @@ await _elasticClient.DeleteIndexAsync( .ConfigureAwait(false); } - public async Task UpdateAsync( - IReadOnlyCollection readModelUpdates, - IReadModelContext readModelContext, - Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + public async Task UpdateAsync(IReadOnlyCollection readModelUpdates, + Func readModelContextFactory, + Func, ReadModelEnvelope, CancellationToken, + Task>> updateReadModel, CancellationToken cancellationToken) { var readModelDescription = _readModelDescriptionProvider.GetReadModelDescription(); + var readModelContext = readModelContextFactory(); foreach (var readModelUpdate in readModelUpdates) { @@ -162,6 +163,12 @@ private async Task UpdateReadModelAsync( cancellationToken) .ConfigureAwait(false); + if (readModelContext.IsMarkedForDeletion) + { + await DeleteAsync(readModelUpdate.ReadModelId, cancellationToken); + return; + } + try { await _elasticClient.IndexAsync( diff --git a/Source/EventFlow.MsSql.Tests/IntegrationTests/ReadStores/ReadModels/MsSqlThingyReadModel.cs b/Source/EventFlow.MsSql.Tests/IntegrationTests/ReadStores/ReadModels/MsSqlThingyReadModel.cs index 7ff66c8db..b66a8d61a 100644 --- a/Source/EventFlow.MsSql.Tests/IntegrationTests/ReadStores/ReadModels/MsSqlThingyReadModel.cs +++ b/Source/EventFlow.MsSql.Tests/IntegrationTests/ReadStores/ReadModels/MsSqlThingyReadModel.cs @@ -35,7 +35,8 @@ namespace EventFlow.MsSql.Tests.IntegrationTests.ReadStores.ReadModels [Table("ReadModel-ThingyAggregate")] public class MsSqlThingyReadModel : MssqlReadModel, IAmReadModelFor, - IAmReadModelFor + IAmReadModelFor, + IAmReadModelFor { public bool DomainErrorAfterFirstReceived { get; set; } public int PingsReceived { get; set; } @@ -50,6 +51,11 @@ public void Apply(IReadModelContext context, IDomainEvent domainEvent) + { + context.MarkForDeletion(); + } + public Thingy ToThingy() { return new Thingy( diff --git a/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs b/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs index 33d8cebaa..38ccacc7c 100644 --- a/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs +++ b/Source/EventFlow.MsSql/ReadStores/MssqlReadModelStore.cs @@ -93,16 +93,16 @@ public MssqlReadModelStore( _transientFaultHandler = transientFaultHandler; } - public override async Task UpdateAsync( - IReadOnlyCollection readModelUpdates, - IReadModelContext readModelContext, - Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + public override async Task UpdateAsync(IReadOnlyCollection readModelUpdates, + Func readModelContextFactory, + Func, ReadModelEnvelope, CancellationToken, + Task>> updateReadModel, CancellationToken cancellationToken) { foreach (var readModelUpdate in readModelUpdates) { await _transientFaultHandler.TryAsync( - c => UpdateReadModelAsync(readModelContext, updateReadModel, c, readModelUpdate), + c => UpdateReadModelAsync(readModelContextFactory, updateReadModel, c, readModelUpdate), Label.Named($"mssql-read-model-update-{ReadModelNameLoverCase}"), cancellationToken) .ConfigureAwait(false); @@ -110,7 +110,7 @@ await _transientFaultHandler.TryAsync( } private async Task UpdateReadModelAsync( - IReadModelContext readModelContext, + Func readModelContextFactory, Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, CancellationToken cancellationToken, ReadModelUpdate readModelUpdate) @@ -119,6 +119,7 @@ private async Task UpdateReadModelAsync( var readModelEnvelope = await GetAsync(readModelUpdate.ReadModelId, cancellationToken).ConfigureAwait(false); var readModel = readModelEnvelope.ReadModel; + var readModelContext = readModelContextFactory(); var isNew = readModel == null; if (readModel == null) @@ -142,6 +143,12 @@ private async Task UpdateReadModelAsync( cancellationToken) .ConfigureAwait(false); + if (readModelContext.IsMarkedForDeletion) + { + await DeleteAsync(readModelUpdate.ReadModelId, cancellationToken); + return; + } + mssqlReadModel = readModel as IMssqlReadModel; if (mssqlReadModel != null) { diff --git a/Source/EventFlow.SQLite.Tests/IntegrationTests/ReadStores/ReadModels/SQliteThingyReadModel.cs b/Source/EventFlow.SQLite.Tests/IntegrationTests/ReadStores/ReadModels/SQliteThingyReadModel.cs index 5ed25c930..fd82cb632 100644 --- a/Source/EventFlow.SQLite.Tests/IntegrationTests/ReadStores/ReadModels/SQliteThingyReadModel.cs +++ b/Source/EventFlow.SQLite.Tests/IntegrationTests/ReadStores/ReadModels/SQliteThingyReadModel.cs @@ -33,7 +33,8 @@ namespace EventFlow.SQLite.Tests.IntegrationTests.ReadStores.ReadModels [Table("ReadModel-ThingyAggregate")] public class SQLiteThingyReadModel : IReadModel, IAmReadModelFor, - IAmReadModelFor + IAmReadModelFor, + IAmReadModelFor { [SqlReadModelIdentityColumn] public string AggregateId { get; set; } @@ -55,6 +56,11 @@ public void Apply(IReadModelContext context, IDomainEvent domainEvent) + { + context.MarkForDeletion(); + } + public Thingy ToThingy() { return new Thingy( diff --git a/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs b/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs index 596608154..add561206 100644 --- a/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs +++ b/Source/EventFlow.Sql/ReadModels/SqlReadModelStore.cs @@ -101,14 +101,16 @@ protected SqlReadModelStore( _transientFaultHandler = transientFaultHandler; } - public override async Task UpdateAsync( - IReadOnlyCollection readModelUpdates, - IReadModelContext readModelContext, - Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + public override async Task UpdateAsync(IReadOnlyCollection readModelUpdates, + Func readModelContextFactory, + Func, ReadModelEnvelope, CancellationToken, + Task>> updateReadModel, CancellationToken cancellationToken) { foreach (var readModelUpdate in readModelUpdates) { + var readModelContext = readModelContextFactory(); + await _transientFaultHandler.TryAsync( c => UpdateReadModelAsync(readModelContext, updateReadModel, c, readModelUpdate), Label.Named($"sqlite-read-model-update"), @@ -142,6 +144,12 @@ private async Task UpdateReadModelAsync( cancellationToken) .ConfigureAwait(false); + if (readModelContext.IsMarkedForDeletion) + { + await DeleteAsync(readModelUpdate.ReadModelId, cancellationToken); + return; + } + SetVersion(readModel, (int?) readModelEnvelope.Version); SetIdentity(readModel, readModelEnvelope.ReadModelId); diff --git a/Source/EventFlow.TestHelpers/Aggregates/Commands/ThingyDeleteCommand.cs b/Source/EventFlow.TestHelpers/Aggregates/Commands/ThingyDeleteCommand.cs new file mode 100644 index 000000000..2a6b69b71 --- /dev/null +++ b/Source/EventFlow.TestHelpers/Aggregates/Commands/ThingyDeleteCommand.cs @@ -0,0 +1,26 @@ +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Commands; +using EventFlow.TestHelpers.Aggregates.ValueObjects; + +namespace EventFlow.TestHelpers.Aggregates.Commands +{ + [CommandVersion("ThingyDelete", 1)] + public class ThingyDeleteCommand : Command + { + public PingId PingId { get; } + + public ThingyDeleteCommand(ThingyId aggregateId) : base(aggregateId) + { + } + } + + public class ThingyDeleteCommandHandler : CommandHandler + { + public override Task ExecuteAsync(ThingyAggregate aggregate, ThingyDeleteCommand command, CancellationToken cancellationToken) + { + aggregate.Delete(); + return Task.FromResult(0); + } + } +} diff --git a/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyDeletedEvent.cs b/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyDeletedEvent.cs new file mode 100644 index 000000000..1ec37055f --- /dev/null +++ b/Source/EventFlow.TestHelpers/Aggregates/Events/ThingyDeletedEvent.cs @@ -0,0 +1,10 @@ +using EventFlow.Aggregates; +using EventFlow.EventStores; + +namespace EventFlow.TestHelpers.Aggregates.Events +{ + [EventVersion("ThingyDeleted", 1)] + public class ThingyDeletedEvent : AggregateEvent + { + } +} \ No newline at end of file diff --git a/Source/EventFlow.TestHelpers/Aggregates/ThingyAggregate.cs b/Source/EventFlow.TestHelpers/Aggregates/ThingyAggregate.cs index 08e1d18fc..a0cb473fd 100644 --- a/Source/EventFlow.TestHelpers/Aggregates/ThingyAggregate.cs +++ b/Source/EventFlow.TestHelpers/Aggregates/ThingyAggregate.cs @@ -58,6 +58,7 @@ public ThingyAggregate(ThingyId id) Register(e => _messages.Add(e.ThingyMessage)); Register(e => {/* do nothing */}); Register(e => {/* do nothing */}); + Register(e => {/* do nothing */}); } public void DomainErrorAfterFirst() @@ -121,5 +122,10 @@ protected override Task LoadSnapshotAsync(ThingySnapshot snapshot, ISnapshotMeta SnapshotVersions = snapshot.PreviousVersions; return Task.FromResult(0); } + + public void Delete() + { + Emit(new ThingyDeletedEvent()); + } } } \ No newline at end of file diff --git a/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs b/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs index c0a47058e..16b6f26da 100644 --- a/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs +++ b/Source/EventFlow.TestHelpers/Suites/TestSuiteForReadModelStore.cs @@ -269,6 +269,29 @@ public virtual async Task OptimisticConcurrencyCheck() readModel.PingsReceived.Should().Be(3); } + [Test] + public async Task MarkingForDeletionRemovesSpecificReadModel() + { + // Arrange + var id1 = ThingyId.New; + var id2 = ThingyId.New; + await PublishPingCommandAsync(id1).ConfigureAwait(false); + await PublishPingCommandAsync(id2).ConfigureAwait(false); + var readModel1 = await QueryProcessor.ProcessAsync(new ThingyGetQuery(id1)).ConfigureAwait(false); + var readModel2 = await QueryProcessor.ProcessAsync(new ThingyGetQuery(id2)).ConfigureAwait(false); + readModel1.Should().NotBeNull(); + readModel2.Should().NotBeNull(); + + // Act + await CommandBus.PublishAsync(new ThingyDeleteCommand(id1), CancellationToken.None); + + // Assert + readModel1 = await QueryProcessor.ProcessAsync(new ThingyGetQuery(id1)).ConfigureAwait(false); + readModel2 = await QueryProcessor.ProcessAsync(new ThingyGetQuery(id2)).ConfigureAwait(false); + readModel1.Should().BeNull(); + readModel2.Should().NotBeNull(); + } + private class WaitState { public AutoResetEvent ReadStoreReady { get; } = new AutoResetEvent(false); diff --git a/Source/EventFlow.Tests/IntegrationTests/ReadStores/ReadModels/InMemoryThingyReadModel.cs b/Source/EventFlow.Tests/IntegrationTests/ReadStores/ReadModels/InMemoryThingyReadModel.cs index dfae5046d..0c2cd0cf7 100644 --- a/Source/EventFlow.Tests/IntegrationTests/ReadStores/ReadModels/InMemoryThingyReadModel.cs +++ b/Source/EventFlow.Tests/IntegrationTests/ReadStores/ReadModels/InMemoryThingyReadModel.cs @@ -30,7 +30,8 @@ namespace EventFlow.Tests.IntegrationTests.ReadStores.ReadModels { public class InMemoryThingyReadModel : IReadModel, IAmReadModelFor, - IAmReadModelFor + IAmReadModelFor, + IAmReadModelFor { public ThingyId ThingyId { get; private set; } public bool DomainErrorAfterFirstReceived { get; private set; } @@ -48,6 +49,11 @@ public void Apply(IReadModelContext context, IDomainEvent domainEvent) + { + context.MarkForDeletion(); + } + public Thingy ToThingy() { return new Thingy( diff --git a/Source/EventFlow.Tests/UnitTests/ReadStores/MultipleAggregateReadStoreManagerTests.cs b/Source/EventFlow.Tests/UnitTests/ReadStores/MultipleAggregateReadStoreManagerTests.cs index a729acd45..521a55bc9 100644 --- a/Source/EventFlow.Tests/UnitTests/ReadStores/MultipleAggregateReadStoreManagerTests.cs +++ b/Source/EventFlow.Tests/UnitTests/ReadStores/MultipleAggregateReadStoreManagerTests.cs @@ -99,7 +99,7 @@ public async Task IfNoReadModelIdsAreReturned_ThenDontInvokeTheReadModelStore() ReadModelStoreMock.Verify( s => s.UpdateAsync( It.IsAny>(), - It.IsAny(), + It.IsAny>(), It.IsAny, ReadModelEnvelope, CancellationToken, Task>>>(), It.IsAny()), Times.Never); diff --git a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTestSuite.cs b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTestSuite.cs index f561bc268..00a2e7b70 100644 --- a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTestSuite.cs +++ b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTestSuite.cs @@ -71,7 +71,7 @@ public async Task ReadStoreIsUpdatedWithRelevantEvents() ReadModelStoreMock.Verify( s => s.UpdateAsync( It.Is>(l => l.Count == 1), - It.IsAny(), + It.IsAny>(), It.IsAny, ReadModelEnvelope, CancellationToken, Task>>>(), It.IsAny()), Times.Once); diff --git a/Source/EventFlow/ReadStores/IReadModelContext.cs b/Source/EventFlow/ReadStores/IReadModelContext.cs index f5a77ad6d..a47213ccb 100644 --- a/Source/EventFlow/ReadStores/IReadModelContext.cs +++ b/Source/EventFlow/ReadStores/IReadModelContext.cs @@ -28,5 +28,7 @@ namespace EventFlow.ReadStores public interface IReadModelContext { IResolver Resolver { get; } + void MarkForDeletion(); + bool IsMarkedForDeletion { get; } } } \ No newline at end of file diff --git a/Source/EventFlow/ReadStores/IReadModelStore.cs b/Source/EventFlow/ReadStores/IReadModelStore.cs index 9fb45a3f5..bd0da72c4 100644 --- a/Source/EventFlow/ReadStores/IReadModelStore.cs +++ b/Source/EventFlow/ReadStores/IReadModelStore.cs @@ -46,10 +46,10 @@ Task> GetAsync( string id, CancellationToken cancellationToken); - Task UpdateAsync( - IReadOnlyCollection readModelUpdates, - IReadModelContext readModelContext, - Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + Task UpdateAsync(IReadOnlyCollection readModelUpdates, + Func readModelContextFactory, + Func, ReadModelEnvelope, CancellationToken, + Task>> updateReadModel, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs b/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs index 976b22663..4995c7b74 100644 --- a/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs +++ b/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs @@ -88,10 +88,10 @@ public override async Task DeleteAllAsync( } } - public override async Task UpdateAsync( - IReadOnlyCollection readModelUpdates, - IReadModelContext readModelContext, - Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + public override async Task UpdateAsync(IReadOnlyCollection readModelUpdates, + Func readModelContextFactory, + Func, ReadModelEnvelope, CancellationToken, + Task>> updateReadModel, CancellationToken cancellationToken) { using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) @@ -103,6 +103,8 @@ public override async Task UpdateAsync( readModelEnvelope = ReadModelEnvelope.Empty(readModelUpdate.ReadModelId); } + var readModelContext = readModelContextFactory(); + readModelEnvelope = await updateReadModel( readModelContext, readModelUpdate.DomainEvents, @@ -110,7 +112,14 @@ public override async Task UpdateAsync( cancellationToken) .ConfigureAwait(false); - _readModels[readModelUpdate.ReadModelId] = readModelEnvelope; + if (readModelContext.IsMarkedForDeletion) + { + _readModels.Remove(readModelUpdate.ReadModelId); + } + else + { + _readModels[readModelUpdate.ReadModelId] = readModelEnvelope; + } } } } diff --git a/Source/EventFlow/ReadStores/ReadModelContext.cs b/Source/EventFlow/ReadStores/ReadModelContext.cs index b8fa330ab..580217f21 100644 --- a/Source/EventFlow/ReadStores/ReadModelContext.cs +++ b/Source/EventFlow/ReadStores/ReadModelContext.cs @@ -27,12 +27,18 @@ namespace EventFlow.ReadStores { public class ReadModelContext : IReadModelContext { + public ReadModelContext(IResolver resolver) + { + Resolver = resolver; + } + + public bool IsMarkedForDeletion { get; private set; } + public IResolver Resolver { get; } - public ReadModelContext( - IResolver resolver) + public void MarkForDeletion() { - Resolver = resolver; + IsMarkedForDeletion = true; } } -} \ No newline at end of file +} diff --git a/Source/EventFlow/ReadStores/ReadModelStore.cs b/Source/EventFlow/ReadStores/ReadModelStore.cs index eff0e05a5..3abd2baec 100644 --- a/Source/EventFlow/ReadStores/ReadModelStore.cs +++ b/Source/EventFlow/ReadStores/ReadModelStore.cs @@ -52,10 +52,10 @@ public abstract Task DeleteAsync( public abstract Task DeleteAllAsync( CancellationToken cancellationToken); - public abstract Task UpdateAsync( - IReadOnlyCollection readModelUpdates, - IReadModelContext readModelContext, - Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + public abstract Task UpdateAsync(IReadOnlyCollection readModelUpdates, + Func readModelContextFactory, + Func, ReadModelEnvelope, CancellationToken, + Task>> updateReadModel, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/Source/EventFlow/ReadStores/ReadStoreManager.cs b/Source/EventFlow/ReadStores/ReadStoreManager.cs index cda71eecd..a1b571fd4 100644 --- a/Source/EventFlow/ReadStores/ReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/ReadStoreManager.cs @@ -111,7 +111,8 @@ public async Task UpdateReadStoresAsync( typeof(TReadModelStore).PrettyPrint(), string.Join(", ", relevantDomainEvents.Select(e => e.ToString())))); - var readModelContext = new ReadModelContext(Resolver); + IReadModelContext ReadModelContextFactory() => new ReadModelContext(Resolver); + var readModelUpdates = BuildReadModelUpdates(relevantDomainEvents); if (!readModelUpdates.Any()) @@ -126,7 +127,7 @@ public async Task UpdateReadStoresAsync( await ReadModelStore.UpdateAsync( readModelUpdates, - readModelContext, + ReadModelContextFactory, UpdateAsync, cancellationToken) .ConfigureAwait(false); From 312616d1d0ea426bc6634f97792ba90c10b79867 Mon Sep 17 00:00:00 2001 From: Frank Ebersoll Date: Thu, 14 Jun 2018 20:02:43 +0200 Subject: [PATCH 26/29] Release notes: Delete read models --- RELEASE_NOTES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d6cb50c8f..333630021 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -3,6 +3,7 @@ * New: Implemented optimistic concurrency checks for MSSQL, SQLite and Elasticsearch read models * New: Added .NET standard support for EventStore +* New: Delete read models by invoking `context.MarkForDeletion()` in an Apply method * Minor: Removed unnecessary transaction in EventStore persistance ### New in 0.59.3396 (released 2018-05-23) From 40c6031f413bd09b3793efbaceb0133ca9cd17be Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Thu, 14 Jun 2018 20:12:19 +0200 Subject: [PATCH 27/29] Fixed SQL schema ignored --- RELEASE_NOTES.md | 1 + .../UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs | 10 +++++----- .../EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs | 10 +++++++--- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 8cd5b8414..96dabec82 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,7 @@ ### New in 0.60 (not released yet) * Minor: Removed unnecessary transaction in EventStore persistance +* Fixed: Read model SQL schema is no longer ignored for `Table` attribute ### New in 0.59.3396 (released 2018-05-23) diff --git a/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs b/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs index e21ad98fb..46a554fde 100644 --- a/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs +++ b/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs @@ -42,7 +42,7 @@ public void CreateInsertSql_ProducesCorrectSql() var sql = Sut.CreateInsertSql(); // Assert - sql.Should().Be("INSERT INTO [ReadModel-TestAttributes] (Id, UpdatedTime) VALUES (@Id, @UpdatedTime)"); + sql.Should().Be("INSERT INTO [dbo].[ReadModel-TestAttributes] (Id, UpdatedTime) VALUES (@Id, @UpdatedTime)"); } [Test] @@ -52,7 +52,7 @@ public void CreateUpdateSql_ProducesCorrectSql() var sql = Sut.CreateUpdateSql(); // Assert - sql.Should().Be("UPDATE [ReadModel-TestAttributes] SET UpdatedTime = @UpdatedTime WHERE Id = @Id"); + sql.Should().Be("UPDATE [dbo].[ReadModel-TestAttributes] SET UpdatedTime = @UpdatedTime WHERE Id = @Id"); } [Test] @@ -62,7 +62,7 @@ public void CreateSelectSql_ProducesCorrectSql() var sql = Sut.CreateSelectSql(); // Assert - sql.Should().Be("SELECT * FROM [ReadModel-TestAttributes] WHERE Id = @EventFlowReadModelId"); + sql.Should().Be("SELECT * FROM [dbo].[ReadModel-TestAttributes] WHERE Id = @EventFlowReadModelId"); } [Test] @@ -72,7 +72,7 @@ public void GetTableName_UsesTableAttribute() var tableName = Sut.GetTableName(); // Assert - tableName.Should().Be("[Fancy]"); + tableName.Should().Be("[doh].[Fancy]"); } public class TestAttributesReadModel : IReadModel @@ -86,7 +86,7 @@ public class TestAttributesReadModel : IReadModel public string Secret { get; set; } } - [Table("Fancy")] + [Table("Fancy", Schema = "doh")] public class TestTableAttributeReadModel : IReadModel { } diff --git a/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs b/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs index 4abc79053..5b69d727a 100644 --- a/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs +++ b/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs @@ -155,9 +155,13 @@ protected virtual string GetTableName(Type readModelType) t => { var tableAttribute = t.GetTypeInfo().GetCustomAttribute(false); - return tableAttribute != null - ? $"[{tableAttribute.Name}]" - : $"[ReadModel-{t.Name.Replace("ReadModel", string.Empty)}]"; + var schema = string.IsNullOrEmpty(tableAttribute?.Schema) + ? "dbo" + : tableAttribute.Schema; + var table = string.IsNullOrEmpty(tableAttribute?.Name) + ? $"ReadModel-{t.Name.Replace("ReadModel", string.Empty)}" + : tableAttribute.Name; + return $"[{schema}].[{table}]"; }); } From 01437c4ea07b6809d393765c3db8e0dbc057c7b9 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Fri, 15 Jun 2018 12:22:07 +0200 Subject: [PATCH 28/29] Retry if something fails --- .../EventFlow.RabbitMQ.Tests/Integration/RabbitMqTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Source/EventFlow.RabbitMQ.Tests/Integration/RabbitMqTests.cs b/Source/EventFlow.RabbitMQ.Tests/Integration/RabbitMqTests.cs index 4f9d38f2a..5ee6c5045 100644 --- a/Source/EventFlow.RabbitMQ.Tests/Integration/RabbitMqTests.cs +++ b/Source/EventFlow.RabbitMQ.Tests/Integration/RabbitMqTests.cs @@ -60,7 +60,7 @@ public void SetUp() _uri = new Uri(url); } - [Test, Timeout(10000)] + [Test, Timeout(10000), Retry(3)] public async Task Scenario() { var exchange = new Exchange($"eventflow-{Guid.NewGuid():N}"); @@ -85,7 +85,7 @@ public async Task Scenario() } } - [Test, Timeout(60000)] + [Test, Timeout(60000), Retry(3)] public async Task PublisherPerformance() { var exchange = new Exchange($"eventflow-{Guid.NewGuid():N}"); @@ -148,4 +148,4 @@ private IRootResolver BuildResolver(Exchange exchange, Func Date: Sun, 17 Jun 2018 19:37:00 +0200 Subject: [PATCH 29/29] Use default if nothing is provided --- .../UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs | 9 ++++----- Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs | 7 +++---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs b/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs index 04c48a922..0ad46f6c9 100644 --- a/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs +++ b/Source/EventFlow.Sql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs @@ -42,7 +42,7 @@ public void CreateInsertSql_ProducesCorrectSql() var sql = Sut.CreateInsertSql(); // Assert - sql.Should().Be("INSERT INTO [dbo].[ReadModel-TestAttributes] (Id, UpdatedTime) VALUES (@Id, @UpdatedTime)"); + sql.Should().Be("INSERT INTO [ReadModel-TestAttributes] (Id, UpdatedTime) VALUES (@Id, @UpdatedTime)"); } [Test] @@ -52,7 +52,7 @@ public void CreateUpdateSql_WithoutVersion_ProducesCorrectSql() var sql = Sut.CreateUpdateSql().Trim(); // Assert - sql.Should().Be("UPDATE [dbo].[ReadModel-TestAttributes] SET UpdatedTime = @UpdatedTime WHERE Id = @Id"); + sql.Should().Be("UPDATE [ReadModel-TestAttributes] SET UpdatedTime = @UpdatedTime WHERE Id = @Id"); } [Test] @@ -62,7 +62,7 @@ public void CreateUpdateSql_WithVersion_ProducesCorrectSql() var sql = Sut.CreateUpdateSql().Trim(); // Assert - sql.Should().Be("UPDATE [dbo].[ReadModel-TestVersionedAttributes] SET FancyVersion = @FancyVersion WHERE CoolId = @CoolId AND FancyVersion = @_PREVIOUS_VERSION"); + sql.Should().Be("UPDATE [ReadModel-TestVersionedAttributes] SET FancyVersion = @FancyVersion WHERE CoolId = @CoolId AND FancyVersion = @_PREVIOUS_VERSION"); } [Test] @@ -72,7 +72,7 @@ public void CreateSelectSql_ProducesCorrectSql() var sql = Sut.CreateSelectSql(); // Assert - sql.Should().Be("SELECT * FROM [dbo].[ReadModel-TestAttributes] WHERE Id = @EventFlowReadModelId"); + sql.Should().Be("SELECT * FROM [ReadModel-TestAttributes] WHERE Id = @EventFlowReadModelId"); } [Test] @@ -96,7 +96,6 @@ public class TestAttributesReadModel : IReadModel public string Secret { get; set; } } - [Table("Fancy")] public class TestVersionedAttributesReadModel : IReadModel { [SqlReadModelIdentityColumn] diff --git a/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs b/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs index a03792d4d..a2172c286 100644 --- a/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs +++ b/Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs @@ -162,13 +162,12 @@ protected virtual string GetTableName(Type readModelType) t => { var tableAttribute = t.GetTypeInfo().GetCustomAttribute(false); - var schema = string.IsNullOrEmpty(tableAttribute?.Schema) - ? "dbo" - : tableAttribute.Schema; var table = string.IsNullOrEmpty(tableAttribute?.Name) ? $"ReadModel-{t.Name.Replace("ReadModel", string.Empty)}" : tableAttribute.Name; - return $"[{schema}].[{table}]"; + return string.IsNullOrEmpty(tableAttribute?.Schema) + ? $"[{table}]" + : $"[{tableAttribute?.Schema}].[{table}]"; }); }