Skip to content

Commit

Permalink
UF-2022 Add back fill acceptance tests
Browse files Browse the repository at this point in the history
  • Loading branch information
henrikvindshoj committed Nov 8, 2024
1 parent 7374c27 commit 6b01d32
Showing 1 changed file with 85 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,81 @@ public async Task Events_Are_Ordered_By_Insert_Order_And_Ignoring_Timestamp()
Assert.Empty(thirdPageResult);
}

[Fact]
public async Task Back_Fill_OutBox()
{
// Existing events - before back fill and changefeed is started
var timestamp = DateTimeOffset.Now.AddDays(-1);
var oldEventSourceEntry1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = timestamp };
var oldEventSourceEntry2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "1", Timestamp = timestamp };
await using var oldEventsTransaction = _connection.BeginTransaction();
await InsertIntoEventSource(oldEventSourceEntry1, _connection, oldEventsTransaction);
await InsertIntoEventSource(oldEventSourceEntry2, _connection, oldEventsTransaction);
await oldEventsTransaction.CommitAsync();

// Starting to back fill outbox
await using var backFillTransaction = _connection.BeginTransaction();
await InsertIntoOutbox(oldEventSourceEntry1, _connection, backFillTransaction);
await InsertIntoOutbox(oldEventSourceEntry2, _connection, backFillTransaction);
await backFillTransaction.CommitAsync();

// Starting changefeed - Simulate live events at the same time as back filling is running
timestamp = DateTimeOffset.Now;
var liveEvent1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = timestamp };
var liveEvent2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "3", Timestamp = timestamp };
await InsertIntoDatabaseInTransaction(liveEvent1, _connection);
await InsertIntoDatabaseInTransaction(liveEvent2, _connection);

// Continue to back fill outbox after changefeed is started
await using var continueBackFillTransaction = _connection.BeginTransaction();
await InsertIntoOutbox(liveEvent1, _connection, continueBackFillTransaction);
await InsertIntoOutbox(liveEvent2, _connection, continueBackFillTransaction);
await continueBackFillTransaction.CommitAsync();

// Running changefeed - Back filling is stopped
timestamp = DateTimeOffset.Now;
var liveEvent3 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "4", Timestamp = timestamp };
var liveEvent4 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "5", Timestamp = timestamp };
await InsertIntoDatabaseInTransaction(liveEvent3, _connection);
await InsertIntoDatabaseInTransaction(liveEvent4, _connection);

// First read determines the order, so wait until back filling is done and stopped
// There should be a small overlap on back filling and live events processing to ensure all
// events are added to the outbox
var result = await ReadFeed(_startCursor, 100, _connection);

var feedResult = result.ToList();
Assert.Equal(6, feedResult.Count);
Assert.Equal("0", feedResult[0].Data);
Assert.Equal("1", feedResult[1].Data);
Assert.Equal("2", feedResult[2].Data);
Assert.Equal("3", feedResult[3].Data);
Assert.Equal("4", feedResult[4].Data);
Assert.Equal("5", feedResult[5].Data);
}

[Fact]
public async Task Ignore_Inserting_Into_Outbox_If_It_Already_Exists()
{
var eventSourceDto = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = DateTimeOffset.Now };
await InsertIntoDatabaseInTransaction(eventSourceDto, _connection);

await using var transaction = _connection.BeginTransaction();
await InsertIntoOutbox(eventSourceDto, _connection, transaction);
await transaction.CommitAsync();

var result = await ReadFeed(_startCursor, 10, _connection);

Assert.NotNull(result);
var feedResult = result.ToList();
Assert.Single(feedResult);
Assert.Equal(eventSourceDto.AggregateId, feedResult.Single().AggregateId);
Assert.Equal(eventSourceDto.Sequence, feedResult.Single().Sequence);
Assert.Equal(eventSourceDto.Data, feedResult.Single().Data);
Assert.Equal(eventSourceDto.Timestamp, feedResult.Single().Timestamp);
Assert.NotEqual(_startCursor, feedResult.Single().Ulid);
}

private static async Task InsertIntoDatabaseInTransaction(
EventSourceDto eventSourceDto,
SqlConnection connection)
Expand All @@ -115,8 +190,16 @@ private static async Task InsertIntoOutbox(
{
const string insertIntoOutboxStatement =
"""
INSERT INTO [changefeed].[outbox:dbo.EventSource] (shard_id, time_hint, AggregateId, Sequence)
VALUES (0, @TimeHint, @AggregateId, @Sequence);
IF NOT EXISTS (
SELECT 1
FROM [changefeed].[outbox:dbo.EventSource]
WHERE
AggregateId = @AggregateId
AND Sequence = @Sequence)
BEGIN
INSERT INTO [changefeed].[outbox:dbo.EventSource] (shard_id, time_hint, AggregateId, Sequence)
VALUES (0, @TimeHint, @AggregateId, @Sequence);
END;
""";

var results = await connection.ExecuteAsync(
Expand Down

0 comments on commit 6b01d32

Please sign in to comment.