Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make MultiStreamProjection usable for AggregateTo<> #2890

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions src/EventSourcingTests/Bugs/Bug_2889_multi_stream_aggregate_to.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System.Linq;
using System.Threading.Tasks;
using EventSourcingTests.Projections.MultiStreamProjections.CustomGroupers;
using Marten;
using Marten.Events;
using Marten.Events.Archiving;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;
using Xunit.Abstractions;

namespace EventSourcingTests.Bugs;

public class Bug_2889_multi_stream_aggregate_to : BugIntegrationContext
{
private readonly ITestOutputHelper _helper;

public Bug_2889_multi_stream_aggregate_to(ITestOutputHelper helper)
{
_helper = helper;
StoreOptions(x =>
{
x.Projections.Add<SimpleProjection>(ProjectionLifecycle.Inline);
});
}

[Fact]
public async Task CanAggregateWithMultiStream()
{
var idToMatch = "test";
var number = 10;

for (var i = 0; i < number; i++)
{
var stream = theSession.Events.StartStream(new SimpleEvent() { IdToMatch = idToMatch });
}

await theSession.SaveChangesAsync();
var doc = theSession.Query<SimpleAggregate>().ToList();
doc.Count.ShouldBe(1);
doc.First().Count.ShouldBe(number);
doc.First().Id.ShouldBe(idToMatch);

var aggregate = theSession.Events.QueryAllRawEvents().AggregateTo<SimpleAggregate>();
_helper.WriteLine(aggregate.ToString());
aggregate.Id.ShouldBe(idToMatch);
aggregate.Count.ShouldBe(number);

aggregate = await theSession.Events.QueryAllRawEvents().AggregateToAsync<SimpleAggregate>();
aggregate.Id.ShouldBe(idToMatch);
aggregate.Count.ShouldBe(number);
}


public record SimpleAggregate
{
public string Id { get; set; }
public int Count { get; set; } = 0;
}

public class SimpleProjection : MultiStreamProjection<SimpleAggregate, string>
{
public SimpleProjection()
{
Identity<SimpleEvent>(x => x.IdToMatch);
}

public SimpleAggregate Create(IEvent<SimpleEvent> @event) => new SimpleAggregate()
{
Count = 1,
};

public SimpleAggregate Apply(SimpleEvent simpleEvent, SimpleAggregate current)
=> current with
{
Count = current.Count + 1
};

}


public class SimpleEvent
{
public string IdToMatch { get; set; }
}
}
6 changes: 3 additions & 3 deletions src/Marten/Events/Projections/ProjectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,9 @@ internal ILiveAggregator<T> AggregatorFor<T>() where T : class
return (ILiveAggregator<T>)aggregator;
}

private SingleStreamProjection<T> tryFindProjectionSourceForAggregateType<T>() where T : class
private GeneratedAggregateProjectionBase<T> tryFindProjectionSourceForAggregateType<T>() where T : class
{
var candidate = All.OfType<SingleStreamProjection<T>>().FirstOrDefault();
var candidate = All.OfType<GeneratedAggregateProjectionBase<T>>().FirstOrDefault();
if (candidate != null)
{
return candidate;
Expand All @@ -318,7 +318,7 @@ private SingleStreamProjection<T> tryFindProjectionSourceForAggregateType<T>() w
return new SingleStreamProjection<T>();
}

return source as SingleStreamProjection<T>;
return source as GeneratedAggregateProjectionBase<T>;
}

internal void AssertValidity(DocumentStore store)
Expand Down
Loading