Skip to content

Commit

Permalink
Moved LoadState to StateStore
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed May 21, 2021
1 parent c9c6da2 commit aac753c
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

namespace Eventuous.EventStoreDB {
[PublicAPI]
public class EsDbEventStore : IEventStore {
public class EsdbEventStore : IEventStore {
readonly EventStoreClient _client;

public EsDbEventStore(EventStoreClient client) => _client = Ensure.NotNull(client, nameof(client));
public EsdbEventStore(EventStoreClient client) => _client = Ensure.NotNull(client, nameof(client));

public EsDbEventStore(EventStoreClientSettings clientSettings)
public EsdbEventStore(EventStoreClientSettings clientSettings)
: this(new EventStoreClient(Ensure.NotNull(clientSettings, nameof(clientSettings)))) { }

public async Task AppendEvents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public GooglePubSubSubscription(
loggerFactory,
measure
) {
_options = options;

_failureHandler = Ensure.NotNull(options, nameof(options)).FailureHandler
?? DefaultEventProcessingErrorHandler;

Expand Down Expand Up @@ -131,7 +133,7 @@ async Task<Reply> Handle(PubsubMessage msg, CancellationToken ct) {
var contentType = msg.Attributes["contentType"];

var evt = DeserializeData(contentType, eventType, msg.Data.ToByteArray(), _topicName.TopicId);

var receivedEvent = new ReceivedEvent(
msg.MessageId,
eventType,
Expand Down
25 changes: 2 additions & 23 deletions src/Eventuous/AggregateStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,30 +56,9 @@ void Fold(StreamEvent streamEvent) {

aggregate!.Fold(evt);
}
}

public async Task<T> LoadState<T, TId>(StreamName stream, CancellationToken cancellationToken)
where T : AggregateState<T, TId>, new() where TId : AggregateId {
var state = new T();

try {
await _eventStore.ReadStream(stream, StreamReadPosition.Start, Fold, cancellationToken).Ignore();
}
catch (Exceptions.StreamNotFound e) {
throw new Exceptions.StreamNotFound(stream);
}

return state;

void Fold(StreamEvent streamEvent) {
var evt = Deserialize(streamEvent);
if (evt == null) return;

state = state.When(evt);
}
object? Deserialize(StreamEvent streamEvent)
=> _serializer.Deserialize(streamEvent.Data.AsSpan(), streamEvent.EventType);
}

object? Deserialize(StreamEvent streamEvent)
=> _serializer.Deserialize(streamEvent.Data.AsSpan(), streamEvent.EventType);
}
}
18 changes: 18 additions & 0 deletions src/Eventuous/AggregateStoreExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace Eventuous {
[PublicAPI]
public static class AggregateStoreExtensions {
public static Task<T> Load<T, TState, TId>(
this IAggregateStore store,
TId id,
CancellationToken cancellationToken
)
where T : Aggregate<TState, TId>, new()
where TState : AggregateState<TState, TId>, new()
where TId : AggregateId
=> store.Load<T>(id.ToString(), cancellationToken);
}
}
80 changes: 50 additions & 30 deletions src/Eventuous/ApplicationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public abstract class ApplicationService<T, TState, TId>
protected void OnNew<TCommand>(Func<TCommand, TId> getId, Action<T, TCommand> action) where TCommand : class {
_handlers.Add(
typeof(TCommand),
new RegisteredHandler<T>(ExpectedState.New, (aggregate, cmd) => AsTask(aggregate!, cmd, action))
new RegisteredHandler<T>(ExpectedState.New, (aggregate, cmd, _) => AsTask(aggregate!, cmd, action))
);

_getId.TryAdd(typeof(TCommand), cmd => new ValueTask<TId>(getId((TCommand) cmd)));
_getId.TryAdd(typeof(TCommand), (cmd, _) => new ValueTask<TId>(getId((TCommand) cmd)));
}

/// <summary>
Expand All @@ -45,14 +45,17 @@ protected void OnNew<TCommand>(Func<TCommand, TId> getId, Action<T, TCommand> ac
/// <param name="action">Asynchronous action to be performed on the aggregate,
/// given the aggregate instance and the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
protected void OnNewAsync<TCommand>(Func<TCommand, TId> getId, Func<T, TCommand, Task> action)
protected void OnNewAsync<TCommand>(
Func<TCommand, TId> getId,
Func<T, TCommand, CancellationToken, Task> action
)
where TCommand : class {
_handlers.Add(
typeof(TCommand),
new RegisteredHandler<T>(ExpectedState.New, (aggregate, cmd) => AsTask(aggregate!, cmd, action))
new RegisteredHandler<T>(ExpectedState.New, (aggregate, cmd, ct) => AsTask(aggregate!, cmd, ct, action))
);

_getId.TryAdd(typeof(TCommand), cmd => new ValueTask<TId>(getId((TCommand) cmd)));
_getId.TryAdd(typeof(TCommand), (cmd, _) => new ValueTask<TId>(getId((TCommand) cmd)));
}

/// <summary>
Expand All @@ -65,10 +68,10 @@ protected void OnExisting<TCommand>(Func<TCommand, TId> getId, Action<T, TComman
where TCommand : class {
_handlers.Add(
typeof(TCommand),
new RegisteredHandler<T>(ExpectedState.Existing, (aggregate, cmd) => AsTask(aggregate!, cmd, action))
new RegisteredHandler<T>(ExpectedState.Existing, (aggregate, cmd, _) => AsTask(aggregate!, cmd, action))
);

_getId.TryAdd(typeof(TCommand), cmd => new ValueTask<TId>(getId((TCommand) cmd)));
_getId.TryAdd(typeof(TCommand), (cmd, _) => new ValueTask<TId>(getId((TCommand) cmd)));
}

/// <summary>
Expand All @@ -78,14 +81,20 @@ protected void OnExisting<TCommand>(Func<TCommand, TId> getId, Action<T, TComman
/// <param name="action">Asynchronous action to be performed on the aggregate,
/// given the aggregate instance and the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
protected void OnExistingAsync<TCommand>(Func<TCommand, TId> getId, Func<T, TCommand, Task> action)
protected void OnExistingAsync<TCommand>(
Func<TCommand, TId> getId,
Func<T, TCommand, CancellationToken, Task> action
)
where TCommand : class {
_handlers.Add(
typeof(TCommand),
new RegisteredHandler<T>(ExpectedState.Existing, (aggregate, cmd) => AsTask(aggregate!, cmd, action))
new RegisteredHandler<T>(
ExpectedState.Existing,
(aggregate, cmd, ct) => AsTask(aggregate!, cmd, ct, action)
)
);

_getId.TryAdd(typeof(TCommand), cmd => new ValueTask<TId>(getId((TCommand) cmd)));
_getId.TryAdd(typeof(TCommand), (cmd, _) => new ValueTask<TId>(getId((TCommand) cmd)));
}

/// <summary>
Expand All @@ -99,10 +108,10 @@ protected void OnAny<TCommand>(Func<TCommand, TId> getId, Action<T, TCommand> ac
where TCommand : class {
_handlers.Add(
typeof(TCommand),
new RegisteredHandler<T>(ExpectedState.Any, (aggregate, cmd) => AsTask(aggregate!, cmd, action))
new RegisteredHandler<T>(ExpectedState.Any, (aggregate, cmd, _) => AsTask(aggregate!, cmd, action))
);

_getId.TryAdd(typeof(TCommand), cmd => new ValueTask<TId>(getId((TCommand) cmd)));
_getId.TryAdd(typeof(TCommand), (cmd, _) => new ValueTask<TId>(getId((TCommand) cmd)));
}

/// <summary>
Expand All @@ -112,14 +121,17 @@ protected void OnAny<TCommand>(Func<TCommand, TId> getId, Action<T, TCommand> ac
/// <param name="action">Asynchronous action to be performed on the aggregate,
/// given the aggregate instance and the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
protected void OnAnyAsync<TCommand>(Func<TCommand, TId> getId, Func<T, TCommand, Task> action)
protected void OnAnyAsync<TCommand>(
Func<TCommand, TId> getId,
Func<T, TCommand, CancellationToken, Task> action
)
where TCommand : class {
_handlers.Add(
typeof(TCommand),
new RegisteredHandler<T>(ExpectedState.Any, (aggregate, cmd) => AsTask(aggregate!, cmd, action))
new RegisteredHandler<T>(ExpectedState.Any, (aggregate, cmd, ct) => AsTask(aggregate!, cmd, ct, action))
);

_getId.TryAdd(typeof(TCommand), cmd => new ValueTask<TId>(getId((TCommand) cmd)));
_getId.TryAdd(typeof(TCommand), (cmd, _) => new ValueTask<TId>(getId((TCommand) cmd)));
}

/// <summary>
Expand All @@ -129,14 +141,14 @@ protected void OnAnyAsync<TCommand>(Func<TCommand, TId> getId, Func<T, TCommand,
/// <param name="action">Action to be performed on the aggregate,
/// given the aggregate instance and the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
protected void OnAny<TCommand>(Func<TCommand, Task<TId>> getId, Action<T, TCommand> action)
protected void OnAny<TCommand>(Func<TCommand, CancellationToken, Task<TId>> getId, Action<T, TCommand> action)
where TCommand : class {
_handlers.Add(
typeof(TCommand),
new RegisteredHandler<T>(ExpectedState.Any, (aggregate, cmd) => AsTask(aggregate!, cmd, action))
new RegisteredHandler<T>(ExpectedState.Any, (aggregate, cmd, _) => AsTask(aggregate!, cmd, action))
);

_getId.TryAdd(typeof(TCommand), async cmd => await getId((TCommand) cmd).Ignore());
_getId.TryAdd(typeof(TCommand), async (cmd, ct) => await getId((TCommand) cmd, ct).Ignore());
}

/// <summary>
Expand All @@ -146,27 +158,30 @@ protected void OnAny<TCommand>(Func<TCommand, Task<TId>> getId, Action<T, TComma
/// <param name="action">Asynchronous action to be performed on the aggregate,
/// given the aggregate instance and the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
protected void OnAnyAsync<TCommand>(Func<TCommand, Task<TId>> getId, Func<T, TCommand, Task> action)
protected void OnAnyAsync<TCommand>(
Func<TCommand, CancellationToken, Task<TId>> getId,
Func<T, TCommand, CancellationToken, Task> action
)
where TCommand : class {
_handlers.Add(
typeof(TCommand),
new RegisteredHandler<T>(ExpectedState.Any, (aggregate, cmd) => AsTask(aggregate!, cmd, action))
new RegisteredHandler<T>(ExpectedState.Any, (aggregate, cmd, ct) => AsTask(aggregate!, cmd, ct, action))
);

_getId.TryAdd(typeof(TCommand), async cmd => await getId((TCommand) cmd).Ignore());
_getId.TryAdd(typeof(TCommand), async (cmd, ct) => await getId((TCommand) cmd, ct).Ignore());
}

/// <summary>
/// Register an asynchronous handler for a command, which can figure out the aggregate instance by itself, and then return one.
/// </summary>
/// <param name="action">Function, which returns some aggregate instance to store</param>
/// <typeparam name="TCommand">Command type</typeparam>
protected void OnAsync<TCommand>(Func<TCommand, Task<T>> action) where TCommand : class
protected void OnAsync<TCommand>(Func<TCommand, CancellationToken, Task<T>> action) where TCommand : class
=> _handlers.Add(
typeof(TCommand),
new RegisteredHandler<T>(
ExpectedState.Unknown,
async (_, cmd) => await action((TCommand) cmd)
async (_, cmd, ct) => await action((TCommand) cmd, ct)
)
);

Expand All @@ -175,8 +190,13 @@ static ValueTask<T> AsTask<TCommand>(T aggregate, object cmd, Action<T, TCommand
return new ValueTask<T>(aggregate);
}

static async ValueTask<T> AsTask<TCommand>(T aggregate, object cmd, Func<T, TCommand, Task> action) {
await action(aggregate, (TCommand) cmd).Ignore();
static async ValueTask<T> AsTask<TCommand>(
T aggregate,
object cmd,
CancellationToken cancellationToken,
Func<T, TCommand, CancellationToken, Task> action
) {
await action(aggregate, (TCommand) cmd, cancellationToken).Ignore();
return aggregate;
}

Expand All @@ -197,7 +217,7 @@ CancellationToken cancellationToken
throw new Exceptions.CommandHandlerNotFound(typeof(TCommand));
}

var id = await _getId[typeof(TCommand)](command).Ignore();
var id = await _getId[typeof(TCommand)](command, cancellationToken).Ignore();

var aggregate = registeredHandler.ExpectedState switch {
ExpectedState.Any => await TryLoad().Ignore(),
Expand All @@ -210,13 +230,13 @@ CancellationToken cancellationToken
)
};

var result = await registeredHandler.Handler(aggregate, command).Ignore();
var result = await registeredHandler.Handler(aggregate, command, cancellationToken).Ignore();

await Store.Store(result, cancellationToken).Ignore();

return new OkResult<T, TState, TId>(result.State, result.Changes);

Task<T> Load() => Store.Load<T>(id, cancellationToken);
Task<T> Load() => Store.Load<T, TState, TId>(id, cancellationToken);

async Task<T> TryLoad() {
try {
Expand All @@ -235,11 +255,11 @@ T Create() {
}
}

record RegisteredHandler<T>(ExpectedState ExpectedState, Func<T?, object, ValueTask<T>> Handler);
record RegisteredHandler<T>(ExpectedState ExpectedState, Func<T?, object, CancellationToken, ValueTask<T>> Handler);

class HandlersMap<T> : Dictionary<Type, RegisteredHandler<T>> { }

class IdMap<T> : Dictionary<Type, Func<object, ValueTask<T>>> { }
class IdMap<T> : Dictionary<Type, Func<object, CancellationToken, ValueTask<T>>> { }

enum ExpectedState {
New,
Expand Down
24 changes: 20 additions & 4 deletions src/Eventuous/IAggregateStore.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace Eventuous
{
/// <summary>
/// Aggregate state persistent store
/// </summary>
[PublicAPI]
public interface IAggregateStore {
Task Store<T>(T entity, CancellationToken cancellationToken) where T : Aggregate;
/// <summary>
/// Store the new or updated aggregate state
/// </summary>
/// <param name="aggregate">Aggregate instance, which needs to be persisted</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <typeparam name="T">Aggregate type</typeparam>
/// <returns></returns>
Task Store<T>(T aggregate, CancellationToken cancellationToken) where T : Aggregate;

/// <summary>
/// Load the aggregate from the store for a given id
/// </summary>
/// <param name="id">Aggregate id</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <typeparam name="T">Aggregate type</typeparam>
/// <returns></returns>
Task<T> Load<T>(string id, CancellationToken cancellationToken) where T : Aggregate, new();

Task<T> LoadState<T, TId>(StreamName stream, CancellationToken cancellationToken)
where T : AggregateState<T, TId>, new() where TId : AggregateId;
}
}
23 changes: 23 additions & 0 deletions src/Eventuous/IStateStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace Eventuous {
/// <summary>
/// State store allows loading an aggregate state from any stream. The idea is to be able to load a different
/// version of the state than used in the aggregate itself, like an in-memory projection.
/// </summary>
[PublicAPI]
public interface IStateStore {
/// <summary>
/// Load the aggregate state from the store, without initialising the aggregate itself
/// </summary>
/// <param name="stream">Aggregate event</param>
/// <param name="cancellationToken"></param>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TId"></typeparam>
/// <returns></returns>
Task<T> LoadState<T, TId>(StreamName stream, CancellationToken cancellationToken)
where T : AggregateState<T, TId>, new() where TId : AggregateId;
}
}
36 changes: 36 additions & 0 deletions src/Eventuous/StateStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace Eventuous {
[PublicAPI]
public class StateStore : IStateStore {
readonly IEventStore _eventStore;
readonly IEventSerializer _serializer;

public StateStore(IEventStore eventStore, IEventSerializer serializer) {
_eventStore = Ensure.NotNull(eventStore, nameof(eventStore));
_serializer = Ensure.NotNull(serializer, nameof(serializer));
}

public async Task<T> LoadState<T, TId>(StreamName stream, CancellationToken cancellationToken)
where T : AggregateState<T, TId>, new() where TId : AggregateId {
var state = new T();

await _eventStore.ReadStream(stream, StreamReadPosition.Start, Fold, cancellationToken).Ignore();

return state;

void Fold(StreamEvent streamEvent) {
var evt = Deserialize(streamEvent);
if (evt == null) return;

state = state.When(evt);
}

object? Deserialize(StreamEvent streamEvent)
=> _serializer.Deserialize(streamEvent.Data.AsSpan(), streamEvent.EventType);
}
}
}
Loading

0 comments on commit aac753c

Please sign in to comment.