diff --git a/.github/workflows/code_quality.yml b/.github/workflows/code_quality.yml index b47b2e29..98e1f2ca 100644 --- a/.github/workflows/code_quality.yml +++ b/.github/workflows/code_quality.yml @@ -14,6 +14,8 @@ jobs: with: fetch-depth: 0 - name: 'Qodana Scan' - uses: JetBrains/qodana-action@v2022.3.3 + uses: JetBrains/qodana-action@v2023.2.1 + with: + pr-mode: false env: QODANA_TOKEN: ${{ secrets.QODANA_TOKEN }} \ No newline at end of file diff --git a/Directory.Packages.props b/Directory.Packages.props index 81fd823c..3de0cac5 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -42,8 +42,8 @@ - - + + @@ -55,7 +55,7 @@ - + @@ -73,7 +73,7 @@ - + diff --git a/src/Benchmarks/Benchmarks/GapDetectionBenchmarks.cs b/src/Benchmarks/Benchmarks/GapDetectionBenchmarks.cs index ca01cb5b..0e931a98 100644 --- a/src/Benchmarks/Benchmarks/GapDetectionBenchmarks.cs +++ b/src/Benchmarks/Benchmarks/GapDetectionBenchmarks.cs @@ -19,7 +19,7 @@ public class GapDetectionBenchmarks { public void Setup() { _store = new NoOpCheckpointStore(); - _store.CheckpointStored += (sender, checkpoint) => Console.WriteLine(checkpoint); + _store.CheckpointStored += (_, checkpoint) => Console.WriteLine(checkpoint); var numbers = Enumerable.Range(1, 1000).ToList(); numbers.RemoveAll(x => x % 10 == 0); diff --git a/src/Benchmarks/Benchmarks/TypeMapBenchmark.cs b/src/Benchmarks/Benchmarks/TypeMapBenchmark.cs index 197e4223..23a2866c 100644 --- a/src/Benchmarks/Benchmarks/TypeMapBenchmark.cs +++ b/src/Benchmarks/Benchmarks/TypeMapBenchmark.cs @@ -12,6 +12,7 @@ public class TypeMapBenchmark { KeyValuePair[] _types = null!; [Params(5, 20, 100)] + // ReSharper disable once UnusedAutoPropertyAccessor.Global public int TypesCount { get; set; } [GlobalSetup] diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs new file mode 100644 index 00000000..eeaee1d5 --- /dev/null +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs @@ -0,0 +1,113 @@ +// Copyright (C) Ubiquitous AS.All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +public abstract class CommandHandlerBuilder + where TAggregate : Aggregate where TId : Id where TState : State, new() { + internal abstract RegisteredHandler Build(); +} + +/// +/// Builds a command handler for a specific command type. You would not need to instantiate this class directly, +/// use function. +/// +/// Default aggregate store instance for the command service +/// Command type +/// Aggregate type +/// State of the aggregate type +/// Identity of the aggregate type +public class CommandHandlerBuilder(IAggregateStore? store) : CommandHandlerBuilder + where TCommand : class + where TAggregate : Aggregate, new() + where TState : State, new() + where TId : Id { + GetIdFromUntypedCommand? _getId; + HandleUntypedCommand? _action; + ResolveStore? _resolveStore; + ExpectedState _expectedState = ExpectedState.Any; + + /// + /// Set the expected aggregate state for the command handler. + /// If the aggregate won't be in the expected state, the command handler will return an error. + /// The default is . + /// + /// Expected aggregate state + /// + public CommandHandlerBuilder InState(ExpectedState expectedState) { + _expectedState = expectedState; + + return this; + } + + /// + /// Defines how the aggregate id is extracted from the command. + /// + /// A function to get the aggregate id from the command. + /// + public CommandHandlerBuilder GetId(GetIdFromCommand getId) { + _getId = getId.AsGetId(); + + return this; + } + + /// + /// Defines how the aggregate id is extracted from the command, asynchronously. + /// + /// A function to get the aggregate id from the command. + /// + public CommandHandlerBuilder GetIdAsync(GetIdFromCommandAsync getId) { + _getId = getId.AsGetId(); + + return this; + } + + /// + /// Defines how the aggregate is acted upon by the command. + /// + /// A function that executes an operation on an aggregate + /// + public CommandHandlerBuilder Act(ActOnAggregate action) { + _action = action.AsAct(); + + return this; + } + + /// + /// Defines how the aggregate is acted upon by the command, asynchronously. + /// + /// A function that executes an asynchronous operation on an aggregate + /// + public CommandHandlerBuilder ActAsync(ActOnAggregateAsync action) { + _action = action.AsAct(); + + return this; + } + + /// + /// Defines how the aggregate store is resolved from the command. It is optional. If not defined, the default + /// aggregate store of the command service will be used. + /// + /// + /// + public CommandHandlerBuilder ResolveStore(ResolveStore? resolveStore) { + _resolveStore = resolveStore; + + return this; + } + + internal override RegisteredHandler Build() { + return new RegisteredHandler( + _expectedState, + Ensure.NotNull(_getId, $"Function to get the aggregate id from {typeof(TCommand).Name} is not defined"), + Ensure.NotNull(_action, $"Function to act on the aggregate for command {typeof(TCommand).Name} is not defined"), + (_resolveStore ?? DefaultResolve()).AsResolveStore() + ); + } + + ResolveStore DefaultResolve() { + ArgumentNullException.ThrowIfNull(store, nameof(store)); + + return _ => store; + } +} diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandHandlersMap.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandHandlersMap.cs new file mode 100644 index 00000000..ff34c2e1 --- /dev/null +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandHandlersMap.cs @@ -0,0 +1,55 @@ +// Copyright (C) Ubiquitous AS. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Reflection; + +namespace Eventuous; + +using static Diagnostics.ApplicationEventSource; + +public delegate Task ActOnAggregateAsync(TAggregate aggregate, TCommand command, CancellationToken cancellationToken) + where TAggregate : Aggregate; + +public delegate void ActOnAggregate(TAggregate aggregate, TCommand command) where TAggregate : Aggregate; + +delegate ValueTask HandleUntypedCommand(T aggregate, object command, CancellationToken cancellationToken) where T : Aggregate; + +public delegate Task GetIdFromCommandAsync(TCommand command, CancellationToken cancellationToken) where TId : Id where TCommand : class; + +public delegate TId GetIdFromCommand(TCommand command) where TId : Id where TCommand : class; + +delegate ValueTask GetIdFromUntypedCommand(object command, CancellationToken cancellationToken) where TId : Id; + +public delegate IAggregateStore ResolveStore(TCommand command) where TCommand : class; + +delegate IAggregateStore ResolveStoreFromCommand(object command); + +record RegisteredHandler( + ExpectedState ExpectedState, + GetIdFromUntypedCommand GetId, + HandleUntypedCommand Handler, + ResolveStoreFromCommand ResolveStore + ) where T : Aggregate where TId : Id; + +class HandlersMap where TAggregate : Aggregate where TId : Id { + readonly TypeMap> _typeMap = new(); + + static readonly MethodInfo AddHandlerInternalMethod = + typeof(HandlersMap).GetMethod(nameof(AddHandlerInternal), BindingFlags.NonPublic | BindingFlags.Instance)!; + + internal void AddHandlerInternal(RegisteredHandler handler) { + try { + _typeMap.Add(handler); + Log.CommandHandlerRegistered(); + } catch (Exceptions.DuplicateTypeException) { + Log.CommandHandlerAlreadyRegistered(); + + throw new Exceptions.CommandHandlerAlreadyRegistered(); + } + } + + internal void AddHandlerUntyped(Type command, RegisteredHandler handler) + => AddHandlerInternalMethod.MakeGenericMethod(command).Invoke(this, new object?[] { handler }); + + public bool TryGet([NotNullWhen(true)] out RegisteredHandler? handler) => _typeMap.TryGetValue(out handler); +} diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandHandlingDelegateExtensions.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandHandlingDelegateExtensions.cs new file mode 100644 index 00000000..26a5830b --- /dev/null +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandHandlingDelegateExtensions.cs @@ -0,0 +1,29 @@ +// Copyright (C) Ubiquitous AS.All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +static class CommandHandlingDelegateExtensions { + public static GetIdFromUntypedCommand AsGetId(this GetIdFromCommandAsync getId) where TId : Id where TCommand : class + => async (cmd, ct) => await getId((TCommand)cmd, ct); + + public static GetIdFromUntypedCommand AsGetId(this GetIdFromCommand getId) where TId : Id where TCommand : class + => (cmd, _) => ValueTask.FromResult(getId((TCommand)cmd)); + + public static HandleUntypedCommand AsAct(this ActOnAggregateAsync act) where TAggregate : Aggregate + => async (aggregate, cmd, ct) => { + await act(aggregate, (TCommand)cmd, ct).NoContext(); + + return aggregate; + }; + + public static HandleUntypedCommand AsAct(this ActOnAggregate act) where TAggregate : Aggregate + => (aggregate, cmd, _) => { + act(aggregate, (TCommand)cmd); + + return ValueTask.FromResult(aggregate); + }; + + public static ResolveStoreFromCommand AsResolveStore(this ResolveStore resolveStore) where TCommand : class + => cmd => resolveStore((TCommand)cmd); +} diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandService.Async.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandService.Async.cs new file mode 100644 index 00000000..c089869d --- /dev/null +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandService.Async.cs @@ -0,0 +1,88 @@ +// Copyright (C) Ubiquitous AS. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +public abstract partial class CommandService { + /// + /// Register an asynchronous handler for a command, which is expected to create a new aggregate instance. + /// + /// A function to get the aggregate id from the command + /// Asynchronous action to be performed on the aggregate, given the aggregate instance and the command + /// Resolve aggregate store from the command + /// Command type + [Obsolete("Use On().InState(ExpectedState.New).GetId(...).ActAsync(...).ResolveStore(...) instead")] + protected void OnNewAsync( + GetIdFromCommand getId, + ActOnAggregateAsync action, + ResolveStore? resolveStore = null + ) where TCommand : class + => On().InState(ExpectedState.New).GetId(getId).ActAsync(action).ResolveStore(resolveStore); + + /// + /// Register an asynchronous handler for a command, which is expected to use an existing aggregate instance. + /// + /// A function to get the aggregate id from the command + /// Asynchronous action to be performed on the aggregate, given the aggregate instance and the command + /// Resolve aggregate store from the command + /// Command type + [Obsolete("Use On().InState(ExpectedState.Existing).GetId(...).ActAsync(...).ResolveStore(...) instead")] + [PublicAPI] + protected void OnExistingAsync( + GetIdFromCommand getId, + ActOnAggregateAsync action, + ResolveStore? resolveStore = null + ) where TCommand : class + => On().InState(ExpectedState.Existing).GetId(getId).ActAsync(action).ResolveStore(resolveStore); + + /// + /// Register an asynchronous handler for a command, which is expected to use an existing aggregate instance. + /// + /// Asynchronous function to get the aggregate id from the command + /// Asynchronous action to be performed on the aggregate, given the aggregate instance and the command + /// Resolve aggregate store from the command + /// Command type + [Obsolete("Use On().InState(ExpectedState.Existing).GetIdAsync(...).ActAsync(...).ResolveStore(...) instead")] + [PublicAPI] + protected void OnExistingAsync( + GetIdFromCommandAsync getId, + ActOnAggregateAsync action, + ResolveStore? resolveStore = null + ) where TCommand : class + // => _handlers.AddHandler(ExpectedState.Existing, getId, action, resolveStore ?? DefaultResolve()); + => On().InState(ExpectedState.Existing).GetIdAsync(getId).ActAsync(action).ResolveStore(resolveStore); + + /// + /// Register an asynchronous handler for a command, which is expected to use an a new or an existing aggregate instance. + /// + /// A function to get the aggregate id from the command + /// Asynchronous action to be performed on the aggregate, given the aggregate instance and the command + /// Resolve aggregate store from the command + /// Command type + [Obsolete("Use On().InState(ExpectedState.Any).GetId(...).ActAsync(...).ResolveStore(...) instead")] + [PublicAPI] + protected void OnAnyAsync( + GetIdFromCommand getId, + ActOnAggregateAsync action, + ResolveStore? resolveStore = null + ) where TCommand : class + // => _handlers.AddHandler(ExpectedState.Any, getId, action, resolveStore ?? DefaultResolve()); + => On().InState(ExpectedState.Any).GetId(getId).ActAsync(action).ResolveStore(resolveStore); + + /// + /// Register an asynchronous handler for a command, which is expected to use an a new or an existing aggregate instance. + /// + /// Asynchronous function to get the aggregate id from the command + /// Asynchronous action to be performed on the aggregate, given the aggregate instance and the command + /// Resolve aggregate store from the command + /// Command type + [Obsolete("Use On().InState(ExpectedState.Any).GetIdAsync(...).ActAsync(...).ResolveStore(...) instead")] + [PublicAPI] + protected void OnAnyAsync( + GetIdFromCommandAsync getId, + ActOnAggregateAsync action, + ResolveStore? resolveStore = null + ) where TCommand : class + // => _handlers.AddHandler(ExpectedState.Any, getId, action, resolveStore ?? DefaultResolve()); + => On().InState(ExpectedState.Any).GetIdAsync(getId).ActAsync(action).ResolveStore(resolveStore); +} diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandService.Sync.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandService.Sync.cs new file mode 100644 index 00000000..f24404e6 --- /dev/null +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandService.Sync.cs @@ -0,0 +1,51 @@ +// Copyright (C) Ubiquitous AS. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +public abstract partial class CommandService { + /// + /// Register a handler for a command, which is expected to create a new aggregate instance. + /// + /// A function to get the aggregate id from the command + /// Action to be performed on the aggregate, given the aggregate instance and the command + /// Resolve aggregate store from the command + /// Command type + [Obsolete("Use On().InState(ExpectedState.New).GetId(...).Act(...).ResolveStore(...) instead")] + protected void OnNew( + GetIdFromCommand getId, + ActOnAggregate action, + ResolveStore? resolveStore = null + ) where TCommand : class + => On().InState(ExpectedState.New).GetId(getId).Act(action).ResolveStore(resolveStore); + + /// + /// Register a handler for a command, which is expected to use an existing aggregate instance. + /// + /// A function to get the aggregate id from the command + /// Action to be performed on the aggregate, given the aggregate instance and the command + /// Resolve aggregate store from the command + /// Command type + [Obsolete("Use On().InState(ExpectedState.Existing).GetId(...).Act(...).ResolveStore(...) instead")] + protected void OnExisting( + GetIdFromCommand getId, + ActOnAggregate action, + ResolveStore? resolveStore = null + ) where TCommand : class + => On().InState(ExpectedState.Existing).GetId(getId).Act(action).ResolveStore(resolveStore); + + /// + /// Register a handler for a command, which is expected to use an a new or an existing aggregate instance. + /// + /// A function to get the aggregate id from the command + /// Action to be performed on the aggregate, given the aggregate instance and the command + /// Resolve aggregate store from the command + /// Command type + [Obsolete("Use On().InState(ExpectedState.Any).GetId(...).Act(...).ResolveStore(...) instead")] + protected void OnAny( + GetIdFromCommand getId, + ActOnAggregate action, + ResolveStore? resolveStore = null + ) where TCommand : class + => On().InState(ExpectedState.Any).GetId(getId).Act(action).ResolveStore(resolveStore); +} diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs new file mode 100644 index 00000000..6aaaa368 --- /dev/null +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs @@ -0,0 +1,123 @@ +// Copyright (C) Ubiquitous AS. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +using static Diagnostics.ApplicationEventSource; + +/// +/// Command service base class. A derived class should be scoped to handle commands for one aggregate type only. +/// +/// The aggregate type +/// The aggregate state type +/// The aggregate identity type +// [PublicAPI] +public abstract partial class CommandService( + IAggregateStore? store, + AggregateFactoryRegistry? factoryRegistry = null, + StreamNameMap? streamNameMap = null, + TypeMapper? typeMap = null + ) + : ICommandService, ICommandService + where TAggregate : Aggregate, new() + where TState : State, new() + where TId : Id { + [PublicAPI] + protected IAggregateStore? Store { get; } = store; + + readonly HandlersMap _handlers = new(); + readonly AggregateFactoryRegistry _factoryRegistry = factoryRegistry ?? AggregateFactoryRegistry.Instance; + readonly StreamNameMap _streamNameMap = streamNameMap ?? new StreamNameMap(); + readonly TypeMapper _typeMap = typeMap ?? TypeMap.Instance; + + bool _initialized; + + /// + /// Returns the command handler builder for the specified command type. + /// + /// Command type + /// + protected CommandHandlerBuilder On() where TCommand : class { + var builder = new CommandHandlerBuilder(Store); + _builders.Add(typeof(TCommand), builder); + + return builder; + } + + /// + /// The command handler. Call this function from your edge (API). + /// + /// Command to execute + /// Cancellation token + /// of the execution + /// + public async Task> Handle(TCommand command, CancellationToken cancellationToken) where TCommand : class { + if (!_initialized) BuildHandlers(); + + if (!_handlers.TryGet(out var registeredHandler)) { + Log.CommandHandlerNotFound(); + var exception = new Exceptions.CommandHandlerNotFound(); + + return new ErrorResult(exception); + } + + var aggregateId = await registeredHandler.GetId(command, cancellationToken).NoContext(); + var store = registeredHandler.ResolveStore(command); + + try { + var aggregate = registeredHandler.ExpectedState switch { + ExpectedState.Any => await store.LoadOrNew(_streamNameMap, aggregateId, cancellationToken).NoContext(), + ExpectedState.Existing => await store.Load(_streamNameMap, aggregateId, cancellationToken).NoContext(), + ExpectedState.New => Create(aggregateId), + ExpectedState.Unknown => default, + _ => throw new ArgumentOutOfRangeException(nameof(registeredHandler.ExpectedState), "Unknown expected state") + }; + + var result = await registeredHandler + .Handler(aggregate!, command, cancellationToken) + .NoContext(); + + // Zero in the global position would mean nothing, so the receiver need to check the Changes.Length + if (result.Changes.Count == 0) return new OkResult(result.State, Array.Empty(), 0); + + var storeResult = await store.Store(GetAggregateStreamName(), result, cancellationToken).NoContext(); + var changes = result.Changes.Select(x => new Change(x, _typeMap.GetTypeName(x))); + Log.CommandHandled(); + + return new OkResult(result.State, changes, storeResult.GlobalPosition); + } catch (Exception e) { + Log.ErrorHandlingCommand(e); + + return new ErrorResult($"Error handling command {typeof(TCommand).Name}", e); + } + + TAggregate Create(TId id) => _factoryRegistry.CreateInstance().WithId(id); + + StreamName GetAggregateStreamName() => _streamNameMap.GetStreamName(aggregateId); + } + + async Task ICommandService.Handle(TCommand command, CancellationToken cancellationToken) where TCommand : class { + var result = await Handle(command, cancellationToken).NoContext(); + + return result switch { + OkResult(var state, var enumerable, _) => new OkResult(state, enumerable), + ErrorResult error => new ErrorResult(error.Message, error.Exception), + _ => throw new ApplicationException("Unknown result type") + }; + } + + readonly Dictionary> _builders = new(); + readonly object _handlersLock = new(); + + void BuildHandlers() { + lock (_handlersLock) { + foreach (var commandType in _builders.Keys) { + var builder = _builders[commandType]; + var handler = builder.Build(); + _handlers.AddHandlerUntyped(commandType, handler); + } + + _initialized = true; + } + } +} diff --git a/src/Core/src/Eventuous.Application/CommandService.cs b/src/Core/src/Eventuous.Application/CommandService.cs deleted file mode 100644 index ac2801ab..00000000 --- a/src/Core/src/Eventuous.Application/CommandService.cs +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright (C) Ubiquitous AS. All rights reserved -// Licensed under the Apache License, Version 2.0. - -namespace Eventuous; - -using static Diagnostics.ApplicationEventSource; - -/// -/// Command service base class. A derived class should be scoped to handle commands for one aggregate type only. -/// -/// The aggregate type -/// The aggregate state type -/// The aggregate identity type -// [PublicAPI] -public abstract class CommandService( - IAggregateStore store, - AggregateFactoryRegistry? factoryRegistry = null, - StreamNameMap? streamNameMap = null, - TypeMapper? typeMap = null - ) - : ICommandService, ICommandService - where TAggregate : Aggregate, new() - where TState : State, new() - where TId : Id { - [PublicAPI] - protected IAggregateStore Store { get; } = store; - - readonly HandlersMap _handlers = new(); - readonly IdMap _idMap = new(); - readonly AggregateFactoryRegistry _factoryRegistry = factoryRegistry ?? AggregateFactoryRegistry.Instance; - readonly StreamNameMap _streamNameMap = streamNameMap ?? new StreamNameMap(); - readonly TypeMapper _typeMap = typeMap ?? TypeMap.Instance; - - /// - /// Register a handler for a command, which is expected to create a new aggregate instance. - /// - /// A function to get the aggregate id from the command - /// Action to be performed on the aggregate, given the aggregate instance and the command - /// Command type - protected void OnNew(GetIdFromCommand getId, ActOnAggregate action) where TCommand : class { - _handlers.AddHandler(ExpectedState.New, action); - _idMap.AddCommand(getId); - } - - /// - /// Register an asynchronous handler for a command, which is expected to create a new aggregate instance. - /// - /// A function to get the aggregate id from the command - /// Asynchronous action to be performed on the aggregate, - /// given the aggregate instance and the command - /// Command type - protected void OnNewAsync(GetIdFromCommand getId, ActOnAggregateAsync action) where TCommand : class { - _handlers.AddHandler(ExpectedState.New, action); - _idMap.AddCommand(getId); - } - - /// - /// Register a handler for a command, which is expected to use an existing aggregate instance. - /// - /// A function to get the aggregate id from the command - /// Action to be performed on the aggregate, given the aggregate instance and the command - /// Command type - protected void OnExisting(GetIdFromCommand getId, ActOnAggregate action) where TCommand : class { - _handlers.AddHandler(ExpectedState.Existing, action); - _idMap.AddCommand(getId); - } - - /// - /// Register an asynchronous handler for a command, which is expected to use an existing aggregate instance. - /// - /// A function to get the aggregate id from the command - /// Asynchronous action to be performed on the aggregate, - /// given the aggregate instance and the command - /// Command type - [PublicAPI] - protected void OnExistingAsync(GetIdFromCommand getId, ActOnAggregateAsync action) where TCommand : class { - _handlers.AddHandler(ExpectedState.Existing, action); - _idMap.AddCommand(getId); - } - - /// - /// Register an asynchronous handler for a command, which is expected to use an existing aggregate instance. - /// - /// Asynchronous function to get the aggregate id from the command - /// Asynchronous action to be performed on the aggregate, - /// given the aggregate instance and the command - /// Command type - [PublicAPI] - protected void OnExistingAsync(GetIdFromCommandAsync getId, ActOnAggregateAsync action) - where TCommand : class { - _handlers.AddHandler(ExpectedState.Existing, action); - _idMap.AddCommand(getId); - } - - /// - /// Register a handler for a command, which is expected to use an a new or an existing aggregate instance. - /// - /// A function to get the aggregate id from the command - /// Action to be performed on the aggregate, - /// given the aggregate instance and the command - /// Command type - protected void OnAny(GetIdFromCommand getId, ActOnAggregate action) where TCommand : class { - _handlers.AddHandler(ExpectedState.Any, action); - _idMap.AddCommand(getId); - } - - /// - /// Register a handler for a command, which is expected to use an a new or an existing aggregate instance. - /// - /// Asynchronous function to get the aggregate id from the command - /// Action to be performed on the aggregate, - /// given the aggregate instance and the command - /// Command type - [PublicAPI] - protected void OnAny(GetIdFromCommandAsync getId, ActOnAggregate action) where TCommand : class { - _handlers.AddHandler(ExpectedState.Any, action); - _idMap.AddCommand(getId); - } - - /// - /// Register an asynchronous handler for a command, which is expected to use an a new or an existing aggregate instance. - /// - /// A function to get the aggregate id from the command - /// Asynchronous action to be performed on the aggregate, - /// given the aggregate instance and the command - /// Command type - [PublicAPI] - protected void OnAnyAsync(GetIdFromCommand getId, ActOnAggregateAsync action) where TCommand : class { - _handlers.AddHandler(ExpectedState.Any, action); - _idMap.AddCommand(getId); - } - - /// - /// Register an asynchronous handler for a command, which is expected to use an a new or an existing aggregate instance. - /// - /// Asynchronous function to get the aggregate id from the command - /// Asynchronous action to be performed on the aggregate, - /// given the aggregate instance and the command - /// Command type - [PublicAPI] - protected void OnAnyAsync(GetIdFromCommandAsync getId, ActOnAggregateAsync action) where TCommand : class { - _handlers.AddHandler(ExpectedState.Any, action); - _idMap.AddCommand(getId); - } - - /// - /// Register an asynchronous handler for a command, which can figure out the aggregate instance by itself, and then return one. - /// - /// Function, which returns some aggregate instance to store - /// Command type - [PublicAPI] - protected void OnAsync(ArbitraryActAsync action) where TCommand : class - => _handlers.AddHandler( - new RegisteredHandler( - ExpectedState.Unknown, - async (_, cmd, ct) => await action((TCommand)cmd, ct).NoContext() - ) - ); - - /// - /// The command handler. Call this function from your edge (API). - /// - /// Command to execute - /// Cancellation token - /// of the execution - /// - public async Task> Handle(TCommand command, CancellationToken cancellationToken) - where TCommand : class { - if (!_handlers.TryGet(out var registeredHandler)) { - Log.CommandHandlerNotFound(); - var exception = new Exceptions.CommandHandlerNotFound(); - - return new ErrorResult(exception); - } - - var hasGetIdFunction = _idMap.TryGet(out var getId); - - if (!hasGetIdFunction || getId == null) { - Log.CannotCalculateAggregateId(); - var exception = new Exceptions.CommandHandlerNotFound(); - - return new ErrorResult(exception); - } - - var aggregateId = await getId(command, cancellationToken).NoContext(); - - try { - var aggregate = registeredHandler.ExpectedState switch { - ExpectedState.Any => await Store.LoadOrNew(_streamNameMap, aggregateId, cancellationToken).NoContext(), - ExpectedState.Existing => await Store.Load(_streamNameMap, aggregateId, cancellationToken).NoContext(), - ExpectedState.New => Create(aggregateId), - ExpectedState.Unknown => default, - _ => throw new ArgumentOutOfRangeException(nameof(registeredHandler.ExpectedState), "Unknown expected state") - }; - - var result = await registeredHandler - .Handler(aggregate!, command, cancellationToken) - .NoContext(); - - // Zero in the global position would mean nothing, so the receiver need to check the Changes.Length - if (result.Changes.Count == 0) return new OkResult(result.State, Array.Empty(), 0); - - var storeResult = await Store.Store(GetAggregateStreamName(), result, cancellationToken).NoContext(); - - var changes = result.Changes.Select(x => new Change(x, _typeMap.GetTypeName(x))); - - Log.CommandHandled(); - - return new OkResult(result.State, changes, storeResult.GlobalPosition); - } catch (Exception e) { - Log.ErrorHandlingCommand(e); - - return new ErrorResult($"Error handling command {typeof(TCommand).Name}", e); - } - - TAggregate Create(TId id) - => _factoryRegistry.CreateInstance().WithId(id); - - StreamName GetAggregateStreamName() - => _streamNameMap.GetStreamName(aggregateId); - } - - async Task ICommandService.Handle(TCommand command, CancellationToken cancellationToken) - where TCommand : class { - var result = await Handle(command, cancellationToken).NoContext(); - - return result switch { - OkResult(var state, var enumerable, _) => new OkResult(state, enumerable), - ErrorResult error => new ErrorResult(error.Message, error.Exception), - _ => throw new ApplicationException("Unknown result type") - }; - } - - public delegate Task ArbitraryActAsync( - TCommand command, - CancellationToken cancellationToken - ); -} diff --git a/src/Core/src/Eventuous.Application/CommandToIdMap.cs b/src/Core/src/Eventuous.Application/CommandToIdMap.cs deleted file mode 100644 index 74c34f98..00000000 --- a/src/Core/src/Eventuous.Application/CommandToIdMap.cs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (C) Ubiquitous AS. All rights reserved -// Licensed under the Apache License, Version 2.0. - -namespace Eventuous; - -public delegate Task GetIdFromCommandAsync(TCommand command, CancellationToken cancellationToken) - where TId : Id where TCommand : class; - -public delegate TId GetIdFromCommand(TCommand command) where TId : Id where TCommand : class; - -delegate ValueTask GetIdFromUntypedCommand(object command, CancellationToken cancellationToken) - where TId : Id; - -class IdMap where TId : Id { - readonly TypeMap> _typeMap = new(); - - public void AddCommand(GetIdFromCommand getId) where TCommand : class - => _typeMap.Add((cmd, _) => new ValueTask(getId((TCommand)cmd))); - - public void AddCommand(GetIdFromCommandAsync getId) where TCommand : class - => _typeMap.Add(async (cmd, ct) => await getId((TCommand)cmd, ct)); - - internal bool TryGet([NotNullWhen(true)] out GetIdFromUntypedCommand? getId) where TCommand : class - => _typeMap.TryGetValue(out getId); -} diff --git a/src/Core/src/Eventuous.Application/CommandToStreamMap.cs b/src/Core/src/Eventuous.Application/CommandToStreamMap.cs deleted file mode 100644 index 1641f0a9..00000000 --- a/src/Core/src/Eventuous.Application/CommandToStreamMap.cs +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright (C) Ubiquitous AS. All rights reserved -// Licensed under the Apache License, Version 2.0. - -namespace Eventuous; - -public delegate StreamName GetStreamNameFromCommand(TCommand command) where TCommand : class; - -delegate ValueTask GetStreamNameFromUntypedCommand(object command, CancellationToken cancellationToken); - -public class CommandToStreamMap { - readonly TypeMap _typeMap = new(); - - public void AddCommand(GetStreamNameFromCommand getId) where TCommand : class - => _typeMap.Add((cmd, _) => new ValueTask(getId((TCommand)cmd))); - - internal bool TryGet([NotNullWhen(true)] out GetStreamNameFromUntypedCommand? getId) where TCommand : class - => _typeMap.TryGetValue(out getId); -} diff --git a/src/Core/src/Eventuous.Application/Diagnostics/ApplicationEventSource.cs b/src/Core/src/Eventuous.Application/Diagnostics/ApplicationEventSource.cs index cbd90e36..1619478b 100644 --- a/src/Core/src/Eventuous.Application/Diagnostics/ApplicationEventSource.cs +++ b/src/Core/src/Eventuous.Application/Diagnostics/ApplicationEventSource.cs @@ -16,14 +16,10 @@ class ApplicationEventSource : EventSource { const int CommandHandledId = 3; const int CommandHandlerAlreadyRegisteredId = 4; const int CommandHandlerRegisteredId = 5; - const int CannotGetAggregateIdFromCommandId = 11; [NonEvent] public void CommandHandlerNotFound() => CommandHandlerNotFound(typeof(TCommand).Name); - [NonEvent] - public void CannotCalculateAggregateId() => CannotCalculateAggregateId(typeof(TCommand).Name); - [NonEvent] public void ErrorHandlingCommand(Exception e) => ErrorHandlingCommand(typeof(TCommand).Name, e.ToString()); @@ -43,13 +39,6 @@ public void CommandHandlerRegistered() { [Event(CommandHandlerNotFoundId, Message = "Handler not found for command: '{0}'", Level = EventLevel.Error)] void CommandHandlerNotFound(string commandType) => WriteEvent(CommandHandlerNotFoundId, commandType); - [Event( - CannotGetAggregateIdFromCommandId, - Message = "Cannot get aggregate id from command: '{0}'", - Level = EventLevel.Error - )] - void CannotCalculateAggregateId(string commandType) => WriteEvent(CannotGetAggregateIdFromCommandId, commandType); - [Event(ErrorHandlingCommandId, Message = "Error handling command: '{0}' {1}", Level = EventLevel.Error)] void ErrorHandlingCommand(string commandType, string exception) => WriteEvent(ErrorHandlingCommandId, commandType, exception); diff --git a/src/Core/src/Eventuous.Application/Eventuous.Application.csproj b/src/Core/src/Eventuous.Application/Eventuous.Application.csproj index ae060c26..d95ea841 100644 --- a/src/Core/src/Eventuous.Application/Eventuous.Application.csproj +++ b/src/Core/src/Eventuous.Application/Eventuous.Application.csproj @@ -3,14 +3,14 @@ Eventuous - - + + Tools\Ensure.cs - + @@ -18,6 +18,6 @@ - + diff --git a/src/Core/src/Eventuous.Application/Eventuous.Application.csproj.DotSettings b/src/Core/src/Eventuous.Application/Eventuous.Application.csproj.DotSettings index 8c80dfa7..b1a2786e 100644 --- a/src/Core/src/Eventuous.Application/Eventuous.Application.csproj.DotSettings +++ b/src/Core/src/Eventuous.Application/Eventuous.Application.csproj.DotSettings @@ -1,2 +1,5 @@  + True + True + True True \ No newline at end of file diff --git a/src/Core/src/Eventuous.Application/Exceptions/ExceptionMessages.cs b/src/Core/src/Eventuous.Application/Exceptions/ExceptionMessages.cs index aa1586fb..a27cbf18 100644 --- a/src/Core/src/Eventuous.Application/Exceptions/ExceptionMessages.cs +++ b/src/Core/src/Eventuous.Application/Exceptions/ExceptionMessages.cs @@ -9,9 +9,6 @@ namespace Eventuous; static class ExceptionMessages { static readonly ResourceManager Resources = new("Eventuous.ExceptionMessages", Assembly.GetExecutingAssembly()); - internal static string AggregateIdEmpty(Type idType) - => string.Format(Resources.GetString("AggregateIdEmpty")!, idType.Name); - internal static string MissingCommandHandler(Type type) => string.Format(Resources.GetString("MissingCommandHandler")!, type.Name); diff --git a/src/Core/src/Eventuous.Application/Exceptions/Exceptions.cs b/src/Core/src/Eventuous.Application/Exceptions/Exceptions.cs index 451844bb..74f69055 100644 --- a/src/Core/src/Eventuous.Application/Exceptions/Exceptions.cs +++ b/src/Core/src/Eventuous.Application/Exceptions/Exceptions.cs @@ -6,28 +6,15 @@ namespace Eventuous; using static ExceptionMessages; public static class Exceptions { - public class CommandHandlerNotFound : Exception { - public CommandHandlerNotFound(Type type) : base(MissingCommandHandler(type)) { } - } - - public class UnableToResolveAggregateId : Exception { - public UnableToResolveAggregateId(Type type) : - base($"Unable to resolve aggregate id from command {type.Name}") { } - } - - public class CommandHandlerNotFound : CommandHandlerNotFound { - public CommandHandlerNotFound() : base(typeof(T)) { } - } - - public class CommandHandlerAlreadyRegistered : Exception { - public CommandHandlerAlreadyRegistered() : base(DuplicateCommandHandler()) { } - } - - public class DuplicateTypeException : ArgumentException { - public DuplicateTypeException() : base(DuplicateTypeKey(), typeof(T).FullName) { } - } - - public class CommandMappingException : Exception { - public CommandMappingException() : base(MissingCommandMap()) { } - } + public class CommandHandlerNotFound(Type type) : Exception(MissingCommandHandler(type)); + + public class UnableToResolveAggregateId(Type type) : Exception($"Unable to resolve aggregate id from command {type.Name}"); + + public class CommandHandlerNotFound() : CommandHandlerNotFound(typeof(T)); + + public class CommandHandlerAlreadyRegistered() : Exception(DuplicateCommandHandler()); + + public class DuplicateTypeException() : ArgumentException(DuplicateTypeKey(), typeof(T).FullName); + + public class CommandMappingException() : Exception(MissingCommandMap()); } diff --git a/src/Core/src/Eventuous.Application/ExpectedState.cs b/src/Core/src/Eventuous.Application/ExpectedState.cs index e1e61500..668eec66 100644 --- a/src/Core/src/Eventuous.Application/ExpectedState.cs +++ b/src/Core/src/Eventuous.Application/ExpectedState.cs @@ -3,7 +3,7 @@ namespace Eventuous; -enum ExpectedState { +public enum ExpectedState { New, Existing, Any, diff --git a/src/Core/src/Eventuous.Application/FunctionalCommandService.cs b/src/Core/src/Eventuous.Application/FunctionalCommandService.cs deleted file mode 100644 index 60215af5..00000000 --- a/src/Core/src/Eventuous.Application/FunctionalCommandService.cs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright (C) Ubiquitous AS. All rights reserved -// Licensed under the Apache License, Version 2.0. - -namespace Eventuous; - -using static Diagnostics.ApplicationEventSource; - -public abstract class FunctionalCommandService - (IEventReader reader, IEventWriter writer, TypeMapper? typeMap = null) : IFuncCommandService, IStateCommandService - where T : State, new() { - [PublicAPI] - protected IEventReader Reader { get; } = reader; - [PublicAPI] - protected IEventWriter Writer { get; } = writer; - - readonly TypeMapper _typeMap = typeMap ?? TypeMap.Instance; - readonly FunctionalHandlersMap _handlers = new(); - readonly CommandToStreamMap _streamMap = new(); - - protected FunctionalCommandService(IEventStore store, TypeMapper? typeMap = null) - : this(store, store, typeMap) { } - - protected void OnNew( - GetStreamNameFromCommand getStreamName, - Func> action - ) where TCommand : class { - _handlers.AddHandler(ExpectedState.New, (_, _, cmd) => action(cmd)); - _streamMap.AddCommand(getStreamName); - } - - protected void OnExisting( - GetStreamNameFromCommand getStreamName, - ExecuteCommand action - ) where TCommand : class { - _handlers.AddHandler(ExpectedState.Existing, action); - _streamMap.AddCommand(getStreamName); - } - - protected void OnAny( - GetStreamNameFromCommand getStreamName, - ExecuteCommand action - ) where TCommand : class { - _handlers.AddHandler(ExpectedState.Any, action); - _streamMap.AddCommand(getStreamName); - } - - public async Task> Handle(TCommand command, CancellationToken cancellationToken) where TCommand : class { - if (!_handlers.TryGet(out var registeredHandler)) { - Log.CommandHandlerNotFound(); - var exception = new Exceptions.CommandHandlerNotFound(); - - return new ErrorResult(exception); - } - - var hasGetStreamFunction = _streamMap.TryGet(out var getStreamName); - - if (!hasGetStreamFunction || getStreamName == null) { - Log.CannotCalculateAggregateId(); - var exception = new Exceptions.CommandHandlerNotFound(); - - return new ErrorResult(exception); - } - - var streamName = await getStreamName(command, cancellationToken).NoContext(); - - try { - var loadedState = registeredHandler.ExpectedState switch { - ExpectedState.Any => await Reader.LoadStateOrNew(streamName, cancellationToken).NoContext(), - ExpectedState.Existing => await Reader.LoadState(streamName, cancellationToken).NoContext(), - ExpectedState.New => new FoldedEventStream(streamName, ExpectedStreamVersion.NoStream, Array.Empty()), - _ => throw new ArgumentOutOfRangeException(nameof(registeredHandler.ExpectedState), "Unknown expected state") - }; - - var result = await registeredHandler - .Handler(loadedState.State, loadedState.Events, command, cancellationToken) - .NoContext(); - - var newEvents = result.ToArray(); - - var newState = newEvents.Aggregate(loadedState.State, (current, evt) => current.When(evt)); - - // Zero in the global position would mean nothing, so the receiver need to check the Changes.Length - if (newEvents.Length == 0) return new OkResult(newState, Array.Empty(), 0); - - var storeResult = await Writer.Store( - streamName, - (int)loadedState.StreamVersion.Value, - newEvents, - static e => e, - cancellationToken - ) - .NoContext(); - - var changes = newEvents.Select(x => new Change(x, _typeMap.GetTypeName(x))); - - Log.CommandHandled(); - - return new OkResult(newState, changes, storeResult.GlobalPosition); - } catch (Exception e) { - Log.ErrorHandlingCommand(e); - - return new ErrorResult($"Error handling command {typeof(TCommand).Name}", e); - } - } - - async Task ICommandService.Handle(TCommand command, CancellationToken cancellationToken) { - var result = await Handle(command, cancellationToken).NoContext(); - - return result switch { - OkResult(var state, var enumerable, _) => new OkResult(state, enumerable), - ErrorResult error => new ErrorResult(error.Message, error.Exception), - _ => throw new ApplicationException("Unknown result type") - }; - } -} diff --git a/src/Core/src/Eventuous.Application/FunctionalHandlersMap.cs b/src/Core/src/Eventuous.Application/FunctionalHandlersMap.cs deleted file mode 100644 index 9f6e9586..00000000 --- a/src/Core/src/Eventuous.Application/FunctionalHandlersMap.cs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright (C) Ubiquitous AS. All rights reserved -// Licensed under the Apache License, Version 2.0. - -namespace Eventuous; - -using static Diagnostics.ApplicationEventSource; - -delegate ValueTask> ExecuteUntypedCommand(T state, object[] events, object command, CancellationToken cancellationToken) where T : State; - -record RegisteredFuncHandler(ExpectedState ExpectedState, ExecuteUntypedCommand Handler) where T : State; - -class FunctionalHandlersMap where T : State { - readonly TypeMap> _typeMap = new(); - - public void AddHandler(RegisteredFuncHandler handler) where TCommand : class { - try { - _typeMap.Add(handler); - Log.CommandHandlerRegistered(); - } - catch (Exceptions.DuplicateTypeException) { - Log.CommandHandlerAlreadyRegistered(); - throw new Exceptions.CommandHandlerAlreadyRegistered(); - } - } - - public void AddHandler(ExpectedState expectedState, ExecuteCommand action) where TCommand : class { - ValueTask> Handler(T state, object[] events, object command, CancellationToken token) { - var newEvents = action(state, events, (TCommand)command); - return new ValueTask>(newEvents); - } - - AddHandler(new RegisteredFuncHandler(expectedState, Handler)); - } - - public bool TryGet([NotNullWhen(true)] out RegisteredFuncHandler? handler) - => _typeMap.TryGetValue(out handler); -} diff --git a/src/Core/src/Eventuous.Application/FunctionalService/FuncCommandHandlerBuilder.cs b/src/Core/src/Eventuous.Application/FunctionalService/FuncCommandHandlerBuilder.cs new file mode 100644 index 00000000..0475ebaa --- /dev/null +++ b/src/Core/src/Eventuous.Application/FunctionalService/FuncCommandHandlerBuilder.cs @@ -0,0 +1,166 @@ +// Copyright (C) Ubiquitous AS.All rights reserved +// Licensed under the Apache License, Version 2.0. + +using static Eventuous.FuncServiceDelegates; + +namespace Eventuous; + +public abstract class FuncCommandHandlerBuilder where TState : State { + internal abstract RegisteredFuncHandler Build(); +} + +public class FuncCommandHandlerBuilder(IEventReader? reader, IEventWriter? writer) : FuncCommandHandlerBuilder + where TState : State where TCommand : class { + ExpectedState _expectedState = ExpectedState.Any; + GetStreamNameFromUntypedCommand? _getStream; + ExecuteUntypedCommand? _execute; + ResolveReaderFromCommand? _reader; + ResolveWriterFromCommand? _writer; + + /// + /// Defines the expected stream state for handling the command. + /// + /// Expected stream state + /// + public FuncCommandHandlerBuilder InState(ExpectedState expectedState) { + _expectedState = expectedState; + + return this; + } + + /// + /// Defines how to get the stream name from the command. + /// + /// A function to get the stream name from the command + /// + public FuncCommandHandlerBuilder GetStream(GetStreamNameFromCommand getStream) { + _getStream = getStream.AsGetStream(); + + return this; + } + + /// + /// Defines how to get the stream name from the command, asynchronously. + /// + /// A function to get the stream name from the command + /// + public FuncCommandHandlerBuilder GetStreamAsync(GetStreamNameFromCommandAsync getStream) { + _getStream = getStream.AsGetStream(); + + return this; + } + + /// + /// Defines the action to take on the stream for the command. + /// + /// Function to be executed on the stream for the command + /// + public FuncCommandHandlerBuilder Act(ExecuteCommand executeCommand) { + _execute = executeCommand.AsExecute(); + + return this; + } + + + /// + /// Defines the action to take on the stream for the command, asynchronously. + /// + /// Function to be executed on the stream for the command + /// + public FuncCommandHandlerBuilder ActAsync(ExecuteCommandAsync executeCommand) { + _execute = executeCommand.AsExecute(); + + return this; + } + + /// + /// Defines the action to take on the new stream for the command. + /// + /// Function to be executed on a new stream for the command + /// + public FuncCommandHandlerBuilder Act(Func> executeCommand) { + // This is not ideal as we can return more specific interface depending on expected state, but it would do for now. + if (_expectedState != ExpectedState.New) { + throw new InvalidOperationException("Action without state is only allowed for new streams"); + } + + _execute = executeCommand.AsExecute(); + + return this; + } + + /// + /// Defines the action to take on the new stream for the command, asynchronously. + /// + /// Function to be executed on a new stream for the command + /// + public FuncCommandHandlerBuilder ActAsync(Func>> executeCommand) { + // This is not ideal as we can return more specific interface depending on expected state, but it would do for now. + if (_expectedState != ExpectedState.New) { + throw new InvalidOperationException("Action without state is only allowed for new streams"); + } + + _execute = executeCommand.AsExecute(); + + return this; + } + + /// + /// Defines how to resolve the event reader from the command. + /// If not defined, the reader provided by the functional service will be used. + /// + /// Function to resolve the event reader + /// + public FuncCommandHandlerBuilder ResolveReader(ResolveReaderFromCommand? resolveReader) { + _reader = resolveReader; + + return this; + } + + /// + /// Defines how to resolve the event writer from the command. + /// If not defined, the writer provided by the functional service will be used. + /// + /// Function to resolve the event writer + /// + public FuncCommandHandlerBuilder ResolveWriter(ResolveWriterFromCommand? resolveWriter) { + _writer = resolveWriter; + + return this; + } + + /// + /// Defines how to resolve the event store from the command. It assigns both reader and writer. + /// If not defined, the reader and writer provided by the functional service will be used. + /// + /// Function to resolve the event writer + /// + public FuncCommandHandlerBuilder ResolveStore(ResolveEventStoreFromCommand? resolveStore) { + _reader ??= resolveStore?.AsResolveReader(); + _writer ??= resolveStore?.AsResolveWriter(); + + return this; + } + + internal override RegisteredFuncHandler Build() { + return new RegisteredFuncHandler( + _expectedState, + Ensure.NotNull(_getStream, $"Function to get the stream id from {typeof(TCommand).Name} is not defined"), + Ensure.NotNull(_execute, $"Function to act on the stream for command {typeof(TCommand).Name} is not defined"), + (_reader ?? DefaultResolveReader()).AsResolveReader(), + (_writer ?? DefaultResolveWriter()).AsResolveWriter() + ); + + ResolveWriterFromCommand DefaultResolveWriter() { + ArgumentNullException.ThrowIfNull(writer, nameof(writer)); + + return _ => writer; + } + + ResolveReaderFromCommand DefaultResolveReader() { + ArgumentNullException.ThrowIfNull(reader, nameof(reader)); + + return _ => reader; + } + } +} diff --git a/src/Core/src/Eventuous.Application/FunctionalService/FuncHandlerDelegateExtensions.cs b/src/Core/src/Eventuous.Application/FunctionalService/FuncHandlerDelegateExtensions.cs new file mode 100644 index 00000000..c02066a3 --- /dev/null +++ b/src/Core/src/Eventuous.Application/FunctionalService/FuncHandlerDelegateExtensions.cs @@ -0,0 +1,40 @@ +// Copyright (C) Ubiquitous AS.All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +public static partial class FuncServiceDelegates { + internal static GetStreamNameFromUntypedCommand AsGetStream(this GetStreamNameFromCommand getStream) where TCommand : class + => (cmd, _) => ValueTask.FromResult(getStream((TCommand)cmd)); + + internal static GetStreamNameFromUntypedCommand AsGetStream(this GetStreamNameFromCommandAsync getStream) where TCommand : class + => async (cmd, token) => await getStream((TCommand)cmd, token); + + internal static ExecuteUntypedCommand AsExecute(this ExecuteCommand execute) + where TState : State where TCommand : class + => (state, events, command, _) => ValueTask.FromResult(execute(state, events, (TCommand)command)); + + internal static ExecuteUntypedCommand AsExecute(this Func> execute) + where TState : State where TCommand : class + => (_, _, command, _) => ValueTask.FromResult(execute((TCommand)command)); + + internal static ExecuteUntypedCommand AsExecute(this Func>> execute) + where TState : State where TCommand : class + => async (_, _, command, _) => await execute((TCommand)command); + + internal static ExecuteUntypedCommand AsExecute(this ExecuteCommandAsync execute) + where TState : State where TCommand : class + => async (state, events, command, token) => await execute(state, events, (TCommand)command, token); + + internal static ResolveWriterFromCommand AsResolveWriter(this ResolveEventStoreFromCommand resolveStore) where TCommand : class + => cmd => resolveStore(cmd); + + internal static ResolveReaderFromCommand AsResolveReader(this ResolveEventStoreFromCommand resolveStore) where TCommand : class + => cmd => resolveStore(cmd); + + internal static ResolveReaderFromCommand AsResolveReader(this ResolveReaderFromCommand resolveReader) where TCommand : class + => cmd => resolveReader((TCommand)cmd); + + internal static ResolveWriterFromCommand AsResolveWriter(this ResolveWriterFromCommand resolveWriter) where TCommand : class + => cmd => resolveWriter((TCommand)cmd); +} diff --git a/src/Core/src/Eventuous.Application/FunctionalService/FuncHandlersMap.cs b/src/Core/src/Eventuous.Application/FunctionalService/FuncHandlersMap.cs new file mode 100644 index 00000000..0d6d381e --- /dev/null +++ b/src/Core/src/Eventuous.Application/FunctionalService/FuncHandlersMap.cs @@ -0,0 +1,57 @@ +// Copyright (C) Ubiquitous AS.All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Reflection; +using Eventuous.Diagnostics; +using static Eventuous.FuncServiceDelegates; + +namespace Eventuous; + +record RegisteredFuncHandler( + ExpectedState ExpectedState, + GetStreamNameFromUntypedCommand GetStream, + ExecuteUntypedCommand Handler, + ResolveReaderFromCommand ResolveReaderFromCommand, + ResolveWriterFromCommand ResolveWriterFromCommand + ) where T : State; + +class FuncHandlersMap where TState : State { + readonly TypeMap> _typeMap = new(); + + static readonly MethodInfo AddHandlerInternalMethod = + typeof(FuncHandlersMap).GetMethod(nameof(AddHandlerInternal), BindingFlags.NonPublic | BindingFlags.Instance)!; + + internal void AddHandlerUntyped(Type command, RegisteredFuncHandler handler) + => AddHandlerInternalMethod.MakeGenericMethod(command).Invoke(this, new object?[] { handler }); + + void AddHandlerInternal(RegisteredFuncHandler handler) where TCommand : class { + try { + _typeMap.Add(handler); + ApplicationEventSource.Log.CommandHandlerRegistered(); + } catch (Exceptions.DuplicateTypeException) { + ApplicationEventSource.Log.CommandHandlerAlreadyRegistered(); + + throw new Exceptions.CommandHandlerAlreadyRegistered(); + } + } + + public void AddHandler( + ExpectedState expectedState, + GetStreamNameFromCommand getStreamName, + ExecuteCommand action, + ResolveReaderFromCommand resolveReaderFromCommand, + ResolveWriterFromCommand resolveWriterFromCommand + ) + where TCommand : class + => AddHandlerInternal( + new RegisteredFuncHandler( + expectedState, + getStreamName.AsGetStream(), + action.AsExecute(), + resolveReaderFromCommand.AsResolveReader(), + resolveWriterFromCommand.AsResolveWriter() + ) + ); + + public bool TryGet([NotNullWhen(true)] out RegisteredFuncHandler? handler) => _typeMap.TryGetValue(out handler); +} diff --git a/src/Core/src/Eventuous.Application/FunctionalService/FuncServiceDelegates.cs b/src/Core/src/Eventuous.Application/FunctionalService/FuncServiceDelegates.cs new file mode 100644 index 00000000..e50e7aa3 --- /dev/null +++ b/src/Core/src/Eventuous.Application/FunctionalService/FuncServiceDelegates.cs @@ -0,0 +1,36 @@ +// Copyright (C) Ubiquitous AS.All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +public static partial class FuncServiceDelegates { + public delegate StreamName GetStreamNameFromCommand(TCommand command) where TCommand : class; + + public delegate Task GetStreamNameFromCommandAsync(TCommand command, CancellationToken cancellationToken) where TCommand : class; + + internal delegate ValueTask GetStreamNameFromUntypedCommand(object command, CancellationToken cancellationToken); + + internal delegate ValueTask> ExecuteUntypedCommand(T state, object[] events, object command, CancellationToken cancellationToken) + where T : State; + + public delegate IEnumerable ExecuteCommand(T state, object[] originalEvents, TCommand command) + where T : State where TCommand : class; + + public delegate Task> ExecuteCommandAsync( + T state, + object[] originalEvents, + TCommand command, + CancellationToken cancellationToken + ) + where T : State where TCommand : class; + + public delegate IEventReader ResolveReaderFromCommand(TCommand command) where TCommand : class; + + internal delegate IEventReader ResolveReaderFromCommand(object command); + + public delegate IEventWriter ResolveWriterFromCommand(TCommand command) where TCommand : class; + + public delegate IEventStore ResolveEventStoreFromCommand(TCommand command) where TCommand : class; + + internal delegate IEventWriter ResolveWriterFromCommand(object command); +} diff --git a/src/Core/src/Eventuous.Application/FunctionalService/FunctionalCommandService.cs b/src/Core/src/Eventuous.Application/FunctionalService/FunctionalCommandService.cs new file mode 100644 index 00000000..d36e7be0 --- /dev/null +++ b/src/Core/src/Eventuous.Application/FunctionalService/FunctionalCommandService.cs @@ -0,0 +1,115 @@ +// Copyright (C) Ubiquitous AS. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using static Eventuous.FuncServiceDelegates; + +namespace Eventuous; + +using static Diagnostics.ApplicationEventSource; + +public abstract class FunctionalCommandService(IEventReader reader, IEventWriter writer, TypeMapper? typeMap = null) + : IFuncCommandService, IStateCommandService where TState : State, new() { + [PublicAPI] + protected IEventReader Reader { get; } = reader; + [PublicAPI] + protected IEventWriter Writer { get; } = writer; + + readonly TypeMapper _typeMap = typeMap ?? TypeMap.Instance; + readonly FuncHandlersMap _handlers = new(); + + bool _initialized; + + protected FunctionalCommandService(IEventStore store, TypeMapper? typeMap = null) + : this(store, store, typeMap) { } + + /// + /// Returns the command handler builder for the specified command type. + /// + /// Command type + /// + protected FuncCommandHandlerBuilder On() where TCommand : class { + var builder = new FuncCommandHandlerBuilder(Reader, Writer); + _builders.Add(typeof(TCommand), builder); + + return builder; + } + + [Obsolete("Use On().InState(ExpectedState.New).GetStream(...).Act(...) instead")] + protected void OnNew(GetStreamNameFromCommand getStreamName, Func> action) where TCommand : class + => On().InState(ExpectedState.New).GetStream(getStreamName).Act(action); + + [Obsolete("Use On().InState(ExpectedState.Existing).GetStream(...).Act(...) instead")] + protected void OnExisting(GetStreamNameFromCommand getStreamName, ExecuteCommand action) where TCommand : class + => On().InState(ExpectedState.Existing).GetStream(getStreamName).Act(action); + + [Obsolete("Use On().InState(ExpectedState.Any).GetStream(...).Act(...) instead")] + protected void OnAny(GetStreamNameFromCommand getStreamName, ExecuteCommand action) where TCommand : class + => On().InState(ExpectedState.Any).GetStream(getStreamName).Act(action); + + public async Task> Handle(TCommand command, CancellationToken cancellationToken) where TCommand : class { + if (!_initialized) BuildHandlers(); + + if (!_handlers.TryGet(out var registeredHandler)) { + Log.CommandHandlerNotFound(); + var exception = new Exceptions.CommandHandlerNotFound(); + + return new ErrorResult(exception); + } + + var streamName = await registeredHandler.GetStream(command, cancellationToken).NoContext(); + + try { + var loadedState = registeredHandler.ExpectedState switch { + ExpectedState.Any => await Reader.LoadStateOrNew(streamName, cancellationToken).NoContext(), + ExpectedState.Existing => await Reader.LoadState(streamName, cancellationToken).NoContext(), + ExpectedState.New => new FoldedEventStream(streamName, ExpectedStreamVersion.NoStream, Array.Empty()), + _ => throw new ArgumentOutOfRangeException(nameof(registeredHandler.ExpectedState), "Unknown expected state") + }; + + var result = await registeredHandler + .Handler(loadedState.State, loadedState.Events, command, cancellationToken) + .NoContext(); + + var newEvents = result.ToArray(); + var newState = newEvents.Aggregate(loadedState.State, (current, evt) => current.When(evt)); + + // Zero in the global position would mean nothing, so the receiver need to check the Changes.Length + if (newEvents.Length == 0) return new OkResult(newState, Array.Empty(), 0); + + var storeResult = await Writer.Store(streamName, (int)loadedState.StreamVersion.Value, newEvents, static e => e, cancellationToken).NoContext(); + var changes = newEvents.Select(x => new Change(x, _typeMap.GetTypeName(x))); + Log.CommandHandled(); + + return new OkResult(newState, changes, storeResult.GlobalPosition); + } catch (Exception e) { + Log.ErrorHandlingCommand(e); + + return new ErrorResult($"Error handling command {typeof(TCommand).Name}", e); + } + } + + async Task ICommandService.Handle(TCommand command, CancellationToken cancellationToken) { + var result = await Handle(command, cancellationToken).NoContext(); + + return result switch { + OkResult(var state, var enumerable, _) => new OkResult(state, enumerable), + ErrorResult error => new ErrorResult(error.Message, error.Exception), + _ => throw new ApplicationException("Unknown result type") + }; + } + + readonly Dictionary> _builders = new(); + readonly object _handlersLock = new(); + + void BuildHandlers() { + lock (_handlersLock) { + foreach (var commandType in _builders.Keys) { + var builder = _builders[commandType]; + var handler = builder.Build(); + _handlers.AddHandlerUntyped(commandType, handler); + } + + _initialized = true; + } + } +} diff --git a/src/Core/src/Eventuous.Application/HandlersMap.cs b/src/Core/src/Eventuous.Application/HandlersMap.cs deleted file mode 100644 index 2c1ff60e..00000000 --- a/src/Core/src/Eventuous.Application/HandlersMap.cs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (C) Ubiquitous AS. All rights reserved -// Licensed under the Apache License, Version 2.0. - -namespace Eventuous; - -using static Diagnostics.ApplicationEventSource; - -public delegate Task ActOnAggregateAsync(TAggregate aggregate, TCommand command, CancellationToken cancellationToken) where TAggregate : Aggregate; - -public delegate void ActOnAggregate(TAggregate aggregate, TCommand command) where TAggregate : Aggregate; - -record RegisteredHandler(ExpectedState ExpectedState, Func> Handler); - -class HandlersMap where TAggregate : Aggregate { - readonly TypeMap> _typeMap = new(); - - public void AddHandler(RegisteredHandler handler) { - try { - _typeMap.Add(handler); - Log.CommandHandlerRegistered(); - } - catch (Exceptions.DuplicateTypeException) { - Log.CommandHandlerAlreadyRegistered(); - throw new Exceptions.CommandHandlerAlreadyRegistered(); - } - } - - public void AddHandler(ExpectedState expectedState, ActOnAggregateAsync action) - => AddHandler( - new RegisteredHandler( - expectedState, - async (aggregate, cmd, ct) => { - await action(aggregate, (TCommand)cmd, ct).NoContext(); - return aggregate; - } - ) - ); - - public void AddHandler(ExpectedState expectedState, ActOnAggregate action) - => AddHandler( - new RegisteredHandler( - expectedState, - (aggregate, cmd, _) => { - action(aggregate, (TCommand)cmd); - return new ValueTask(aggregate); - } - ) - ); - - public bool TryGet([NotNullWhen(true)] out RegisteredHandler? handler) - => _typeMap.TryGetValue(out handler); -} - -public delegate IEnumerable ExecuteCommand(T state, object[] originalEvents, TCommand command) - where T : State where TCommand : class; diff --git a/src/Core/src/Eventuous.Application/ICommandService.cs b/src/Core/src/Eventuous.Application/ICommandService.cs index 707e12b6..b9470c5d 100644 --- a/src/Core/src/Eventuous.Application/ICommandService.cs +++ b/src/Core/src/Eventuous.Application/ICommandService.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. // ReSharper disable UnusedTypeParameter + namespace Eventuous; public interface ICommandService { @@ -20,5 +21,4 @@ public interface IStateCommandService public interface ICommandService : IStateCommandService where T : Aggregate where TState : State, new() - where TId : Id { -} + where TId : Id { } \ No newline at end of file diff --git a/src/Core/src/Eventuous.Application/CommandMap.cs b/src/Core/src/Eventuous.Application/MessageMap.cs similarity index 90% rename from src/Core/src/Eventuous.Application/CommandMap.cs rename to src/Core/src/Eventuous.Application/MessageMap.cs index 8565b25b..d68386c6 100644 --- a/src/Core/src/Eventuous.Application/CommandMap.cs +++ b/src/Core/src/Eventuous.Application/MessageMap.cs @@ -10,8 +10,7 @@ public MessageMap Add(Func map) where TIn : class where TO _typeMap.Add(Map); return this; - object Map(object inCmd) - => map((TIn)inCmd); + object Map(object inCmd) => map((TIn)inCmd); } public TOut Convert(TIn command) where TIn : class { diff --git a/src/Core/src/Eventuous.Application/Result.cs b/src/Core/src/Eventuous.Application/Result.cs index 97a35f6c..bfe06f59 100644 --- a/src/Core/src/Eventuous.Application/Result.cs +++ b/src/Core/src/Eventuous.Application/Result.cs @@ -15,17 +15,8 @@ public abstract record Result(object? State, bool Success, IEnumerable? public record OkResult(object State, IEnumerable? Changes = null) : Result(State, true, Changes); [PublicAPI] -public record ErrorResult : Result { - public ErrorResult(string message, Exception? exception) : base(null, false) { - Message = message; - Exception = exception; - } - - [JsonIgnore] public Exception? Exception { get; } - +public record ErrorResult(string Message, [property: JsonIgnore] Exception? Exception) : Result(null, false) { public string ErrorMessage => Exception?.Message ?? "Unknown error"; - - public string Message { get; } } [PublicAPI] diff --git a/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStoreExceptions.cs b/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStoreExceptions.cs index 51103f4f..fdf3b76a 100644 --- a/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStoreExceptions.cs +++ b/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStoreExceptions.cs @@ -11,9 +11,8 @@ public OptimisticConcurrencyException(StreamName streamName, Exception inner) : base($"Update failed due to the wrong version in stream {streamName}. {inner.Message} {inner.InnerException?.Message}", inner) { } } -public class OptimisticConcurrencyException : OptimisticConcurrencyException where T : Aggregate { - public OptimisticConcurrencyException(StreamName streamName, Exception inner) : base(typeof(T), streamName, inner) { } -} +public class OptimisticConcurrencyException(StreamName streamName, Exception inner) : OptimisticConcurrencyException(typeof(T), streamName, inner) + where T : Aggregate; public class AggregateNotFoundException : Exception { public AggregateNotFoundException(Type aggregateType, StreamName streamName, Exception inner) diff --git a/src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/TracedEventWriter.cs b/src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/TracedEventWriter.cs index c9bd5203..8f3cbe81 100644 --- a/src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/TracedEventWriter.cs +++ b/src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/TracedEventWriter.cs @@ -8,21 +8,15 @@ namespace Eventuous.Diagnostics.Tracing; using Metrics; using static Constants; -public class TracedEventWriter : BaseTracer, IEventWriter { - public static IEventWriter Trace(IEventWriter writer) - => new TracedEventWriter(writer); - - public TracedEventWriter(IEventWriter writer) - => Inner = writer; - - IEventWriter Inner { get; } +public class TracedEventWriter(IEventWriter writer) : BaseTracer, IEventWriter { + public static IEventWriter Trace(IEventWriter writer) => new TracedEventWriter(writer); public async Task AppendEvents( - StreamName stream, - ExpectedStreamVersion expectedVersion, - IReadOnlyCollection events, - CancellationToken cancellationToken - ) { + StreamName stream, + ExpectedStreamVersion expectedVersion, + IReadOnlyCollection events, + CancellationToken cancellationToken + ) { using var activity = StartActivity(stream, Operations.AppendEvents); using var measure = Measure.Start(MetricsSource, new EventStoreMetricsContext(Operations.AppendEvents)); @@ -32,13 +26,14 @@ CancellationToken cancellationToken .ToArray(); try { - var result = await Inner.AppendEvents(stream, expectedVersion, tracedEvents, cancellationToken).NoContext(); + var result = await writer.AppendEvents(stream, expectedVersion, tracedEvents, cancellationToken).NoContext(); activity?.SetActivityStatus(ActivityStatus.Ok()); + return result; - } - catch (Exception e) { + } catch (Exception e) { activity?.SetActivityStatus(ActivityStatus.Error(e)); measure.SetError(); + throw; } } diff --git a/src/Core/src/Eventuous.Persistence/StreamNameMap.cs b/src/Core/src/Eventuous.Persistence/StreamNameMap.cs index 8af74c42..609254be 100644 --- a/src/Core/src/Eventuous.Persistence/StreamNameMap.cs +++ b/src/Core/src/Eventuous.Persistence/StreamNameMap.cs @@ -24,6 +24,5 @@ public StreamName GetStreamName(TId id) where TId : Id : throw new StreamNameMapNotFound(id); } -public class StreamNameMapNotFound : Exception where TId : Id { - public StreamNameMapNotFound(TId id) : base($"No stream name map found for {typeof(TId).Name} with value {id}") { } -} +public class StreamNameMapNotFound(TId id) : Exception($"No stream name map found for {typeof(TId).Name} with value {id}") + where TId : Id; diff --git a/src/Core/src/Eventuous.Producers/Chunk.cs b/src/Core/src/Eventuous.Producers/Chunk.cs index f0748cfd..52f17a5c 100644 --- a/src/Core/src/Eventuous.Producers/Chunk.cs +++ b/src/Core/src/Eventuous.Producers/Chunk.cs @@ -12,6 +12,7 @@ public static IEnumerable> Chunks(this IEnumerable enumerab while (e.MoveNext()) { var remaining = chunkSize; + // ReSharper disable once AccessToDisposedClosure var innerMoveNext = new Func(() => --remaining > 0 && e.MoveNext()); yield return e.GetChunk(innerMoveNext); diff --git a/src/Core/src/Eventuous.Shared/Store/StreamName.cs b/src/Core/src/Eventuous.Shared/Store/StreamName.cs index 01ec2382..d2ce5581 100644 --- a/src/Core/src/Eventuous.Shared/Store/StreamName.cs +++ b/src/Core/src/Eventuous.Shared/Store/StreamName.cs @@ -22,7 +22,4 @@ public StreamName(string value) { public override string ToString() => Value; } -public class InvalidStreamName : Exception { - public InvalidStreamName(string? streamName) - : base($"Stream name is {(string.IsNullOrWhiteSpace(streamName) ? "empty" : "invalid")}") { } -} +public class InvalidStreamName(string? streamName) : Exception($"Stream name is {(string.IsNullOrWhiteSpace(streamName) ? "empty" : "invalid")}"); diff --git a/src/Core/src/Eventuous.Shared/TypeMap/TypeMap.cs b/src/Core/src/Eventuous.Shared/TypeMap/TypeMap.cs index 2772cbf4..b4e05b98 100644 --- a/src/Core/src/Eventuous.Shared/TypeMap/TypeMap.cs +++ b/src/Core/src/Eventuous.Shared/TypeMap/TypeMap.cs @@ -54,6 +54,7 @@ struct MapEntry { const int DefaultCapacity = 10; + // ReSharper disable once UnusedTypeParameter static class TypeSlot { // ReSharper disable once StaticMemberInGenericType internal static readonly int Index = Interlocked.Increment(ref _index); diff --git a/src/Core/src/Eventuous.Subscriptions/Checkpoints/CommitPositionSequence.cs b/src/Core/src/Eventuous.Subscriptions/Checkpoints/CommitPositionSequence.cs index 4c26609f..c6790dfb 100644 --- a/src/Core/src/Eventuous.Subscriptions/Checkpoints/CommitPositionSequence.cs +++ b/src/Core/src/Eventuous.Subscriptions/Checkpoints/CommitPositionSequence.cs @@ -9,9 +9,7 @@ namespace Eventuous.Subscriptions.Checkpoints; using Diagnostics; -public class CommitPositionSequence : SortedSet { - public CommitPositionSequence() : base(new PositionsComparer()) { } - +public class CommitPositionSequence() : SortedSet(new PositionsComparer()) { [MethodImpl(MethodImplOptions.AggressiveInlining)] public CommitPosition FirstBeforeGap() => Count switch { diff --git a/src/Core/src/Eventuous.Subscriptions/Checkpoints/MeasuredCheckpointStore.cs b/src/Core/src/Eventuous.Subscriptions/Checkpoints/MeasuredCheckpointStore.cs index 91cd2aa1..acfc2d16 100644 --- a/src/Core/src/Eventuous.Subscriptions/Checkpoints/MeasuredCheckpointStore.cs +++ b/src/Core/src/Eventuous.Subscriptions/Checkpoints/MeasuredCheckpointStore.cs @@ -7,17 +7,13 @@ namespace Eventuous.Subscriptions.Checkpoints; -public class MeasuredCheckpointStore : ICheckpointStore { +public class MeasuredCheckpointStore(ICheckpointStore checkpointStore) : ICheckpointStore { public const string OperationPrefix = "checkpoint"; public const string ReadOperationName = $"{OperationPrefix}.read"; public const string WriteOperationName = $"{OperationPrefix}.write"; public const string SubscriptionIdTag = "subscriptionId"; public const string CheckpointBaggage = "checkpoint"; - readonly ICheckpointStore _checkpointStore; - - public MeasuredCheckpointStore(ICheckpointStore checkpointStore) => _checkpointStore = checkpointStore; - public async ValueTask GetLastCheckpoint( string checkpointId, CancellationToken cancellationToken @@ -31,7 +27,7 @@ CancellationToken cancellationToken ) ?.Start(); - var checkpoint = await _checkpointStore.GetLastCheckpoint(checkpointId, cancellationToken).NoContext(); + var checkpoint = await checkpointStore.GetLastCheckpoint(checkpointId, cancellationToken).NoContext(); activity?.AddBaggage(CheckpointBaggage, checkpoint.Position?.ToString()); @@ -51,7 +47,7 @@ public async ValueTask StoreCheckpoint(Checkpoint checkpoint, bool f .AddBaggage(CheckpointBaggage, checkpoint.Position?.ToString()) .Start(); - return await _checkpointStore.StoreCheckpoint(checkpoint, force, cancellationToken).NoContext(); + return await checkpointStore.StoreCheckpoint(checkpoint, force, cancellationToken).NoContext(); } static KeyValuePair[] GetTags(string checkpointId) diff --git a/src/Core/src/Eventuous.Subscriptions/Context/AsyncConsumeContext.cs b/src/Core/src/Eventuous.Subscriptions/Context/AsyncConsumeContext.cs index e48b4e44..1c88cf20 100644 --- a/src/Core/src/Eventuous.Subscriptions/Context/AsyncConsumeContext.cs +++ b/src/Core/src/Eventuous.Subscriptions/Context/AsyncConsumeContext.cs @@ -28,7 +28,8 @@ public class AsyncConsumeContext : WrappedConsumeContext { /// The original message context /// Function to ACK the message /// Function to NACK the message in case of failure - public AsyncConsumeContext(IMessageConsumeContext inner, Acknowledge acknowledge, Fail fail) : base(inner) { + public AsyncConsumeContext(IMessageConsumeContext inner, Acknowledge acknowledge, Fail fail) + : base(inner) { // ReSharper disable once NullCoalescingConditionIsAlwaysNotNullAccordingToAPIContract inner.LogContext ??= Logger.Current; _acknowledge = acknowledge; @@ -48,6 +49,6 @@ public AsyncConsumeContext(IMessageConsumeContext inner, Acknowledge acknowledge /// public ValueTask Fail(Exception exception) => _fail(this, exception); - public string? PartitionKey { get; internal set; } - public long PartitionId { get; internal set; } + public string? PartitionKey { [PublicAPI] get; internal set; } + public long PartitionId { get; internal set; } } diff --git a/src/Core/src/Eventuous.Subscriptions/Diagnostics/CheckpointCommitMetrics.cs b/src/Core/src/Eventuous.Subscriptions/Diagnostics/CheckpointCommitMetrics.cs index badcda06..c755686a 100644 --- a/src/Core/src/Eventuous.Subscriptions/Diagnostics/CheckpointCommitMetrics.cs +++ b/src/Core/src/Eventuous.Subscriptions/Diagnostics/CheckpointCommitMetrics.cs @@ -9,11 +9,9 @@ namespace Eventuous.Subscriptions.Diagnostics; using static Checkpoints.CheckpointCommitHandler; -sealed class CheckpointCommitMetrics : GenericListener, IDisposable { +sealed class CheckpointCommitMetrics() : GenericListener(DiagnosticName), IDisposable { readonly ConcurrentDictionary _commitEvents = new(); - public CheckpointCommitMetrics() : base(DiagnosticName) { } - protected override void OnEvent(KeyValuePair evt) { var (_, value) = evt; diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/AsyncHandlingFilter.cs b/src/Core/src/Eventuous.Subscriptions/Filters/AsyncHandlingFilter.cs index 5a29a30a..415b5fd5 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/AsyncHandlingFilter.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/AsyncHandlingFilter.cs @@ -29,6 +29,7 @@ public AsyncHandlingFilter(uint concurrencyLimit, uint bufferSize = 10) { ); } + // ReSharper disable once CognitiveComplexity static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken ct) { var ctx = workerTask.Context; diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/ConsumePipe.cs b/src/Core/src/Eventuous.Subscriptions/Filters/ConsumePipe.cs index 0068e2f8..826cd3a7 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/ConsumePipe.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/ConsumePipe.cs @@ -61,12 +61,6 @@ public async ValueTask DisposeAsync() { } } -public class InvalidContextTypeException : InvalidOperationException { - public InvalidContextTypeException(Type expected, Type actual) - : base($"Context type {expected.Name} is not assignable to {actual.Name}") { } -} +public class InvalidContextTypeException(Type expected, Type actual) : InvalidOperationException($"Context type {expected.Name} is not assignable to {actual.Name}"); -public class DuplicateFilterException : InvalidOperationException { - public DuplicateFilterException(IConsumeFilter filter) - : base($"Filter of type {filter.GetType()} is already registered") { } -} +public class DuplicateFilterException(IConsumeFilter filter) : InvalidOperationException($"Filter of type {filter.GetType()} is already registered"); diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/IConsumeFilter.cs b/src/Core/src/Eventuous.Subscriptions/Filters/IConsumeFilter.cs index 9cd259d6..4ead1803 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/IConsumeFilter.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/IConsumeFilter.cs @@ -1,6 +1,7 @@ // Copyright (C) Ubiquitous AS. All rights reserved // Licensed under the Apache License, Version 2.0. +// ReSharper disable UnusedTypeParameter namespace Eventuous.Subscriptions.Filters; using Context; diff --git a/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingStatus.cs b/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingStatus.cs index c24caf7b..a0dfb647 100644 --- a/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingStatus.cs +++ b/src/Core/src/Eventuous.Subscriptions/Handlers/EventHandlingStatus.cs @@ -9,7 +9,7 @@ public enum EventHandlingStatus : short { Success = 0b_0001, Pending = 0b_0010, Failure = 0b_0011, - Handled = 0b_0111, + Handled = 0b_0111 // 0111 bitmask for Handled means that if any of the three lower bits is set, the message // hs been handled. } \ No newline at end of file diff --git a/src/Core/src/Eventuous.Subscriptions/Registrations/NamedRegistrations.cs b/src/Core/src/Eventuous.Subscriptions/Registrations/NamedRegistrations.cs index 1a38932d..90e53ead 100644 --- a/src/Core/src/Eventuous.Subscriptions/Registrations/NamedRegistrations.cs +++ b/src/Core/src/Eventuous.Subscriptions/Registrations/NamedRegistrations.cs @@ -34,8 +34,6 @@ string subscriptionId } } -class NamedDescriptor : ServiceDescriptor { - public string Name { get; } - - public NamedDescriptor(string name, Type serviceType, object instance) : base(serviceType, instance) => Name = name; +class NamedDescriptor(string name, Type serviceType, object instance) : ServiceDescriptor(serviceType, instance) { + public string Name { get; } = name; } \ No newline at end of file diff --git a/src/Core/test/Eventuous.Tests.Application/BookingFuncService.cs b/src/Core/test/Eventuous.Tests.Application/BookingFuncService.cs index b1d32ee2..593502d4 100644 --- a/src/Core/test/Eventuous.Tests.Application/BookingFuncService.cs +++ b/src/Core/test/Eventuous.Tests.Application/BookingFuncService.cs @@ -1,25 +1,30 @@ // Copyright (C) Ubiquitous AS. All rights reserved // Licensed under the Apache License, Version 2.0. -using Eventuous.Sut.App; using Eventuous.Sut.Domain; +using static Eventuous.Sut.App.Commands; using static Eventuous.Sut.Domain.BookingEvents; namespace Eventuous.Tests.Application; public class BookingFuncService : FunctionalCommandService { - public BookingFuncService(IEventStore store, TypeMapper? typeMap = null) : base(store, typeMap) { - OnNew(cmd => GetStream(cmd.BookingId), BookRoom); - OnExisting(cmd => GetStream(cmd.BookingId), RecordPayment); + public BookingFuncService(IEventStore store, TypeMapper? typeMap = null) + : base(store, typeMap) { +#pragma warning disable CS0618 // Type or member is obsolete + OnNew(cmd => GetStream(cmd.BookingId), BookRoom); +#pragma warning restore CS0618 // Type or member is obsolete + On().InState(ExpectedState.Existing).GetStream(cmd => GetStream(cmd.BookingId)).Act(RecordPayment); + + return; static StreamName GetStream(string id) => StreamName.For(id); - static IEnumerable BookRoom(Commands.BookRoom cmd) { + static IEnumerable BookRoom(BookRoom cmd) { yield return new RoomBooked(cmd.RoomId, cmd.CheckIn, cmd.CheckOut, cmd.Price); } - static IEnumerable RecordPayment(BookingState state, object[] originalEvents, Commands.RecordPayment cmd) { + static IEnumerable RecordPayment(BookingState state, object[] originalEvents, RecordPayment cmd) { if (state.HasPayment(cmd.PaymentId)) yield break; var registered = new BookingPaymentRegistered(cmd.PaymentId, cmd.Amount.Amount); @@ -27,6 +32,7 @@ static IEnumerable RecordPayment(BookingState state, object[] originalEv yield return registered; var newState = state.When(registered); + if (newState.IsFullyPaid()) yield return new BookingFullyPaid(cmd.PaidAt); if (newState.IsOverpaid()) yield return new BookingOverpaid((state.AmountPaid - state.Price).Amount); } diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/ConsumePipeTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/ConsumePipeTests.cs index 27d48bdb..4646e6f1 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/ConsumePipeTests.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions/ConsumePipeTests.cs @@ -4,19 +4,15 @@ namespace Eventuous.Tests.Subscriptions; -public class ConsumePipeTests { - readonly ITestOutputHelper _outputHelper; - +public class ConsumePipeTests(ITestOutputHelper outputHelper) { static readonly Fixture Auto = new(); - public ConsumePipeTests(ITestOutputHelper outputHelper) => _outputHelper = outputHelper; - [Fact] public async Task ShouldCallHandlers() { var handler = new TestHandler(); var pipe = new ConsumePipe().AddDefaultConsumer(handler); - var ctx = Auto.CreateContext(_outputHelper); + var ctx = Auto.CreateContext(outputHelper); await pipe.Send(ctx); @@ -34,7 +30,7 @@ public async Task ShouldAddContextBaggage() { pipe.AddFilterFirst(new TestFilter(Key, baggage)); - var ctx = Auto.CreateContext(_outputHelper); + var ctx = Auto.CreateContext(outputHelper); await pipe.Send(ctx); diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/DefaultConsumerTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/DefaultConsumerTests.cs index d53361c0..5e7fafee 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/DefaultConsumerTests.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions/DefaultConsumerTests.cs @@ -5,22 +5,16 @@ namespace Eventuous.Tests.Subscriptions; -public class DefaultConsumerTests : IDisposable { - readonly ITestOutputHelper _output; - readonly TestEventListener _listener; +public class DefaultConsumerTests(ITestOutputHelper output) : IDisposable { + readonly TestEventListener _listener = new(output); static readonly Fixture Auto = new(); - public DefaultConsumerTests(ITestOutputHelper output) { - _output = output; - _listener = new TestEventListener(output); - } - [Fact] public async Task ShouldFailWhenHandlerNacks() { var handler = new FailingHandler(); var consumer = new DefaultConsumer(new IEventHandler[] { handler }); - var ctx = Auto.CreateContext(_output); + var ctx = Auto.CreateContext(output); await consumer.Consume(ctx); diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/HandlingStatusTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/HandlingStatusTests.cs index cac8d2ad..b58d7757 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/HandlingStatusTests.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions/HandlingStatusTests.cs @@ -3,13 +3,9 @@ namespace Eventuous.Tests.Subscriptions; -public class HandlingStatusTests { - readonly ITestOutputHelper _output; - +public class HandlingStatusTests(ITestOutputHelper output) { static Fixture Auto { get; } = new(); - public HandlingStatusTests(ITestOutputHelper output) => _output = output; - [Fact] public void AckAndNackShouldNack() { const EventHandlingStatus actual = EventHandlingStatus.Success | EventHandlingStatus.Failure; @@ -43,7 +39,7 @@ public void IgnoredShouldBeIgnored() { [Fact] public void NackAndIgnoreShouldFail() { - var context = Auto.CreateContext(_output); + var context = Auto.CreateContext(output); context.Nack(new Exception()); context.Ignore("test"); context.HasFailed().Should().BeTrue(); @@ -53,7 +49,7 @@ public void NackAndIgnoreShouldFail() { [Fact] public void NackAckAndIgnoreShouldFail() { - var context = Auto.CreateContext(_output); + var context = Auto.CreateContext(output); context.Nack(new Exception()); context.Ack(); context.Ignore(); @@ -64,7 +60,7 @@ public void NackAckAndIgnoreShouldFail() { [Fact] public void AckAndIgnoreShouldSucceed() { - var context = Auto.CreateContext(_output); + var context = Auto.CreateContext(output); context.Ack(); context.Ignore(); context.HasFailed().Should().BeFalse(); @@ -74,7 +70,7 @@ public void AckAndIgnoreShouldSucceed() { [Fact] public void IgnoreAndIgnoreShouldIgnore() { - var context = Auto.CreateContext(_output); + var context = Auto.CreateContext(output); context.Ignore(); context.Ignore(); context.WasIgnored().Should().BeTrue(); @@ -83,7 +79,7 @@ public void IgnoreAndIgnoreShouldIgnore() { [Fact] public void PendingShouldBePending() { - var context = Auto.CreateContext(_output); + var context = Auto.CreateContext(output); context.WasIgnored().Should().BeFalse(); context.HasFailed().Should().BeFalse(); context.HandlingResults.IsPending().Should().BeTrue(); diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/RegistrationTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/RegistrationTests.cs index 49a9ca86..7daa8b0e 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/RegistrationTests.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions/RegistrationTests.cs @@ -15,9 +15,9 @@ namespace Eventuous.Tests.Subscriptions; public class RegistrationTests(ITestOutputHelper outputHelper) { - readonly TestServer _server = new TestServer(BuildHost()); - readonly Fixture _auto = new(); - readonly ILoggerFactory _logger = Logging.GetLoggerFactory(outputHelper); + readonly TestServer _server = new(BuildHost()); + readonly Fixture _auto = new(); + readonly ILoggerFactory _logger = Logging.GetLoggerFactory(outputHelper); [Fact] public void ShouldBeSingletons() { @@ -95,8 +95,9 @@ public async Task BothShouldBeRunningAndReportHealthy() { [Fact] public void ShouldRegisterTwoMeasures() { - var subs = _server.Services.GetServices().ToArray(); - var measure = _server.Services.GetRequiredService(); + var subs = _server.Services.GetServices().ToArray(); + subs.Should().NotBeEmpty(); + _server.Services.GetRequiredService(); } static IWebHostBuilder BuildHost() => new WebHostBuilder().UseStartup(); @@ -125,6 +126,7 @@ public void Configure(IApplicationBuilder app) { } } record TestOptions : SubscriptionOptions { + // ReSharper disable once UnusedAutoPropertyAccessor.Local public string? Field { get; set; } } diff --git a/src/Core/test/Eventuous.Tests/ForgotToSetId.cs b/src/Core/test/Eventuous.Tests/ForgotToSetId.cs index 2b26dc66..6646d2da 100644 --- a/src/Core/test/Eventuous.Tests/ForgotToSetId.cs +++ b/src/Core/test/Eventuous.Tests/ForgotToSetId.cs @@ -15,11 +15,9 @@ public async Task ShouldFailWithNoId() { TestService Service { get; } class TestService : CommandService { - public TestService(IAggregateStore store) : base(store) - => OnNew( - cmd => new TestId(cmd.Id), - (test, cmd) => test.Process() - ); + public TestService(IAggregateStore store) + : base(store) + => On().InState(ExpectedState.New).GetId(cmd => new TestId(cmd.Id)).Act((test, _) => test.Process()); } record DoIt(string Id); @@ -31,7 +29,8 @@ class TestAggregate : Aggregate { record TestState : State; record TestId : Id { - public TestId(string value) : base(value) { } + public TestId(string value) + : base(value) { } } record TestEvent; diff --git a/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/Fixtures/IntegrationFixture.cs b/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/Fixtures/IntegrationFixture.cs index de36f6e0..aff3b815 100644 --- a/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/Fixtures/IntegrationFixture.cs +++ b/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/Fixtures/IntegrationFixture.cs @@ -8,13 +8,11 @@ namespace Eventuous.Tests.OpenTelemetry.Fixtures; public class IntegrationFixture : IAsyncLifetime { - public IEventStore EventStore { get; set; } = null!; - public IAggregateStore AggregateStore { get; set; } = null!; - public EventStoreClient Client { get; private set; } = null!; - public Fixture Auto { get; } = new(); + public IEventStore EventStore { get; set; } = null!; + public EventStoreClient Client { get; private set; } = null!; + public Fixture Auto { get; } = new(); EventStoreDbContainer _esdbContainer = null!; - // readonly ActivityListener _listener = DummyActivityListener.Create(); IEventSerializer Serializer { get; } = new DefaultEventSerializer( new JsonSerializerOptions(JsonSerializerDefaults.Web) @@ -32,7 +30,7 @@ public virtual async Task InitializeAsync() { var settings = EventStoreClientSettings.Create(_esdbContainer.GetConnectionString()); Client = new EventStoreClient(settings); EventStore = new TracedEventStore(new EsdbEventStore(Client)); - AggregateStore = new AggregateStore(EventStore); + new AggregateStore(EventStore); } public async Task DisposeAsync() { diff --git a/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs b/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs index d44fa753..df205f2c 100644 --- a/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs +++ b/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs @@ -204,6 +204,7 @@ Func getException } catch (Exception ex) { var (message, args) = getError(); + // ReSharper disable once TemplateIsNotCompileTimeConstantProblem _logger?.LogWarning(ex, message, args); throw getException(stream, ex); } diff --git a/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs b/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs index da2a75bd..be1b5cfe 100644 --- a/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs +++ b/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs @@ -83,7 +83,7 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken Subscription = await EventStoreClient.SubscribeToAllAsync( fromAll, - (subscription, @event, ct) => HandleEvent(subscription, @event, ct), + (_, @event, ct) => HandleEvent(@event, ct), Options.ResolveLinkTos, HandleDrop, filterOptions, @@ -92,7 +92,7 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken ) .NoContext(); - async Task HandleEvent(global::EventStore.Client.StreamSubscription _, ResolvedEvent re, CancellationToken ct) + async Task HandleEvent(ResolvedEvent re, CancellationToken ct) => await HandleInternal(CreateContext(re, ct)).NoContext(); void HandleDrop(global::EventStore.Client.StreamSubscription _, SubscriptionDroppedReason reason, Exception? ex) diff --git a/src/EventStore/src/Eventuous.EventStore/Subscriptions/Diagnostics/AllStreamSubscriptionMeasure.cs b/src/EventStore/src/Eventuous.EventStore/Subscriptions/Diagnostics/AllStreamSubscriptionMeasure.cs index 04d8b830..8703623f 100644 --- a/src/EventStore/src/Eventuous.EventStore/Subscriptions/Diagnostics/AllStreamSubscriptionMeasure.cs +++ b/src/EventStore/src/Eventuous.EventStore/Subscriptions/Diagnostics/AllStreamSubscriptionMeasure.cs @@ -1,9 +1,6 @@ namespace Eventuous.EventStore.Subscriptions.Diagnostics; -class AllStreamSubscriptionMeasure : BaseSubscriptionMeasure { - public AllStreamSubscriptionMeasure(string subscriptionId, EventStoreClient eventStoreClient) - : base(subscriptionId, "$all", eventStoreClient) { } - +class AllStreamSubscriptionMeasure(string subscriptionId, EventStoreClient eventStoreClient) : BaseSubscriptionMeasure(subscriptionId, "$all", eventStoreClient) { protected override IAsyncEnumerable Read(CancellationToken cancellationToken) => EventStoreClient.ReadAllAsync( Direction.Backwards, diff --git a/src/EventStore/src/Eventuous.EventStore/Subscriptions/Diagnostics/StreamSubscriptionMeasure.cs b/src/EventStore/src/Eventuous.EventStore/Subscriptions/Diagnostics/StreamSubscriptionMeasure.cs index b270ad51..4abc5273 100644 --- a/src/EventStore/src/Eventuous.EventStore/Subscriptions/Diagnostics/StreamSubscriptionMeasure.cs +++ b/src/EventStore/src/Eventuous.EventStore/Subscriptions/Diagnostics/StreamSubscriptionMeasure.cs @@ -1,26 +1,9 @@ namespace Eventuous.EventStore.Subscriptions.Diagnostics; -class StreamSubscriptionMeasure : BaseSubscriptionMeasure { - public StreamSubscriptionMeasure( - string subscriptionId, - StreamName streamName, - EventStoreClient eventStoreClient - ) : base(subscriptionId, streamName, eventStoreClient) { - _subscriptionId = subscriptionId; - _streamName = streamName; - } - - readonly string _subscriptionId; - readonly StreamName _streamName; - +class StreamSubscriptionMeasure(string subscriptionId, StreamName streamName, EventStoreClient eventStoreClient) + : BaseSubscriptionMeasure(subscriptionId, streamName, eventStoreClient) { protected override IAsyncEnumerable Read(CancellationToken cancellationToken) - => EventStoreClient.ReadStreamAsync( - Direction.Backwards, - _streamName, - StreamPosition.End, - 1, - cancellationToken: cancellationToken - ); + => EventStoreClient.ReadStreamAsync(Direction.Backwards, streamName, StreamPosition.End, 1, cancellationToken: cancellationToken); protected override ulong GetLastPosition(ResolvedEvent resolvedEvent) => resolvedEvent.Event.EventNumber; -} \ No newline at end of file +} diff --git a/src/EventStore/src/Eventuous.EventStore/Subscriptions/Options/EventStoreSubscriptionOptions.cs b/src/EventStore/src/Eventuous.EventStore/Subscriptions/Options/EventStoreSubscriptionOptions.cs index f029dcfe..98401a51 100644 --- a/src/EventStore/src/Eventuous.EventStore/Subscriptions/Options/EventStoreSubscriptionOptions.cs +++ b/src/EventStore/src/Eventuous.EventStore/Subscriptions/Options/EventStoreSubscriptionOptions.cs @@ -7,15 +7,15 @@ public abstract record EventStoreSubscriptionOptions : SubscriptionOptions { /// /// User credentials /// - public UserCredentials? Credentials { get; set; } + public UserCredentials? Credentials { get; [PublicAPI] set; } /// /// Resolve link events /// - public bool ResolveLinkTos { get; set; } + public bool ResolveLinkTos { get; [PublicAPI] set; } /// /// Metadata serializer. If not assigned, the default one will be used. /// public IMetadataSerializer? MetadataSerializer { get; set; } -} \ No newline at end of file +} diff --git a/src/EventStore/src/Eventuous.EventStore/Subscriptions/PersistentSubscriptionBase.cs b/src/EventStore/src/Eventuous.EventStore/Subscriptions/PersistentSubscriptionBase.cs index cedf7c82..03f048ec 100644 --- a/src/EventStore/src/Eventuous.EventStore/Subscriptions/PersistentSubscriptionBase.cs +++ b/src/EventStore/src/Eventuous.EventStore/Subscriptions/PersistentSubscriptionBase.cs @@ -15,11 +15,11 @@ namespace Eventuous.EventStore.Subscriptions; /// Function type for handling event processing failures /// public delegate Task HandleEventProcessingFailure( - EventStoreClient client, - PersistentSubscription subscription, - ResolvedEvent resolvedEvent, - Exception exception -); + EventStoreClient client, + PersistentSubscription subscription, + ResolvedEvent resolvedEvent, + Exception exception + ); /// /// Base class for EventStoreDB persistent subscriptions @@ -81,13 +81,13 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken try { _subscription = await LocalSubscribe( + // ReSharper disable once ConvertClosureToMethodGroup (subscription, @event, retryCount, ct) => HandleEvent(subscription, @event, retryCount, ct), HandleDrop, cancellationToken ) .NoContext(); - } - catch (PersistentSubscriptionNotFoundException) { + } catch (PersistentSubscriptionNotFoundException) { await CreatePersistentSubscription(settings, cancellationToken); _subscription = await LocalSubscribe( @@ -115,11 +115,9 @@ async Task HandleEvent(PersistentSubscription subscription, ResolvedEvent re, in await Handler(context).NoContext(); LastProcessed = EventPosition.FromContext(context); await Ack(context).NoContext(); - } - catch (OperationCanceledException e) when (ct.IsCancellationRequested) { + } catch (OperationCanceledException e) when (ct.IsCancellationRequested) { Dropped(DropReason.Stopped, e); - } - catch (Exception e) { + } catch (Exception e) { await Nack(context, e).NoContext(); } } @@ -128,7 +126,7 @@ async Task HandleEvent(PersistentSubscription subscription, ResolvedEvent re, in /// /// Last processed event position /// - protected EventPosition? LastProcessed { get; set; } + protected EventPosition? LastProcessed { [PublicAPI] get; set; } /// /// Internal method to subscribe to a persistent subscription @@ -138,10 +136,10 @@ async Task HandleEvent(PersistentSubscription subscription, ResolvedEvent re, in /// /// protected abstract Task LocalSubscribe( - Func eventAppeared, - Action? subscriptionDropped, - CancellationToken cancellationToken - ); + Func eventAppeared, + Action? subscriptionDropped, + CancellationToken cancellationToken + ); ConcurrentQueue AckQueue { get; } = new(); @@ -218,12 +216,16 @@ protected override async ValueTask Unsubscribe(CancellationToken cancellationTok Stopping.Cancel(false); await Task.Delay(100, cancellationToken); _subscription?.Dispose(); - } - catch (Exception) { + } catch (Exception) { // It might throw } } - static Task DefaultEventProcessingFailureHandler(EventStoreClient client, PersistentSubscription subscription, ResolvedEvent resolvedEvent, Exception exception) + static Task DefaultEventProcessingFailureHandler( + EventStoreClient client, + PersistentSubscription subscription, + ResolvedEvent resolvedEvent, + Exception exception + ) => subscription.Nack(PersistentSubscriptionNakEventAction.Retry, exception.Message, resolvedEvent); } diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/AppServiceTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/AppServiceTests.cs index 8f979287..b9f43096 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/AppServiceTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/AppServiceTests.cs @@ -5,7 +5,7 @@ namespace Eventuous.Tests.EventStore; public class AppServiceTests(IntegrationFixture fixture, ITestOutputHelper output) : IClassFixture, IDisposable { - readonly TestEventListener _listener = new(output); + readonly TestEventListener _listener = new(output); BookingService Service { get; } = new(fixture.AggregateStore); @@ -13,14 +13,7 @@ public class AppServiceTests(IntegrationFixture fixture, ITestOutputHelper outpu public async Task ProcessAnyForNew() { var cmd = DomainFixture.CreateImportBooking(); - var expected = new object[] { - new BookingEvents.BookingImported( - cmd.RoomId, - cmd.Price, - cmd.CheckIn, - cmd.CheckOut - ) - }; + var expected = new object[] { new BookingEvents.BookingImported(cmd.RoomId, cmd.Price, cmd.CheckIn, cmd.CheckOut) }; var handlingResult = await Service.Handle(cmd, default); handlingResult.Success.Should().BeTrue(); diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/PersistentSubscriptionFixture.cs b/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/PersistentSubscriptionFixture.cs index 840afd70..7ee64267 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/PersistentSubscriptionFixture.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/PersistentSubscriptionFixture.cs @@ -24,7 +24,6 @@ protected PersistentSubscriptionFixture( T handler, bool autoStart = true ) { - IntegrationFixture1 = integrationFixture; _autoStart = autoStart; var loggerFactory = TestHelpers.Logging.GetLoggerFactory(outputHelper); @@ -51,8 +50,6 @@ protected PersistentSubscriptionFixture( protected ValueTask Stop() => Subscription.UnsubscribeWithLog(Log); - protected IntegrationFixture IntegrationFixture1 { get; } - readonly bool _autoStart; readonly LoggingEventListener _listener; diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/PersistentPublishAndSubscribeManyTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/PersistentPublishAndSubscribeManyTests.cs index 96f56be1..fe1f656a 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/PersistentPublishAndSubscribeManyTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/PersistentPublishAndSubscribeManyTests.cs @@ -4,10 +4,8 @@ namespace Eventuous.Tests.EventStore; -public class PersistentPublishAndSubscribeManyTests : PersistentSubscriptionFixture { - public PersistentPublishAndSubscribeManyTests(IntegrationFixture fixture, ITestOutputHelper outputHelper) - : base(fixture, outputHelper, new TestEventHandler(), false) { } - +public class PersistentPublishAndSubscribeManyTests(IntegrationFixture fixture, ITestOutputHelper outputHelper) + : PersistentSubscriptionFixture(fixture, outputHelper, new TestEventHandler(), false) { [Fact] public async Task SubscribeAndProduceMany() { const int count = 10000; diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/ProducerTracesTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/ProducerTracesTests.cs index ef6930ab..e5201949 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/ProducerTracesTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/ProducerTracesTests.cs @@ -6,7 +6,7 @@ namespace Eventuous.Tests.EventStore; -public class TracesTests : SubscriptionFixture, IClassFixture, IDisposable { +public class TracesTests : SubscriptionFixture, IDisposable { readonly ActivityListener _listener; public TracesTests(IntegrationFixture fixture, ITestOutputHelper outputHelper) diff --git a/src/Experimental/src/Eventuous.ElasticSearch/Index/IndexSetup.cs b/src/Experimental/src/Eventuous.ElasticSearch/Index/IndexSetup.cs index 199dc6ff..f953d740 100644 --- a/src/Experimental/src/Eventuous.ElasticSearch/Index/IndexSetup.cs +++ b/src/Experimental/src/Eventuous.ElasticSearch/Index/IndexSetup.cs @@ -91,6 +91,7 @@ await client.LowLevel.Indices.TemplateV2ExistsForAllAsync descriptor.Hot(GetPhase), diff --git a/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs b/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs index 3e70ce25..571ba118 100644 --- a/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs +++ b/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs @@ -3,14 +3,8 @@ namespace Eventuous.ElasticSearch.Store; -public class ElasticEventStore : IEventReader, IEventWriter { - readonly IElasticClient _client; - readonly ElasticEventStoreOptions _options; - - public ElasticEventStore(IElasticClient client, ElasticEventStoreOptions? options = null) { - _client = client; - _options = options ?? new ElasticEventStoreOptions(); - } +public class ElasticEventStore(IElasticClient client, ElasticEventStoreOptions? options = null) : IEventReader, IEventWriter { + readonly ElasticEventStoreOptions _options = options ?? new ElasticEventStoreOptions(); public async Task AppendEvents( StreamName stream, @@ -21,7 +15,7 @@ CancellationToken cancellationToken var streamName = stream.ToString(); var documents = events.Select(AsDocument).ToArray(); var bulk = new BulkDescriptor(_options.IndexName).CreateMany(documents).Refresh(Refresh.WaitFor); - var result = await _client.BulkAsync(bulk, cancellationToken); + var result = await client.BulkAsync(bulk, cancellationToken); return result.IsValid ? new AppendEventsResult(0, documents.Last().StreamPosition + 1) @@ -47,7 +41,7 @@ public async Task ReadEvents( int count, CancellationToken cancellationToken ) { - var response = await _client.SearchAsync( + var response = await client.SearchAsync( d => d .Index(_options.IndexName) .Query( diff --git a/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticSerializer.cs b/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticSerializer.cs index d954cc8f..eb2c138b 100644 --- a/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticSerializer.cs +++ b/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticSerializer.cs @@ -5,20 +5,15 @@ namespace Eventuous.ElasticSearch.Store; -public class ElasticSerializer : IElasticsearchSerializer { - readonly IElasticsearchSerializer _builtIn; - readonly JsonSerializerOptions _options; - readonly TypeMapper _typeMapper; - - public ElasticSerializer(IElasticsearchSerializer builtIn, JsonSerializerOptions? options, TypeMapper? typeMapper = null) { - _builtIn = builtIn; - _options = options ?? new JsonSerializerOptions(JsonSerializerDefaults.Web); - _typeMapper = typeMapper ?? TypeMap.Instance; - } +public class ElasticSerializer(IElasticsearchSerializer builtIn, JsonSerializerOptions? options, TypeMapper? typeMapper = null) + : IElasticsearchSerializer { + readonly JsonSerializerOptions _options = options ?? new JsonSerializerOptions(JsonSerializerDefaults.Web); + readonly TypeMapper _typeMapper = typeMapper ?? TypeMap.Instance; public object Deserialize(Type type, Stream stream) { var reader = new BinaryReader(stream); var obj = JsonSerializer.Deserialize(reader.ReadBytes((int)stream.Length), type, _options); + if (type != typeof(PersistedEvent)) return obj!; var evt = (PersistedEvent)obj!; @@ -31,7 +26,8 @@ public object Deserialize(Type type, Stream stream) { public void Serialize(T data, Stream stream, SerializationFormatting formatting = SerializationFormatting.None) { if (data is not PersistedEvent) { - _builtIn.Serialize(data, stream, formatting); + builtIn.Serialize(data, stream, formatting); + return; } @@ -48,12 +44,13 @@ public Task DeserializeAsync(Stream stream, CancellationToken cancellation => Task.FromResult(Deserialize(stream)); public Task SerializeAsync( - T data, - Stream stream, - SerializationFormatting formatting = SerializationFormatting.None, - CancellationToken cancellationToken = default - ) { + T data, + Stream stream, + SerializationFormatting formatting = SerializationFormatting.None, + CancellationToken cancellationToken = default + ) { Serialize(data, stream, formatting); + return Task.CompletedTask; } -} \ No newline at end of file +} diff --git a/src/Experimental/src/Eventuous.ElasticSearch/Store/PersistedEvent.cs b/src/Experimental/src/Eventuous.ElasticSearch/Store/PersistedEvent.cs index 68006950..6a23df38 100644 --- a/src/Experimental/src/Eventuous.ElasticSearch/Store/PersistedEvent.cs +++ b/src/Experimental/src/Eventuous.ElasticSearch/Store/PersistedEvent.cs @@ -39,11 +39,13 @@ DateTime created [Keyword] public string Stream { get; } + [PublicAPI] public ulong GlobalPosition { get; } public object? Message { get; init; } public Dictionary? Metadata { get; } [Date(Name = "@timestamp")] [JsonPropertyName("@timestamp")] + [PublicAPI] public DateTime Created { get; } } \ No newline at end of file diff --git a/src/Experimental/src/Eventuous.Spyglass/InsidePeek.cs b/src/Experimental/src/Eventuous.Spyglass/InsidePeek.cs index de8ab16c..fd9b4d49 100644 --- a/src/Experimental/src/Eventuous.Spyglass/InsidePeek.cs +++ b/src/Experimental/src/Eventuous.Spyglass/InsidePeek.cs @@ -72,7 +72,7 @@ public async Task Load(string streamName, int version) { var selectedEvents = version == -1 ? events : events.Take(version + 1); aggregate.Load(selectedEvents.Select(x => x.Payload)); - return new { aggregate.State, Events = events.Select(x => new { EventType = x.Payload!.GetType().Name, x.Payload, }) }; + return new { aggregate.State, Events = events.Select(x => new { EventType = x.Payload!.GetType().Name, x.Payload }) }; } public object[] Aggregates => AggregateInfos.Select(x => x.GetInfo()).ToArray(); diff --git a/src/Extensions/test/Eventuous.Sut.AspNetCore/BookingApi.cs b/src/Extensions/test/Eventuous.Sut.AspNetCore/BookingApi.cs index 890ef18d..d750a37b 100644 --- a/src/Extensions/test/Eventuous.Sut.AspNetCore/BookingApi.cs +++ b/src/Extensions/test/Eventuous.Sut.AspNetCore/BookingApi.cs @@ -8,9 +8,7 @@ namespace Eventuous.Sut.AspNetCore; -public class BookingApi : CommandHttpApiBase { - public BookingApi(ICommandService service, MessageMap? commandMap = null) : base(service, commandMap) { } - +public class BookingApi(ICommandService service, MessageMap? commandMap = null) : CommandHttpApiBase(service, commandMap) { [HttpPost("v2/pay")] public Task> RegisterPayment([FromBody] RegisterPaymentHttp cmd, CancellationToken cancellationToken) => Handle(cmd, cancellationToken); diff --git a/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/ControllerTests.cs b/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/ControllerTests.cs index d7f50924..458d415f 100644 --- a/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/ControllerTests.cs +++ b/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/ControllerTests.cs @@ -38,7 +38,7 @@ public async Task RecordPaymentUsingMappedCommand() { var bookRoom = _fixture.GetBookRoom(); - var bookResponse = await client.PostJsonAsync("/book", bookRoom); + await client.PostJsonAsync("/book", bookRoom); var registerPayment = new BookingApi.RegisterPaymentHttp(bookRoom.BookingId, bookRoom.RoomId, 100, DateTimeOffset.Now); diff --git a/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/MappedCommandTests.cs b/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/MappedCommandTests.cs index 0b625610..6f350ef2 100644 --- a/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/MappedCommandTests.cs +++ b/src/Extensions/test/Eventuous.Tests.AspNetCore.Web/MappedCommandTests.cs @@ -63,7 +63,7 @@ public async Task MapAggregateContractToCommandExplicitlyWithoutRouteWithWrongGe .MapCommand(EnrichCommand) ); - Func act = () => Execute(fixture, "import3"); + var act = () => Execute(fixture, "import3"); await act.Should().ThrowAsync(); } diff --git a/src/Extensions/test/Eventuous.Tests.DependencyInjection/Sut/TestAggregate.cs b/src/Extensions/test/Eventuous.Tests.DependencyInjection/Sut/TestAggregate.cs index 20e72d1a..15e90e2e 100644 --- a/src/Extensions/test/Eventuous.Tests.DependencyInjection/Sut/TestAggregate.cs +++ b/src/Extensions/test/Eventuous.Tests.DependencyInjection/Sut/TestAggregate.cs @@ -1,15 +1,11 @@ namespace Eventuous.Tests.AspNetCore.Sut; -public class TestAggregate : Aggregate { - public TestDependency Dependency { get; } - - public TestAggregate(TestDependency dependency) => Dependency = dependency; +public class TestAggregate(TestDependency dependency) : Aggregate { + public TestDependency Dependency { get; } = dependency; } -public class AnotherTestAggregate : Aggregate { - public TestDependency Dependency { get; } - - public AnotherTestAggregate(TestDependency dependency) => Dependency = dependency; +public class AnotherTestAggregate(TestDependency dependency) : Aggregate { + public TestDependency Dependency { get; } = dependency; } public record TestState : State; diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs b/src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs index 414b4db0..e5d7a1ec 100644 --- a/src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs @@ -20,7 +20,7 @@ public static Metadata GetContextMeta(IMessageConsumeContext context) { [GatewayContextItems.OriginalGlobalPosition] = context.GlobalPosition, [GatewayContextItems.OriginalMessageId] = context.MessageId, [GatewayContextItems.OriginalMessageType] = context.MessageType, - [GatewayContextItems.OriginalMessageMeta] = context.Metadata, + [GatewayContextItems.OriginalMessageMeta] = context.Metadata }; return new Metadata(headers); diff --git a/src/Gateway/test/Eventuous.Tests.Gateway/RegistrationTests.cs b/src/Gateway/test/Eventuous.Tests.Gateway/RegistrationTests.cs index 17214601..959c0bf5 100644 --- a/src/Gateway/test/Eventuous.Tests.Gateway/RegistrationTests.cs +++ b/src/Gateway/test/Eventuous.Tests.Gateway/RegistrationTests.cs @@ -43,15 +43,15 @@ class TestTransform : IGatewayTransform { record TestOptions : SubscriptionOptions; - class TestSub : EventSubscription { - public TestSub(TestOptions options, ConsumePipe consumePipe) : base(options, consumePipe, NullLoggerFactory.Instance) { } - + class TestSub(TestOptions options, ConsumePipe consumePipe) : EventSubscription(options, consumePipe, NullLoggerFactory.Instance) { protected override ValueTask Subscribe(CancellationToken cancellationToken) => default; protected override ValueTask Unsubscribe(CancellationToken cancellationToken) => default; } class TestProducer : BaseProducer { + // ReSharper disable once CollectionNeverQueried.Local + // ReSharper disable once MemberCanBePrivate.Local public List ProducedMessages { get; } = new(); protected override Task ProduceMessages( diff --git a/src/GooglePubSub/src/Eventuous.GooglePubSub.CloudRun/CloudRunPubSubSubscription.cs b/src/GooglePubSub/src/Eventuous.GooglePubSub.CloudRun/CloudRunPubSubSubscription.cs index 526370a0..2637bc57 100644 --- a/src/GooglePubSub/src/Eventuous.GooglePubSub.CloudRun/CloudRunPubSubSubscription.cs +++ b/src/GooglePubSub/src/Eventuous.GooglePubSub.CloudRun/CloudRunPubSubSubscription.cs @@ -11,10 +11,8 @@ namespace Eventuous.GooglePubSub.CloudRun; -public class CloudRunPubSubSubscription : EventSubscription { - public CloudRunPubSubSubscription(CloudRunPubSubSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory) - : base(options, consumePipe, loggerFactory) { } - +public class CloudRunPubSubSubscription(CloudRunPubSubSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory) + : EventSubscription(options, consumePipe, loggerFactory) { protected override ValueTask Subscribe(CancellationToken cancellationToken) => ValueTask.CompletedTask; protected override ValueTask Unsubscribe(CancellationToken cancellationToken) => ValueTask.CompletedTask; diff --git a/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/ClientCache.cs b/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/ClientCache.cs index 1fd165c7..e2d2d0ef 100644 --- a/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/ClientCache.cs +++ b/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/ClientCache.cs @@ -24,7 +24,7 @@ public async Task GetOrAddPublisher(string topic, CancellationT async Task CreateTopicAndClient(string topicId, CancellationToken cancellationToken) { var topicName = TopicName.FromProjectTopic(_projectId, topicId); - var builder = new PublisherClientBuilder() { Logger = log }; + var builder = new PublisherClientBuilder { Logger = log }; _options.ConfigureClientBuilder?.Invoke(builder); builder.TopicName = topicName; diff --git a/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/GooglePubSubProducer.cs b/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/GooglePubSubProducer.cs index 1e029c87..fc04c637 100644 --- a/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/GooglePubSubProducer.cs +++ b/src/GooglePubSub/src/Eventuous.GooglePubSub/Producers/GooglePubSubProducer.cs @@ -26,11 +26,11 @@ public class GooglePubSubProducer : BaseProducer, IHostedP /// Optional logger instance /// Publisher client configuration action public GooglePubSubProducer( - string projectId, - IEventSerializer? serializer = null, - ILogger? log = null, - Action? configureClient = null - ) + string projectId, + IEventSerializer? serializer = null, + ILogger? log = null, + Action? configureClient = null + ) : this(new PubSubProducerOptions { ProjectId = Ensure.NotEmptyString(projectId), ConfigureClientBuilder = configureClient }, serializer, log) { } /// @@ -74,11 +74,11 @@ public async Task StopAsync(CancellationToken cancellationToken = default) { readonly ILogger? _log; protected override async Task ProduceMessages( - StreamName stream, - IEnumerable messages, - PubSubProduceOptions? options, - CancellationToken cancellationToken = default - ) { + StreamName stream, + IEnumerable messages, + PubSubProduceOptions? options, + CancellationToken cancellationToken = default + ) { var client = await _clientCache.GetOrAddPublisher(stream, cancellationToken).NoContext(); await Task.WhenAll(messages.Select(ProduceLocal)).NoContext(); @@ -103,8 +103,10 @@ PubsubMessage CreateMessage(ProducedMessage message, PubSubProduceOptions? optio Data = ByteString.CopyFrom(payload), OrderingKey = options?.OrderingKey ?? "", Attributes = { - { _attributes.ContentType, contentType }, { _attributes.EventType, eventType }, { _attributes.MessageId, message.MessageId.ToString() } - }, + { _attributes.ContentType, contentType }, + { _attributes.EventType, eventType }, + { _attributes.MessageId, message.MessageId.ToString() } + } }; if (message.Metadata != null) { diff --git a/src/GooglePubSub/src/Eventuous.GooglePubSub/Shared/PubSub.cs b/src/GooglePubSub/src/Eventuous.GooglePubSub/Shared/PubSub.cs index da6aa288..808aee22 100644 --- a/src/GooglePubSub/src/Eventuous.GooglePubSub/Shared/PubSub.cs +++ b/src/GooglePubSub/src/Eventuous.GooglePubSub/Shared/PubSub.cs @@ -15,11 +15,11 @@ public static class PubSub { public static EmulatorDetection DetectEmulator(this PublisherClient.ClientCreationSettings? value) => value?.EmulatorDetection ?? EmulatorDetection.None; public static async Task CreateTopic( - TopicName topicName, - EmulatorDetection emulatorDetection, - Action log, - CancellationToken cancellationToken - ) { + TopicName topicName, + EmulatorDetection emulatorDetection, + Action log, + CancellationToken cancellationToken + ) { var topicString = topicName.ToString(); var publisherServiceApiClient = @@ -42,17 +42,18 @@ CancellationToken cancellationToken } public static async Task CreateSubscription( - SubscriptionName subscriptionName, - TopicName topicName, - Action? configureSubscription, - EmulatorDetection emulatorDetection, - CancellationToken cancellationToken - ) { - var subName = subscriptionName.ToString(); + SubscriptionName subscriptionName, + TopicName topicName, + Action? configureSubscription, + EmulatorDetection emulatorDetection, + CancellationToken cancellationToken + ) { + var subName = subscriptionName.ToString()!; var log = Logger.Current.InfoLog; - var subscriberServiceApiClient = - await new SubscriberServiceApiClientBuilder { EmulatorDetection = emulatorDetection }.BuildAsync(cancellationToken).NoContext(); + var subscriberServiceApiClient = await new SubscriberServiceApiClientBuilder { EmulatorDetection = emulatorDetection } + .BuildAsync(cancellationToken) + .NoContext(); log?.Log("Checking subscription for topic", subName, topicName.ToString()); diff --git a/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubFixture.cs b/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubFixture.cs index 58f11649..9d7518f8 100644 --- a/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubFixture.cs +++ b/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubFixture.cs @@ -4,11 +4,11 @@ namespace Eventuous.Tests.GooglePubSub; -public partial class PubSubFixture : IAsyncLifetime { +public class PubSubFixture : IAsyncLifetime { public static string PubsubProjectId => "test-id"; public static async Task DeleteSubscription(string subscriptionId) { - var builder = new SubscriberServiceApiClientBuilder() { EmulatorDetection = EmulatorDetection.EmulatorOnly }; + var builder = new SubscriberServiceApiClientBuilder { EmulatorDetection = EmulatorDetection.EmulatorOnly }; var subscriber = await builder.BuildAsync(); var subscriptionName = SubscriptionName.FromProjectSubscription(PubsubProjectId, subscriptionId); await subscriber.DeleteSubscriptionAsync(subscriptionName); diff --git a/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubTests.cs b/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubTests.cs index a9bd6ee7..e1c688be 100644 --- a/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubTests.cs +++ b/src/GooglePubSub/test/Eventuous.Tests.GooglePubSub/PubSubTests.cs @@ -21,7 +21,8 @@ static PubSubTests() readonly string _pubsubSubscription; readonly ILogger _log; - public PubSubTests(PubSubFixture fixture, ITestOutputHelper outputHelper) { + // ReSharper disable once UnusedParameter.Local + public PubSubTests(PubSubFixture _, ITestOutputHelper outputHelper) { var loggerFactory = LoggerFactory.Create(builder => builder.SetMinimumLevel(LogLevel.Debug).AddXunit(outputHelper)); _log = loggerFactory.CreateLogger(); diff --git a/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaBasicSubscription.cs b/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaBasicSubscription.cs index eee33167..d47e38aa 100644 --- a/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaBasicSubscription.cs +++ b/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaBasicSubscription.cs @@ -7,10 +7,8 @@ namespace Eventuous.Kafka.Subscriptions; -public class KafkaBasicSubscription : EventSubscription { - public KafkaBasicSubscription(KafkaSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory) - : base(options, consumePipe, loggerFactory) { } - +public class KafkaBasicSubscription(KafkaSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory) + : EventSubscription(options, consumePipe, loggerFactory) { protected override ValueTask Subscribe(CancellationToken cancellationToken) => throw new NotImplementedException(); diff --git a/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs b/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs index 9ce504ad..0e4d8d33 100644 --- a/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs +++ b/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs @@ -25,11 +25,7 @@ public async Task ShouldProduceAndWait() { _output.WriteLine($"Topic: {topicName}"); var producer = new KafkaBasicProducer( - new KafkaProducerOptions( - new ProducerConfig { - BootstrapServers = _fixture.BootstrapServers, - } - ) + new KafkaProducerOptions(new ProducerConfig { BootstrapServers = _fixture.BootstrapServers }) ); var produced = new List(); @@ -37,13 +33,6 @@ public async Task ShouldProduceAndWait() { var events = Auto.CreateMany().ToArray(); await producer.StartAsync(default); - ValueTask OnAck(ProducedMessage msg) { - _output.WriteLine("Produced message: {0}", msg.Message); - produced.Add((TestEvent)msg.Message); - - return ValueTask.CompletedTask; - } - await producer.Produce(new StreamName(topicName), events, new Metadata(), new KafkaProduceOptions("test"), onAck: OnAck); await producer.StopAsync(default); @@ -82,6 +71,15 @@ ValueTask OnAck(ProducedMessage msg) { _output.WriteLine($"Consumed {consumed.Count} events"); consumed.Should().BeEquivalentTo(events); + + return; + + ValueTask OnAck(ProducedMessage msg) { + _output.WriteLine("Produced message: {0}", msg.Message); + produced.Add((TestEvent)msg.Message); + + return ValueTask.CompletedTask; + } } IConsumer GetConsumer(string groupId) { diff --git a/src/Mongo/src/Eventuous.Projections.MongoDB/MongoProjector.cs b/src/Mongo/src/Eventuous.Projections.MongoDB/MongoProjector.cs index 854d84eb..43e7e519 100644 --- a/src/Mongo/src/Eventuous.Projections.MongoDB/MongoProjector.cs +++ b/src/Mongo/src/Eventuous.Projections.MongoDB/MongoProjector.cs @@ -11,10 +11,8 @@ namespace Eventuous.Projections.MongoDB; using Tools; [Obsolete("Use MongoProjector instead")] -public abstract class MongoProjection : MongoProjector where T : ProjectedDocument { - protected MongoProjection(IMongoDatabase database, TypeMapper? typeMap = null) - : base(database, typeMap) { } -} +public abstract class MongoProjection(IMongoDatabase database, TypeMapper? typeMap = null) : MongoProjector(database, typeMap) + where T : ProjectedDocument; /// /// Base class for MongoDB projectors. Specify your event handlers in the constructor using On methods family. diff --git a/src/Mongo/test/Eventuous.Tests.Projections.MongoDB/Fixtures/IntegrationFixture.cs b/src/Mongo/test/Eventuous.Tests.Projections.MongoDB/Fixtures/IntegrationFixture.cs index 52e0e71d..9022f0c0 100644 --- a/src/Mongo/test/Eventuous.Tests.Projections.MongoDB/Fixtures/IntegrationFixture.cs +++ b/src/Mongo/test/Eventuous.Tests.Projections.MongoDB/Fixtures/IntegrationFixture.cs @@ -12,11 +12,10 @@ namespace Eventuous.Tests.Projections.MongoDB.Fixtures; public sealed class IntegrationFixture : IAsyncLifetime { - public IEventStore EventStore { get; set; } = null!; - public IAggregateStore AggregateStore { get; set; } = null!; - public EventStoreClient Client { get; private set; } = null!; - public IMongoDatabase Mongo { get; private set; } = null!; - public Fixture Auto { get; } = new(); + public IEventStore EventStore { get; set; } = null!; + public EventStoreClient Client { get; private set; } = null!; + public IMongoDatabase Mongo { get; private set; } = null!; + public Fixture Auto { get; } = new(); static IEventSerializer Serializer { get; } = new DefaultEventSerializer( new JsonSerializerOptions(JsonSerializerDefaults.Web) @@ -45,7 +44,7 @@ public async Task InitializeAsync() { var settings = EventStoreClientSettings.Create(_esdbContainer.GetConnectionString()); Client = new EventStoreClient(settings); EventStore = new EsdbEventStore(Client); - AggregateStore = new AggregateStore(EventStore); + new AggregateStore(EventStore); _mongoContainer = new MongoDbBuilder().Build(); await _mongoContainer.StartAsync(); diff --git a/src/Postgres/src/Eventuous.Postgresql/Extensions/RegistrationExtensions.cs b/src/Postgres/src/Eventuous.Postgresql/Extensions/RegistrationExtensions.cs index abc35143..1f437d47 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Extensions/RegistrationExtensions.cs +++ b/src/Postgres/src/Eventuous.Postgresql/Extensions/RegistrationExtensions.cs @@ -23,13 +23,13 @@ public static class ServiceCollectionExtensions { /// Optional> lifetime of the data source, default is singleton /// public static IServiceCollection AddEventuousPostgres( - this IServiceCollection services, - string connectionString, - string schema, - bool initializeDatabase = false, - ServiceLifetime connectionLifetime = ServiceLifetime.Transient, - ServiceLifetime dataSourceLifetime = ServiceLifetime.Singleton - ) { + this IServiceCollection services, + string connectionString, + string schema, + bool initializeDatabase = false, + ServiceLifetime connectionLifetime = ServiceLifetime.Transient, + ServiceLifetime dataSourceLifetime = ServiceLifetime.Singleton + ) { var options = new PostgresStoreOptions { Schema = schema, ConnectionString = connectionString, @@ -69,11 +69,11 @@ public static IServiceCollection AddEventuousPostgres( } public static IServiceCollection AddEventuousPostgres( - this IServiceCollection services, - IConfiguration config, - ServiceLifetime connectionLifetime = ServiceLifetime.Transient, - ServiceLifetime dataSourceLifetime = ServiceLifetime.Singleton - ) { + this IServiceCollection services, + IConfiguration config, + ServiceLifetime connectionLifetime = ServiceLifetime.Transient, + ServiceLifetime dataSourceLifetime = ServiceLifetime.Singleton + ) { services.Configure(config); services.AddNpgsqlDataSourceCore( @@ -101,8 +101,8 @@ public static IServiceCollection AddEventuousPostgres( if (config.GetValue("postgres:initializeDatabase") == true) { services.AddHostedService( sp => { - var dataSource = sp.GetRequiredService(); - var options = sp.GetRequiredService>(); + sp.GetRequiredService(); + var options = sp.GetRequiredService>(); return new SchemaInitializer(options.Value, sp.GetRequiredService()); } @@ -113,12 +113,12 @@ public static IServiceCollection AddEventuousPostgres( } static void AddNpgsqlDataSourceCore( - this IServiceCollection services, - Func getConnectionString, - Action? configureDataSource, - ServiceLifetime connectionLifetime, - ServiceLifetime dataSourceLifetime - ) { + this IServiceCollection services, + Func getConnectionString, + Action? configureDataSource, + ServiceLifetime connectionLifetime, + ServiceLifetime dataSourceLifetime + ) { services.TryAdd( new ServiceDescriptor( typeof(NpgsqlDataSource), diff --git a/src/Postgres/src/Eventuous.Postgresql/Schema.cs b/src/Postgres/src/Eventuous.Postgresql/Schema.cs index 4ac15d01..99bf6f88 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Schema.cs +++ b/src/Postgres/src/Eventuous.Postgresql/Schema.cs @@ -28,7 +28,7 @@ public class Schema { public string AddCheckpointSql => $"insert into {_schema}.checkpoints (id) values (@checkpointId)"; public string UpdateCheckpointSql => $"update {_schema}.checkpoints set position=(@position) where id=(@checkpointId)"; - readonly static Assembly Assembly = typeof(Schema).Assembly; + static readonly Assembly Assembly = typeof(Schema).Assembly; public async Task CreateSchema(NpgsqlDataSource dataSource, ILogger log, CancellationToken cancellationToken = default) { var names = Assembly.GetManifestResourceNames().Where(x => x.EndsWith(".sql")).OrderBy(x => x); diff --git a/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresAllStreamSubscription.cs b/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresAllStreamSubscription.cs index e1c727a8..d545d9a5 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresAllStreamSubscription.cs +++ b/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresAllStreamSubscription.cs @@ -14,15 +14,14 @@ namespace Eventuous.Postgresql.Subscriptions; /// /// Subscription for all events in the system using PostgreSQL event store. /// -public class PostgresAllStreamSubscription : PostgresSubscriptionBase { - public PostgresAllStreamSubscription( +public class PostgresAllStreamSubscription( NpgsqlDataSource dataSource, PostgresAllStreamSubscriptionOptions options, ICheckpointStore checkpointStore, ConsumePipe consumePipe, ILoggerFactory? loggerFactory - ) : base(dataSource, options, checkpointStore, consumePipe, loggerFactory) { } - + ) + : PostgresSubscriptionBase(dataSource, options, checkpointStore, consumePipe, loggerFactory) { protected override NpgsqlCommand PrepareCommand(NpgsqlConnection connection, long start) => connection.GetCommand(Schema.ReadAllForwards) .Add("_from_position", NpgsqlDbType.Bigint, start + 1) diff --git a/src/RabbitMq/src/Eventuous.RabbitMq/Producers/ExchangeCache.cs b/src/RabbitMq/src/Eventuous.RabbitMq/Producers/ExchangeCache.cs index 79cb2deb..a3c48925 100644 --- a/src/RabbitMq/src/Eventuous.RabbitMq/Producers/ExchangeCache.cs +++ b/src/RabbitMq/src/Eventuous.RabbitMq/Producers/ExchangeCache.cs @@ -5,19 +5,16 @@ namespace Eventuous.RabbitMq.Producers; -class ExchangeCache { - public ExchangeCache(ILogger? log) - => _log = log; - +class ExchangeCache(ILogger? log) { public void EnsureExchange(string name, Action createExchange) { if (_exchanges.Contains(name)) return; try { - _log?.LogInformation("Ensuring exchange {ExchangeName}", name); + log?.LogInformation("Ensuring exchange {ExchangeName}", name); createExchange(); } catch (Exception e) { - _log?.LogError(e, "Failed to ensure exchange {ExchangeName}: {ErrorMessage}", name, e.Message); + log?.LogError(e, "Failed to ensure exchange {ExchangeName}: {ErrorMessage}", name, e.Message); throw; } @@ -25,5 +22,4 @@ public void EnsureExchange(string name, Action createExchange) { } readonly HashSet _exchanges = new(); - readonly ILogger? _log; } diff --git a/src/Redis/src/Eventuous.Redis/RedisStore.cs b/src/Redis/src/Eventuous.Redis/RedisStore.cs index 11ae04bb..ad8f70cf 100644 --- a/src/Redis/src/Eventuous.Redis/RedisStore.cs +++ b/src/Redis/src/Eventuous.Redis/RedisStore.cs @@ -21,11 +21,12 @@ public class RedisStore : IEventReader, IEventWriter { readonly IMetadataSerializer _metaSerializer; public RedisStore( - GetRedisDatabase getDatabase, - RedisStoreOptions options, - IEventSerializer? serializer = null, - IMetadataSerializer? metaSerializer = null - ) { + GetRedisDatabase getDatabase, + // ReSharper disable once UnusedParameter.Local + RedisStoreOptions options, + IEventSerializer? serializer = null, + IMetadataSerializer? metaSerializer = null + ) { _serializer = serializer ?? DefaultEventSerializer.Instance; _metaSerializer = metaSerializer ?? DefaultMetadataSerializer.Instance; _getDatabase = Ensure.NotNull(getDatabase, "Connection factory"); @@ -34,21 +35,26 @@ public RedisStore( const string ContentType = "application/json"; public async Task ReadEvents(StreamName stream, StreamReadPosition start, int count, CancellationToken cancellationToken) { - var result = await _getDatabase().StreamReadAsync(stream.ToString(), start.Value.ToRedisValue(), count).NoContext(); - if (result == null) throw new StreamNotFound(stream); + try { + var result = await _getDatabase().StreamReadAsync(stream.ToString(), start.Value.ToRedisValue(), count).NoContext(); + + if (result == null) throw new StreamNotFound(stream); - return result.Select(x => ToStreamEvent(x, _serializer, _metaSerializer)).ToArray(); + return result.Select(x => ToStreamEvent(x, _serializer, _metaSerializer)).ToArray(); + } catch (InvalidOperationException e) when (e.Message.Contains("Reading is not allowed after reader was completed") || cancellationToken.IsCancellationRequested) { + throw new OperationCanceledException("Redis read operation terminated", e, cancellationToken); + } } public Task ReadEventsBackwards(StreamName stream, int count, CancellationToken cancellationToken) => throw new NotImplementedException(); public async Task AppendEvents( - StreamName stream, - ExpectedStreamVersion expectedVersion, - IReadOnlyCollection events, - CancellationToken cancellationToken - ) { + StreamName stream, + ExpectedStreamVersion expectedVersion, + IReadOnlyCollection events, + CancellationToken cancellationToken + ) { var keys = new object[] { "append_events", 3, @@ -73,16 +79,18 @@ CancellationToken cancellationToken var streamPosition = (long)Ensure.NotNull(response?[0]); var globalPositionString = Ensure.NotNull(response?[1]).ToString(); var globalPosition = globalPositionString.AsSpan().ToULong(); + return new AppendEventsResult(globalPosition, streamPosition); - } - catch (Exception e) when (e.Message.Contains("WrongExpectedVersion")) { + } catch (Exception e) when (e.Message.Contains("WrongExpectedVersion")) { Log.UnableToAppendEvents(stream, e); + throw new AppendToStreamException(stream, e); } object[] ConvertStreamEvent(StreamEvent evt) { var data = _serializer.SerializeEvent(evt.Payload!); var meta = _metaSerializer.Serialize(evt.Metadata); + return new object[] { evt.Id.ToString(), data.EventType, AsString(data.Payload), AsString(meta) }; } diff --git a/src/Redis/src/Eventuous.Redis/Subscriptions/RedisAllStreamSubscription.cs b/src/Redis/src/Eventuous.Redis/Subscriptions/RedisAllStreamSubscription.cs index d7b838c8..d2b16b2e 100644 --- a/src/Redis/src/Eventuous.Redis/Subscriptions/RedisAllStreamSubscription.cs +++ b/src/Redis/src/Eventuous.Redis/Subscriptions/RedisAllStreamSubscription.cs @@ -13,15 +13,14 @@ namespace Eventuous.Redis.Subscriptions; using Tools; -public class RedisAllStreamSubscription : RedisSubscriptionBase { - public RedisAllStreamSubscription( +public class RedisAllStreamSubscription( GetRedisDatabase getDatabase, RedisAllStreamSubscriptionOptions options, ICheckpointStore checkpointStore, ConsumePipe consumePipe, ILoggerFactory? loggerFactory - ) : base(getDatabase, options, checkpointStore, consumePipe, loggerFactory) { } - + ) + : RedisSubscriptionBase(getDatabase, options, checkpointStore, consumePipe, loggerFactory) { protected override async Task ReadEvents(IDatabase database, long position) { var linkedEvents = await database.StreamReadAsync("_all", position.ToRedisValue(), Options.MaxPageSize).NoContext(); var persistentEvents = new List(); diff --git a/src/Redis/test/Eventuous.Tests.Redis/Fixtures/IntegrationFixture.cs b/src/Redis/test/Eventuous.Tests.Redis/Fixtures/IntegrationFixture.cs index a2fea1ab..d9cef6f0 100644 --- a/src/Redis/test/Eventuous.Tests.Redis/Fixtures/IntegrationFixture.cs +++ b/src/Redis/test/Eventuous.Tests.Redis/Fixtures/IntegrationFixture.cs @@ -10,10 +10,9 @@ namespace Eventuous.Tests.Redis.Fixtures; public sealed class IntegrationFixture : IAsyncLifetime { - public IEventWriter EventWriter { get; private set; } = null!; - public IEventReader EventReader { get; private set; } = null!; - public IAggregateStore AggregateStore { get; private set; } = null!; - public GetRedisDatabase GetDatabase { get; private set; } = null!; + public IEventWriter EventWriter { get; private set; } = null!; + public IEventReader EventReader { get; private set; } = null!; + public GetRedisDatabase GetDatabase { get; private set; } = null!; readonly ActivityListener _listener = DummyActivityListener.Create(); RedisContainer _redisContainer = null!; @@ -38,7 +37,7 @@ public async Task InitializeAsync() { var store = new RedisStore(GetDb, new RedisStoreOptions(), Serializer); EventWriter = store; EventReader = store; - AggregateStore = new AggregateStore(store, store); + new AggregateStore(store, store); return; diff --git a/src/Redis/test/Eventuous.Tests.Redis/Store/Append.cs b/src/Redis/test/Eventuous.Tests.Redis/Store/Append.cs index 2864c4f6..491849a5 100644 --- a/src/Redis/test/Eventuous.Tests.Redis/Store/Append.cs +++ b/src/Redis/test/Eventuous.Tests.Redis/Store/Append.cs @@ -3,7 +3,6 @@ namespace Eventuous.Tests.Redis.Store; -[Collection("Sequential")] public class AppendEvents(IntegrationFixture fixture) : IClassFixture { [Fact] public async Task ShouldAppendToNoStream() { diff --git a/src/Redis/test/Eventuous.Tests.Redis/Store/Read.cs b/src/Redis/test/Eventuous.Tests.Redis/Store/Read.cs index d36e8d7e..dbdc8a76 100644 --- a/src/Redis/test/Eventuous.Tests.Redis/Store/Read.cs +++ b/src/Redis/test/Eventuous.Tests.Redis/Store/Read.cs @@ -3,7 +3,6 @@ namespace Eventuous.Tests.Redis.Store; -[Collection("Sequential")] public class ReadEvents(IntegrationFixture fixture) : IClassFixture { [Fact] public async Task ShouldReadOne() { diff --git a/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToAll.cs b/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToAll.cs index d6670308..78108522 100644 --- a/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToAll.cs +++ b/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToAll.cs @@ -8,7 +8,6 @@ namespace Eventuous.Tests.Redis.Subscriptions; -[Collection("Sequential")] public class SubscribeToAll(ITestOutputHelper outputHelper) : SubscriptionFixture(outputHelper, true, false) { [Fact] public async Task ShouldConsumeProducedEvents() { @@ -31,7 +30,7 @@ public async Task ShouldUseExistingCheckpoint() { var (_, result) = await GenerateAndProduceEvents(count); Handler.AssertThat().Any(_ => true); - var checkpoint = await CheckpointStore.GetLastCheckpoint(SubscriptionId, default); + await CheckpointStore.GetLastCheckpoint(SubscriptionId, default); Logger.ConfigureIfNull(SubscriptionId, LoggerFactory); await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, result.GlobalPosition), true, default); diff --git a/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs b/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs index 51ba33e6..87e7de80 100644 --- a/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs +++ b/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs @@ -8,10 +8,7 @@ namespace Eventuous.Tests.Redis.Subscriptions; -[Collection("Sequential")] -public class SubscribeToStream : SubscriptionFixture { - public SubscribeToStream(ITestOutputHelper outputHelper) : base(outputHelper, false, false) { } - +public class SubscribeToStream(ITestOutputHelper outputHelper) : SubscriptionFixture(outputHelper, false, false) { [Fact] public async Task ShouldConsumeProducedEvents() { const int count = 10; diff --git a/src/SqlServer/src/Eventuous.SqlServer/Schema.cs b/src/SqlServer/src/Eventuous.SqlServer/Schema.cs index 206521b3..28984de2 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Schema.cs +++ b/src/SqlServer/src/Eventuous.SqlServer/Schema.cs @@ -5,23 +5,19 @@ namespace Eventuous.SqlServer; -public class Schema { +public class Schema(string schema = Schema.DefaultSchema) { public const string DefaultSchema = "eventuous"; - - readonly string _schema; - - public Schema(string schema = DefaultSchema) => _schema = schema; - - public string AppendEvents => $"{_schema}.append_events"; - public string ReadStreamForwards => $"{_schema}.read_stream_forwards"; - public string ReadStreamSub => $"{_schema}.read_stream_sub"; - public string ReadAllForwards => $"{_schema}.read_all_forwards"; - public string CheckStream => $"{_schema}.check_stream"; - public string StreamExists => $"SELECT CAST(IIF(EXISTS(SELECT 1 FROM {_schema}.Streams WHERE StreamName = (@name)), 1, 0) AS BIT)"; - public string GetCheckpointSql => $"SELECT Position FROM {_schema}.Checkpoints where Id=(@checkpointId)"; - public string AddCheckpointSql => $"INSERT INTO {_schema}.Checkpoints (Id) VALUES ((@checkpointId))"; + + public string AppendEvents => $"{schema}.append_events"; + public string ReadStreamForwards => $"{schema}.read_stream_forwards"; + public string ReadStreamSub => $"{schema}.read_stream_sub"; + public string ReadAllForwards => $"{schema}.read_all_forwards"; + public string CheckStream => $"{schema}.check_stream"; + public string StreamExists => $"SELECT CAST(IIF(EXISTS(SELECT 1 FROM {schema}.Streams WHERE StreamName = (@name)), 1, 0) AS BIT)"; + public string GetCheckpointSql => $"SELECT Position FROM {schema}.Checkpoints where Id=(@checkpointId)"; + public string AddCheckpointSql => $"INSERT INTO {schema}.Checkpoints (Id) VALUES ((@checkpointId))"; public string UpdateCheckpointSql - => $"UPDATE {_schema}.Checkpoints set Position=(@position) where Id=(@checkpointId)"; + => $"UPDATE {schema}.Checkpoints set Position=(@position) where Id=(@checkpointId)"; static readonly Assembly Assembly = typeof(Schema).Assembly; @@ -39,7 +35,7 @@ public async Task CreateSchema(GetSqlServerConnection getConnection) { await using var stream = Assembly.GetManifestResourceStream(name); using var reader = new StreamReader(stream!); var script = await reader.ReadToEndAsync().NoContext(); - var cmdScript = script.Replace("__schema__", _schema); + var cmdScript = script.Replace("__schema__", schema); await using var cmd = new SqlCommand(cmdScript, connection, transaction); await cmd.ExecuteNonQueryAsync().NoContext(); } diff --git a/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerAllStreamSubscription.cs b/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerAllStreamSubscription.cs index c0c2d6a8..ba8b9179 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerAllStreamSubscription.cs +++ b/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerAllStreamSubscription.cs @@ -14,16 +14,14 @@ namespace Eventuous.SqlServer.Subscriptions; /// /// Subscription for all events in the system using SQL Server event store. /// -public class SqlServerAllStreamSubscription : SqlServerSubscriptionBase { - public SqlServerAllStreamSubscription( - GetSqlServerConnection getConnection, - SqlServerAllStreamSubscriptionOptions options, - ICheckpointStore checkpointStore, - ConsumePipe consumePipe, - ILoggerFactory? loggerFactory = null - ) - : base(getConnection, options, checkpointStore, consumePipe, loggerFactory) { } - +public class SqlServerAllStreamSubscription( + GetSqlServerConnection getConnection, + SqlServerAllStreamSubscriptionOptions options, + ICheckpointStore checkpointStore, + ConsumePipe consumePipe, + ILoggerFactory? loggerFactory = null + ) + : SqlServerSubscriptionBase(getConnection, options, checkpointStore, consumePipe, loggerFactory) { protected override SqlCommand PrepareCommand(SqlConnection connection, long start) => connection.GetStoredProcCommand(Schema.ReadAllForwards) .Add("@from_position", SqlDbType.BigInt, start + 1) @@ -34,12 +32,7 @@ protected override long MoveStart(PersistedEvent evt) ulong _sequence; - protected override IMessageConsumeContext AsContext( - PersistedEvent evt, - object? e, - Metadata? meta, - CancellationToken cancellationToken - ) + protected override IMessageConsumeContext AsContext(PersistedEvent evt, object? e, Metadata? meta, CancellationToken cancellationToken) => new MessageConsumeContext( evt.MessageId.ToString(), evt.MessageType, diff --git a/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerStreamSubscription.cs b/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerStreamSubscription.cs index 2a38a8bd..44e1a3e4 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerStreamSubscription.cs +++ b/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerStreamSubscription.cs @@ -14,16 +14,14 @@ namespace Eventuous.SqlServer.Subscriptions; /// /// Subscription for events in a single stream in SQL Server event store. /// -public class SqlServerStreamSubscription : SqlServerSubscriptionBase { - public SqlServerStreamSubscription( +public class SqlServerStreamSubscription( GetSqlServerConnection getConnection, SqlServerStreamSubscriptionOptions options, ICheckpointStore checkpointStore, ConsumePipe consumePipe, ILoggerFactory? loggerFactory = null - ) : base(getConnection, options, checkpointStore, consumePipe, loggerFactory) - => _streamName = options.Stream.ToString(); - + ) + : SqlServerSubscriptionBase(getConnection, options, checkpointStore, consumePipe, loggerFactory) { protected override SqlCommand PrepareCommand(SqlConnection connection, long start) => connection.GetStoredProcCommand(Schema.ReadStreamSub) .Add("@stream_id", SqlDbType.Int, _streamId) @@ -50,7 +48,7 @@ protected override long MoveStart(PersistedEvent evt) ulong _sequence; int _streamId; - readonly string _streamName; + readonly string _streamName = options.Stream.ToString(); protected override IMessageConsumeContext AsContext(PersistedEvent evt, object? e, Metadata? meta, CancellationToken cancellationToken) => new MessageConsumeContext( @@ -71,4 +69,4 @@ protected override IMessageConsumeContext AsContext(PersistedEvent evt, object? protected override EventPosition GetPositionFromContext(IMessageConsumeContext context) => EventPosition.FromContext(context); -} \ No newline at end of file +} diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs index 06e81f7a..1908a3ad 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs @@ -12,11 +12,10 @@ namespace Eventuous.Tests.SqlServer.Fixtures; public sealed class IntegrationFixture : IAsyncLifetime { - public IEventStore EventStore { get; private set; } = null!; - public IAggregateStore AggregateStore { get; set; } = null!; - public IFixture Auto { get; } = new Fixture().Customize(new NodaTimeCustomization()); - public GetSqlServerConnection GetConnection { get; private set; } = null!; - public Faker Faker { get; } = new(); + public IEventStore EventStore { get; private set; } = null!; + public IFixture Auto { get; } = new Fixture().Customize(new NodaTimeCustomization()); + public GetSqlServerConnection GetConnection { get; private set; } = null!; + public Faker Faker { get; } = new(); public string SchemaName { get; } @@ -45,7 +44,7 @@ public async Task InitializeAsync() { await schema.CreateSchema(GetConnection); DefaultEventSerializer.SetDefaultSerializer(Serializer); EventStore = new SqlServerStore(GetConnection, new SqlServerStoreOptions(SchemaName), Serializer); - AggregateStore = new AggregateStore(EventStore); + new AggregateStore(EventStore); ActivitySource.AddActivityListener(_listener); return; diff --git a/test/Eventuous.Sut.App/BookingService.cs b/test/Eventuous.Sut.App/BookingService.cs index 528016bb..f372dd5f 100644 --- a/test/Eventuous.Sut.App/BookingService.cs +++ b/test/Eventuous.Sut.App/BookingService.cs @@ -6,33 +6,26 @@ namespace Eventuous.Sut.App; public class BookingService : CommandService { public BookingService(IAggregateStore store, StreamNameMap? streamNameMap = null) : base(store, streamNameMap: streamNameMap) { - OnNewAsync( - cmd => new BookingId(cmd.BookingId), - (booking, cmd, _) - => { - booking.BookRoom( - cmd.RoomId, - new StayPeriod(cmd.CheckIn, cmd.CheckOut), - new Money(cmd.Price) - ); + On() + .InState(ExpectedState.New) + .GetId(cmd => new BookingId(cmd.BookingId)) + .ActAsync( + (booking, cmd, _) + => { + booking.BookRoom(cmd.RoomId, new StayPeriod(cmd.CheckIn, cmd.CheckOut), new Money(cmd.Price)); - return Task.CompletedTask; - } - ); + return Task.CompletedTask; + } + ); - OnAny( - cmd => new BookingId(cmd.BookingId), - (booking, cmd) - => booking.Import( - cmd.RoomId, - new StayPeriod(cmd.CheckIn, cmd.CheckOut), - new Money(cmd.Price) - ) - ); + On() + .InState(ExpectedState.New) + .GetId(cmd => new BookingId(cmd.BookingId)) + .Act((booking, cmd) => booking.Import(cmd.RoomId, new StayPeriod(cmd.CheckIn, cmd.CheckOut), new Money(cmd.Price))); - OnExisting( - cmd => cmd.BookingId, - (booking, cmd) => booking.RecordPayment(cmd.PaymentId, cmd.Amount, cmd.PaidAt) - ); + On() + .InState(ExpectedState.Existing) + .GetId(cmd => cmd.BookingId) + .Act((booking, cmd) => booking.RecordPayment(cmd.PaymentId, cmd.Amount, cmd.PaidAt)); } -} \ No newline at end of file +} diff --git a/test/Eventuous.Sut.App/Commands.cs b/test/Eventuous.Sut.App/Commands.cs index afc0a17b..2b34a18c 100644 --- a/test/Eventuous.Sut.App/Commands.cs +++ b/test/Eventuous.Sut.App/Commands.cs @@ -1,5 +1,7 @@ using Eventuous.Sut.Domain; using NodaTime; +// ReSharper disable AutoPropertyCanBeMadeGetOnly.Global +// ReSharper disable UnusedAutoPropertyAccessor.Global namespace Eventuous.Sut.App; diff --git a/test/Eventuous.Sut.Domain/Money.cs b/test/Eventuous.Sut.Domain/Money.cs index a195aa6c..1256b5e6 100644 --- a/test/Eventuous.Sut.Domain/Money.cs +++ b/test/Eventuous.Sut.Domain/Money.cs @@ -6,11 +6,11 @@ namespace Eventuous.Sut.Domain; public record Money(float Amount, string Currency = "EUR") { public static Money operator +(Money left, Money right) { if (left.Currency != right.Currency) throw new InvalidOperationException("Currencies must match"); - return new Money(left.Amount + right.Amount, left.Currency); + return left with { Amount = left.Amount + right.Amount }; } public static Money operator -(Money left, Money right) { if (left.Currency != right.Currency) throw new InvalidOperationException("Currencies must match"); - return new Money(left.Amount - right.Amount, left.Currency); + return left with { Amount = left.Amount - right.Amount }; } } diff --git a/test/Eventuous.TestHelpers/EventStoreDbContainerBuilder.cs b/test/Eventuous.TestHelpers/EventStoreDbContainerBuilder.cs index 65ec194d..ce4af3e0 100644 --- a/test/Eventuous.TestHelpers/EventStoreDbContainerBuilder.cs +++ b/test/Eventuous.TestHelpers/EventStoreDbContainerBuilder.cs @@ -31,7 +31,7 @@ public EventStoreDbContainerBuilder() /// Initializes a new instance of the class. /// /// The Docker resource configuration. - private EventStoreDbContainerBuilder(EventStoreDbConfiguration dockerResourceConfiguration) + EventStoreDbContainerBuilder(EventStoreDbConfiguration dockerResourceConfiguration) : base(dockerResourceConfiguration) { DockerResourceConfiguration = dockerResourceConfiguration; } diff --git a/test/Eventuous.TestHelpers/Fakes/InMemoryEventStore.cs b/test/Eventuous.TestHelpers/Fakes/InMemoryEventStore.cs index 4f2b610b..b5ea5e3b 100644 --- a/test/Eventuous.TestHelpers/Fakes/InMemoryEventStore.cs +++ b/test/Eventuous.TestHelpers/Fakes/InMemoryEventStore.cs @@ -87,9 +87,7 @@ InMemoryStream FindStream(StreamName stream) { return existing; } - class NotFound : Exception { - public NotFound(StreamName stream) : base($"Stream not found: {stream}") { } - } + class NotFound(StreamName stream) : Exception($"Stream not found: {stream}"); } record StoredEvent(StreamEvent Event, int Position);