Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
SebastianStehle committed Jan 15, 2025
1 parent 74e315f commit 6861093
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 13 deletions.
14 changes: 12 additions & 2 deletions events/Squidex.Events.EntityFramework/EFEventStore_Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,18 @@ public async Task AppendAsync(Guid commitId, string streamName, long expectedVer

try
{
commit.Position = await adapter.GetPositionAsync(context, ct);
await context.SaveChangesAsync(ct);
await using var transaction = await context.Database.BeginTransactionAsync(ct);
try
{
commit.Position = await adapter.GetPositionAsync(context, ct);
await context.SaveChangesAsync(ct);
await transaction.CommitAsync(ct);
}
catch (Exception)
{
await transaction.RollbackAsync(ct);
throw;
}
}
catch
{
Expand Down
6 changes: 3 additions & 3 deletions events/Squidex.Events.GetEventStore/GetEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(StreamFilter fil
}

var streamName = await projectionClient.CreateProjectionAsync(filter, true, ct);
var streamEvents = QueryReverseAsync(streamName, ESStreamPosition.End, int.MaxValue, ct);
var streamEvents = QueryReverseAsync(streamName, take, ct);

var query = streamEvents
.IgnoreNotFound(ct)
Expand Down Expand Up @@ -115,13 +115,13 @@ private IAsyncEnumerable<StoredEvent> QueryAsync(string streamName, ESStreamPosi
return result.Select(x => Formatter.Read(x, options.Value.Prefix));
}

private IAsyncEnumerable<StoredEvent> QueryReverseAsync(string streamName, ESStreamPosition start, long count,
private IAsyncEnumerable<StoredEvent> QueryReverseAsync(string streamName, long count,
CancellationToken ct = default)
{
var result = client.ReadStreamAsync(
Direction.Backwards,
streamName,
start,
ESStreamPosition.End,
count,
true,
cancellationToken: ct);
Expand Down
12 changes: 7 additions & 5 deletions events/Squidex.Events.Tests/EventStoreTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ private static EventData CreateEventData(int i)
return new EventData($"Type{i}", headers, i.ToString(CultureInfo.InvariantCulture));
}

private async Task<IReadOnlyList<StoredEvent>?> QueryWithSubscriptionAsync(
private async Task<IReadOnlyList<StoredEvent>> QueryWithSubscriptionAsync(
IEventStore sut,
StreamFilter streamFilter,
int expectedCount,
Expand Down Expand Up @@ -546,14 +546,16 @@ private static EventData CreateEventData(int i)
if (subscriber.LastEvents.Count >= expectedCount)
{
subscriptionPosition = subscriber.LastPosition;

return subscriber.LastEvents;
}
}

cts.Token.ThrowIfCancellationRequested();
return null;
}

return subscriber.LastEvents;
}
catch (OperationCanceledException)
{
return subscriber.LastEvents;
}
finally
{
Expand Down
10 changes: 7 additions & 3 deletions events/Squidex.Events/PollingSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ namespace Squidex.Events;
public sealed class PollingSubscription : IEventSubscription
{
private readonly CompletionTimer timer;
#pragma warning disable IDE0052 // Remove unread private members
private int eventsTotal;
#pragma warning restore IDE0052 // Remove unread private members

public PollingSubscription(
IEventStore eventStore,
Expand All @@ -29,16 +32,17 @@ public PollingSubscription(
{
while (true)
{
var hasAddedEvent = false;
var eventsInAttempt = 0;
await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, streamPosition, ct: ct))
{
await eventSubscriber.OnNextAsync(this, storedEvent);

streamPosition = storedEvent.EventPosition;
hasAddedEvent = true;
eventsInAttempt++;
eventsTotal++;
}

if (!hasAddedEvent)
if (eventsInAttempt == 0)
{
break;
}
Expand Down

0 comments on commit 6861093

Please sign in to comment.