Skip to content

Commit

Permalink
Add an option to resolve store when handling a command (#216)
Browse files Browse the repository at this point in the history
* Add an option to resolve store when handling a command
* New command/func service API
  • Loading branch information
alexeyzimarev authored Aug 22, 2023
1 parent ed2c62e commit a520bbe
Show file tree
Hide file tree
Showing 103 changed files with 1,219 additions and 967 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/code_quality.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ jobs:
with:
fetch-depth: 0
- name: 'Qodana Scan'
uses: JetBrains/[email protected]
uses: JetBrains/[email protected]
with:
pr-mode: false
env:
QODANA_TOKEN: ${{ secrets.QODANA_TOKEN }}
8 changes: 4 additions & 4 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
<PackageVersion Include="RabbitMQ.Client" Version="6.5.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.1.1" />
<PackageVersion Include="NEST" Version="7.17.5" />
<PackageVersion Include="Polly" Version="7.2.3" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.1" />
<PackageVersion Include="Polly" Version="7.2.4" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="StackExchange.Redis" Version="2.6.90" />
<PackageVersion Include="Testcontainers.EventStoreDb" Version="3.4.0" />
<PackageVersion Include="Testcontainers.Kafka" Version="3.4.0" />
Expand All @@ -55,7 +55,7 @@
</ItemGroup>
<ItemGroup Label="References for packable projects">
<PackageVersion Include="MinVer" Version="4.3.0" PrivateAssets="All" />
<PackageVersion Include="JetBrains.Annotations" Version="2022.3.1" PrivateAssets="All" />
<PackageVersion Include="JetBrains.Annotations" Version="2023.2.0" PrivateAssets="All" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
<ItemGroup Label="References for test projects">
Expand All @@ -73,7 +73,7 @@
<PackageVersion Include="RestSharp" Version="110.2.0" />
<PackageVersion Include="Hypothesist" Version="2.1.55" />
<PackageVersion Include="NodaTime" Version="3.1.9" />
<PackageVersion Include="NodaTime.Serialization.SystemTextJson" Version="1.0.0" />
<PackageVersion Include="NodaTime.Serialization.SystemTextJson" Version="1.1.0" />
<PackageVersion Include="MicroElements.AutoFixture.NodaTime" Version="1.0.0" />
<PackageVersion Include="MongoDb.Bson.NodaTime" Version="3.0.0" />
<PackageVersion Include="Testcontainers" Version="3.4.0" />
Expand Down
2 changes: 1 addition & 1 deletion src/Benchmarks/Benchmarks/GapDetectionBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class GapDetectionBenchmarks {
public void Setup() {
_store = new NoOpCheckpointStore();

_store.CheckpointStored += (sender, checkpoint) => Console.WriteLine(checkpoint);
_store.CheckpointStored += (_, checkpoint) => Console.WriteLine(checkpoint);

var numbers = Enumerable.Range(1, 1000).ToList();
numbers.RemoveAll(x => x % 10 == 0);
Expand Down
1 change: 1 addition & 0 deletions src/Benchmarks/Benchmarks/TypeMapBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class TypeMapBenchmark {
KeyValuePair<string, Type>[] _types = null!;

[Params(5, 20, 100)]
// ReSharper disable once UnusedAutoPropertyAccessor.Global
public int TypesCount { get; set; }

[GlobalSetup]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous;

public abstract class CommandHandlerBuilder<TAggregate, TState, TId>
where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new() {
internal abstract RegisteredHandler<TAggregate, TId> Build();
}

/// <summary>
/// Builds a command handler for a specific command type. You would not need to instantiate this class directly,
/// use <see cref="CommandService{TAggregate,TState,TId}.On{TCommand}" /> function.
/// </summary>
/// <param name="store">Default aggregate store instance for the command service</param>
/// <typeparam name="TCommand">Command type</typeparam>
/// <typeparam name="TAggregate">Aggregate type</typeparam>
/// <typeparam name="TState">State of the aggregate type</typeparam>
/// <typeparam name="TId">Identity of the aggregate type</typeparam>
public class CommandHandlerBuilder<TCommand, TAggregate, TState, TId>(IAggregateStore? store) : CommandHandlerBuilder<TAggregate, TState, TId>
where TCommand : class
where TAggregate : Aggregate<TState>, new()
where TState : State<TState>, new()
where TId : Id {
GetIdFromUntypedCommand<TId>? _getId;
HandleUntypedCommand<TAggregate>? _action;
ResolveStore<TCommand>? _resolveStore;
ExpectedState _expectedState = ExpectedState.Any;

/// <summary>
/// Set the expected aggregate state for the command handler.
/// If the aggregate won't be in the expected state, the command handler will return an error.
/// The default is <see cref="ExpectedState.Any" />.
/// </summary>
/// <param name="expectedState">Expected aggregate state</param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> InState(ExpectedState expectedState) {
_expectedState = expectedState;

return this;
}

/// <summary>
/// Defines how the aggregate id is extracted from the command.
/// </summary>
/// <param name="getId">A function to get the aggregate id from the command.</param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> GetId(GetIdFromCommand<TId, TCommand> getId) {
_getId = getId.AsGetId();

return this;
}

/// <summary>
/// Defines how the aggregate id is extracted from the command, asynchronously.
/// </summary>
/// <param name="getId">A function to get the aggregate id from the command.</param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> GetIdAsync(GetIdFromCommandAsync<TId, TCommand> getId) {
_getId = getId.AsGetId();

return this;
}

/// <summary>
/// Defines how the aggregate is acted upon by the command.
/// </summary>
/// <param name="action">A function that executes an operation on an aggregate</param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> Act(ActOnAggregate<TAggregate, TCommand> action) {
_action = action.AsAct();

return this;
}

/// <summary>
/// Defines how the aggregate is acted upon by the command, asynchronously.
/// </summary>
/// <param name="action">A function that executes an asynchronous operation on an aggregate</param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> ActAsync(ActOnAggregateAsync<TAggregate, TCommand> action) {
_action = action.AsAct();

return this;
}

/// <summary>
/// Defines how the aggregate store is resolved from the command. It is optional. If not defined, the default
/// aggregate store of the command service will be used.
/// </summary>
/// <param name="resolveStore"></param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> ResolveStore(ResolveStore<TCommand>? resolveStore) {
_resolveStore = resolveStore;

return this;
}

internal override RegisteredHandler<TAggregate, TId> Build() {
return new RegisteredHandler<TAggregate, TId>(
_expectedState,
Ensure.NotNull(_getId, $"Function to get the aggregate id from {typeof(TCommand).Name} is not defined"),
Ensure.NotNull(_action, $"Function to act on the aggregate for command {typeof(TCommand).Name} is not defined"),
(_resolveStore ?? DefaultResolve()).AsResolveStore()
);
}

ResolveStore<TCommand> DefaultResolve() {
ArgumentNullException.ThrowIfNull(store, nameof(store));

return _ => store;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using System.Reflection;

namespace Eventuous;

using static Diagnostics.ApplicationEventSource;

public delegate Task ActOnAggregateAsync<in TAggregate, in TCommand>(TAggregate aggregate, TCommand command, CancellationToken cancellationToken)
where TAggregate : Aggregate;

public delegate void ActOnAggregate<in TAggregate, in TCommand>(TAggregate aggregate, TCommand command) where TAggregate : Aggregate;

delegate ValueTask<T> HandleUntypedCommand<T>(T aggregate, object command, CancellationToken cancellationToken) where T : Aggregate;

public delegate Task<TId> GetIdFromCommandAsync<TId, in TCommand>(TCommand command, CancellationToken cancellationToken) where TId : Id where TCommand : class;

public delegate TId GetIdFromCommand<out TId, in TCommand>(TCommand command) where TId : Id where TCommand : class;

delegate ValueTask<TId> GetIdFromUntypedCommand<TId>(object command, CancellationToken cancellationToken) where TId : Id;

public delegate IAggregateStore ResolveStore<in TCommand>(TCommand command) where TCommand : class;

delegate IAggregateStore ResolveStoreFromCommand(object command);

record RegisteredHandler<T, TId>(
ExpectedState ExpectedState,
GetIdFromUntypedCommand<TId> GetId,
HandleUntypedCommand<T> Handler,
ResolveStoreFromCommand ResolveStore
) where T : Aggregate where TId : Id;

class HandlersMap<TAggregate, TId> where TAggregate : Aggregate where TId : Id {
readonly TypeMap<RegisteredHandler<TAggregate, TId>> _typeMap = new();

static readonly MethodInfo AddHandlerInternalMethod =
typeof(HandlersMap<TAggregate, TId>).GetMethod(nameof(AddHandlerInternal), BindingFlags.NonPublic | BindingFlags.Instance)!;

internal void AddHandlerInternal<TCommand>(RegisteredHandler<TAggregate, TId> handler) {
try {
_typeMap.Add<TCommand>(handler);
Log.CommandHandlerRegistered<TCommand>();
} catch (Exceptions.DuplicateTypeException<TCommand>) {
Log.CommandHandlerAlreadyRegistered<TCommand>();

throw new Exceptions.CommandHandlerAlreadyRegistered<TCommand>();
}
}

internal void AddHandlerUntyped(Type command, RegisteredHandler<TAggregate, TId> handler)
=> AddHandlerInternalMethod.MakeGenericMethod(command).Invoke(this, new object?[] { handler });

public bool TryGet<TCommand>([NotNullWhen(true)] out RegisteredHandler<TAggregate, TId>? handler) => _typeMap.TryGetValue<TCommand>(out handler);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous;

static class CommandHandlingDelegateExtensions {
public static GetIdFromUntypedCommand<TId> AsGetId<TId, TCommand>(this GetIdFromCommandAsync<TId, TCommand> getId) where TId : Id where TCommand : class
=> async (cmd, ct) => await getId((TCommand)cmd, ct);

public static GetIdFromUntypedCommand<TId> AsGetId<TId, TCommand>(this GetIdFromCommand<TId, TCommand> getId) where TId : Id where TCommand : class
=> (cmd, _) => ValueTask.FromResult(getId((TCommand)cmd));

public static HandleUntypedCommand<TAggregate> AsAct<TAggregate, TCommand>(this ActOnAggregateAsync<TAggregate, TCommand> act) where TAggregate : Aggregate
=> async (aggregate, cmd, ct) => {
await act(aggregate, (TCommand)cmd, ct).NoContext();

return aggregate;
};

public static HandleUntypedCommand<TAggregate> AsAct<TAggregate, TCommand>(this ActOnAggregate<TAggregate, TCommand> act) where TAggregate : Aggregate
=> (aggregate, cmd, _) => {
act(aggregate, (TCommand)cmd);

return ValueTask.FromResult(aggregate);
};

public static ResolveStoreFromCommand AsResolveStore<TCommand>(this ResolveStore<TCommand> resolveStore) where TCommand : class
=> cmd => resolveStore((TCommand)cmd);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous;

public abstract partial class CommandService<TAggregate, TState, TId> {
/// <summary>
/// Register an asynchronous handler for a command, which is expected to create a new aggregate instance.
/// </summary>
/// <param name="getId">A function to get the aggregate id from the command</param>
/// <param name="action">Asynchronous action to be performed on the aggregate, given the aggregate instance and the command</param>
/// <param name="resolveStore">Resolve aggregate store from the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
[Obsolete("Use On<TCommand>().InState(ExpectedState.New).GetId(...).ActAsync(...).ResolveStore(...) instead")]
protected void OnNewAsync<TCommand>(
GetIdFromCommand<TId, TCommand> getId,
ActOnAggregateAsync<TAggregate, TCommand> action,
ResolveStore<TCommand>? resolveStore = null
) where TCommand : class
=> On<TCommand>().InState(ExpectedState.New).GetId(getId).ActAsync(action).ResolveStore(resolveStore);

/// <summary>
/// Register an asynchronous handler for a command, which is expected to use an existing aggregate instance.
/// </summary>
/// <param name="getId">A function to get the aggregate id from the command</param>
/// <param name="action">Asynchronous action to be performed on the aggregate, given the aggregate instance and the command</param>
/// <param name="resolveStore">Resolve aggregate store from the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
[Obsolete("Use On<TCommand>().InState(ExpectedState.Existing).GetId(...).ActAsync(...).ResolveStore(...) instead")]
[PublicAPI]
protected void OnExistingAsync<TCommand>(
GetIdFromCommand<TId, TCommand> getId,
ActOnAggregateAsync<TAggregate, TCommand> action,
ResolveStore<TCommand>? resolveStore = null
) where TCommand : class
=> On<TCommand>().InState(ExpectedState.Existing).GetId(getId).ActAsync(action).ResolveStore(resolveStore);

/// <summary>
/// Register an asynchronous handler for a command, which is expected to use an existing aggregate instance.
/// </summary>
/// <param name="getId">Asynchronous function to get the aggregate id from the command</param>
/// <param name="action">Asynchronous action to be performed on the aggregate, given the aggregate instance and the command</param>
/// <param name="resolveStore">Resolve aggregate store from the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
[Obsolete("Use On<TCommand>().InState(ExpectedState.Existing).GetIdAsync(...).ActAsync(...).ResolveStore(...) instead")]
[PublicAPI]
protected void OnExistingAsync<TCommand>(
GetIdFromCommandAsync<TId, TCommand> getId,
ActOnAggregateAsync<TAggregate, TCommand> action,
ResolveStore<TCommand>? resolveStore = null
) where TCommand : class
// => _handlers.AddHandler(ExpectedState.Existing, getId, action, resolveStore ?? DefaultResolve<TCommand>());
=> On<TCommand>().InState(ExpectedState.Existing).GetIdAsync(getId).ActAsync(action).ResolveStore(resolveStore);

/// <summary>
/// Register an asynchronous handler for a command, which is expected to use an a new or an existing aggregate instance.
/// </summary>
/// <param name="getId">A function to get the aggregate id from the command</param>
/// <param name="action">Asynchronous action to be performed on the aggregate, given the aggregate instance and the command</param>
/// <param name="resolveStore">Resolve aggregate store from the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
[Obsolete("Use On<TCommand>().InState(ExpectedState.Any).GetId(...).ActAsync(...).ResolveStore(...) instead")]
[PublicAPI]
protected void OnAnyAsync<TCommand>(
GetIdFromCommand<TId, TCommand> getId,
ActOnAggregateAsync<TAggregate, TCommand> action,
ResolveStore<TCommand>? resolveStore = null
) where TCommand : class
// => _handlers.AddHandler(ExpectedState.Any, getId, action, resolveStore ?? DefaultResolve<TCommand>());
=> On<TCommand>().InState(ExpectedState.Any).GetId(getId).ActAsync(action).ResolveStore(resolveStore);

/// <summary>
/// Register an asynchronous handler for a command, which is expected to use an a new or an existing aggregate instance.
/// </summary>
/// <param name="getId">Asynchronous function to get the aggregate id from the command</param>
/// <param name="action">Asynchronous action to be performed on the aggregate, given the aggregate instance and the command</param>
/// <param name="resolveStore">Resolve aggregate store from the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
[Obsolete("Use On<TCommand>().InState(ExpectedState.Any).GetIdAsync(...).ActAsync(...).ResolveStore(...) instead")]
[PublicAPI]
protected void OnAnyAsync<TCommand>(
GetIdFromCommandAsync<TId, TCommand> getId,
ActOnAggregateAsync<TAggregate, TCommand> action,
ResolveStore<TCommand>? resolveStore = null
) where TCommand : class
// => _handlers.AddHandler(ExpectedState.Any, getId, action, resolveStore ?? DefaultResolve<TCommand>());
=> On<TCommand>().InState(ExpectedState.Any).GetIdAsync(getId).ActAsync(action).ResolveStore(resolveStore);
}
Loading

0 comments on commit a520bbe

Please sign in to comment.