diff --git a/src/Eventuous/AggregateState.cs b/src/Eventuous/AggregateState.cs index 71e377b1..85e9e7f0 100644 --- a/src/Eventuous/AggregateState.cs +++ b/src/Eventuous/AggregateState.cs @@ -1,8 +1,26 @@ +using System; +using System.Collections.Generic; +using JetBrains.Annotations; + namespace Eventuous { public abstract record AggregateState where T : AggregateState { - public abstract T When(object @event); + public virtual T When(object @event) { + var eventType = @event.GetType(); + if (!_handlers.ContainsKey(eventType)) return (T) this; + + return _handlers[eventType](@event); + } + + [PublicAPI] + protected void On(Func handle) { + if (!_handlers.TryAdd(typeof(TEvent), x => handle((TEvent) x))) { + throw new InvalidOperationException($"Duplicate handler for {typeof(TEvent).Name}"); + } + } + + readonly Dictionary> _handlers = new(); } - + public abstract record AggregateState : AggregateState where T : AggregateState where TId : AggregateId { @@ -10,4 +28,4 @@ public abstract record AggregateState : AggregateState internal T SetId(TId id) => (T) this with { Id = id }; } -} +} \ No newline at end of file diff --git a/src/Eventuous/AggregateStore.cs b/src/Eventuous/AggregateStore.cs index d467c268..b3c4ba90 100644 --- a/src/Eventuous/AggregateStore.cs +++ b/src/Eventuous/AggregateStore.cs @@ -56,9 +56,30 @@ void Fold(StreamEvent streamEvent) { aggregate!.Fold(evt); } + } + + public async Task LoadState(StreamName stream, CancellationToken cancellationToken) + where T : AggregateState, new() where TId : AggregateId { + var state = new T(); + + try { + await _eventStore.ReadStream(stream, StreamReadPosition.Start, Fold, cancellationToken).Ignore(); + } + catch (Exceptions.StreamNotFound e) { + throw new Exceptions.StreamNotFound(stream); + } - object? Deserialize(StreamEvent streamEvent) - => _serializer.Deserialize(streamEvent.Data.AsSpan(), streamEvent.EventType); + return state; + + void Fold(StreamEvent streamEvent) { + var evt = Deserialize(streamEvent); + if (evt == null) return; + + state = state.When(evt); + } } + + object? Deserialize(StreamEvent streamEvent) + => _serializer.Deserialize(streamEvent.Data.AsSpan(), streamEvent.EventType); } } \ No newline at end of file diff --git a/src/Eventuous/IAggregateStore.cs b/src/Eventuous/IAggregateStore.cs index 8546342e..85a9c350 100644 --- a/src/Eventuous/IAggregateStore.cs +++ b/src/Eventuous/IAggregateStore.cs @@ -7,5 +7,8 @@ public interface IAggregateStore { Task Store(T entity, CancellationToken cancellationToken) where T : Aggregate; Task Load(string id, CancellationToken cancellationToken) where T : Aggregate, new(); + + Task LoadState(StreamName stream, CancellationToken cancellationToken) + where T : AggregateState, new() where TId : AggregateId; } }