-
-
Notifications
You must be signed in to change notification settings - Fork 522
/
Copy pathAggregateStreamExtensions.cs
38 lines (31 loc) · 1.03 KB
/
AggregateStreamExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
using Core.Events;
using Core.EventStoreDB.Serialization;
using Core.Projections;
using EventStore.Client;
namespace Core.EventStoreDB.Events;
public static class AggregateStreamExtensions
{
public static async Task<T?> AggregateStream<T>(
this EventStoreClient eventStore,
Guid id,
CancellationToken cancellationToken,
ulong? fromVersion = null
) where T : class, IProjection
{
var readResult = eventStore.ReadStreamAsync(
Direction.Forwards,
StreamNameMapper.ToStreamId<T>(id),
fromVersion ?? StreamPosition.Start,
cancellationToken: cancellationToken
);
if (await readResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound)
return null;
var aggregate = (T)Activator.CreateInstance(typeof(T), true)!;
await foreach (var @event in readResult)
{
var eventData = @event.Deserialize();
aggregate.Apply(eventData!);
}
return aggregate;
}
}