Skip to content

Commit

Permalink
Added LoadState to the same interface (for now)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed May 20, 2021
1 parent 6e40c94 commit c9c6da2
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 5 deletions.
24 changes: 21 additions & 3 deletions src/Eventuous/AggregateState.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
using System;
using System.Collections.Generic;
using JetBrains.Annotations;

namespace Eventuous {
public abstract record AggregateState<T> where T : AggregateState<T> {
public abstract T When(object @event);
public virtual T When(object @event) {
var eventType = @event.GetType();
if (!_handlers.ContainsKey(eventType)) return (T) this;

return _handlers[eventType](@event);
}

[PublicAPI]
protected void On<TEvent>(Func<TEvent, T> handle) {
if (!_handlers.TryAdd(typeof(TEvent), x => handle((TEvent) x))) {
throw new InvalidOperationException($"Duplicate handler for {typeof(TEvent).Name}");
}
}

readonly Dictionary<Type, Func<object, T>> _handlers = new();
}

public abstract record AggregateState<T, TId> : AggregateState<T>
where T : AggregateState<T, TId>
where TId : AggregateId {
public TId Id { get; protected init; } = null!;

internal T SetId(TId id) => (T) this with { Id = id };
}
}
}
25 changes: 23 additions & 2 deletions src/Eventuous/AggregateStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,30 @@ void Fold(StreamEvent streamEvent) {

aggregate!.Fold(evt);
}
}

public async Task<T> LoadState<T, TId>(StreamName stream, CancellationToken cancellationToken)
where T : AggregateState<T, TId>, new() where TId : AggregateId {
var state = new T();

try {
await _eventStore.ReadStream(stream, StreamReadPosition.Start, Fold, cancellationToken).Ignore();
}
catch (Exceptions.StreamNotFound e) {
throw new Exceptions.StreamNotFound(stream);
}

object? Deserialize(StreamEvent streamEvent)
=> _serializer.Deserialize(streamEvent.Data.AsSpan(), streamEvent.EventType);
return state;

void Fold(StreamEvent streamEvent) {
var evt = Deserialize(streamEvent);
if (evt == null) return;

state = state.When(evt);
}
}

object? Deserialize(StreamEvent streamEvent)
=> _serializer.Deserialize(streamEvent.Data.AsSpan(), streamEvent.EventType);
}
}
3 changes: 3 additions & 0 deletions src/Eventuous/IAggregateStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@ public interface IAggregateStore {
Task Store<T>(T entity, CancellationToken cancellationToken) where T : Aggregate;

Task<T> Load<T>(string id, CancellationToken cancellationToken) where T : Aggregate, new();

Task<T> LoadState<T, TId>(StreamName stream, CancellationToken cancellationToken)
where T : AggregateState<T, TId>, new() where TId : AggregateId;
}
}

0 comments on commit c9c6da2

Please sign in to comment.