From 6b01d328fd60f4d9109826546cce7a155a3fa81f Mon Sep 17 00:00:00 2001 From: henrikvindshoj Date: Fri, 8 Nov 2024 07:15:30 +0100 Subject: [PATCH 1/3] UF-2022 Add back fill acceptance tests --- .../Tests/Changefeed_Tests.cs | 87 ++++++++++++++++++- 1 file changed, 85 insertions(+), 2 deletions(-) diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs index 89b8e4e..a02e628 100644 --- a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs @@ -90,6 +90,81 @@ public async Task Events_Are_Ordered_By_Insert_Order_And_Ignoring_Timestamp() Assert.Empty(thirdPageResult); } + [Fact] + public async Task Back_Fill_OutBox() + { + // Existing events - before back fill and changefeed is started + var timestamp = DateTimeOffset.Now.AddDays(-1); + var oldEventSourceEntry1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = timestamp }; + var oldEventSourceEntry2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "1", Timestamp = timestamp }; + await using var oldEventsTransaction = _connection.BeginTransaction(); + await InsertIntoEventSource(oldEventSourceEntry1, _connection, oldEventsTransaction); + await InsertIntoEventSource(oldEventSourceEntry2, _connection, oldEventsTransaction); + await oldEventsTransaction.CommitAsync(); + + // Starting to back fill outbox + await using var backFillTransaction = _connection.BeginTransaction(); + await InsertIntoOutbox(oldEventSourceEntry1, _connection, backFillTransaction); + await InsertIntoOutbox(oldEventSourceEntry2, _connection, backFillTransaction); + await backFillTransaction.CommitAsync(); + + // Starting changefeed - Simulate live events at the same time as back filling is running + timestamp = DateTimeOffset.Now; + var liveEvent1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = timestamp }; + var liveEvent2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "3", Timestamp = timestamp }; + await InsertIntoDatabaseInTransaction(liveEvent1, _connection); + await InsertIntoDatabaseInTransaction(liveEvent2, _connection); + + // Continue to back fill outbox after changefeed is started + await using var continueBackFillTransaction = _connection.BeginTransaction(); + await InsertIntoOutbox(liveEvent1, _connection, continueBackFillTransaction); + await InsertIntoOutbox(liveEvent2, _connection, continueBackFillTransaction); + await continueBackFillTransaction.CommitAsync(); + + // Running changefeed - Back filling is stopped + timestamp = DateTimeOffset.Now; + var liveEvent3 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "4", Timestamp = timestamp }; + var liveEvent4 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "5", Timestamp = timestamp }; + await InsertIntoDatabaseInTransaction(liveEvent3, _connection); + await InsertIntoDatabaseInTransaction(liveEvent4, _connection); + + // First read determines the order, so wait until back filling is done and stopped + // There should be a small overlap on back filling and live events processing to ensure all + // events are added to the outbox + var result = await ReadFeed(_startCursor, 100, _connection); + + var feedResult = result.ToList(); + Assert.Equal(6, feedResult.Count); + Assert.Equal("0", feedResult[0].Data); + Assert.Equal("1", feedResult[1].Data); + Assert.Equal("2", feedResult[2].Data); + Assert.Equal("3", feedResult[3].Data); + Assert.Equal("4", feedResult[4].Data); + Assert.Equal("5", feedResult[5].Data); + } + + [Fact] + public async Task Ignore_Inserting_Into_Outbox_If_It_Already_Exists() + { + var eventSourceDto = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = DateTimeOffset.Now }; + await InsertIntoDatabaseInTransaction(eventSourceDto, _connection); + + await using var transaction = _connection.BeginTransaction(); + await InsertIntoOutbox(eventSourceDto, _connection, transaction); + await transaction.CommitAsync(); + + var result = await ReadFeed(_startCursor, 10, _connection); + + Assert.NotNull(result); + var feedResult = result.ToList(); + Assert.Single(feedResult); + Assert.Equal(eventSourceDto.AggregateId, feedResult.Single().AggregateId); + Assert.Equal(eventSourceDto.Sequence, feedResult.Single().Sequence); + Assert.Equal(eventSourceDto.Data, feedResult.Single().Data); + Assert.Equal(eventSourceDto.Timestamp, feedResult.Single().Timestamp); + Assert.NotEqual(_startCursor, feedResult.Single().Ulid); + } + private static async Task InsertIntoDatabaseInTransaction( EventSourceDto eventSourceDto, SqlConnection connection) @@ -115,8 +190,16 @@ private static async Task InsertIntoOutbox( { const string insertIntoOutboxStatement = """ - INSERT INTO [changefeed].[outbox:dbo.EventSource] (shard_id, time_hint, AggregateId, Sequence) - VALUES (0, @TimeHint, @AggregateId, @Sequence); + IF NOT EXISTS ( + SELECT 1 + FROM [changefeed].[outbox:dbo.EventSource] + WHERE + AggregateId = @AggregateId + AND Sequence = @Sequence) + BEGIN + INSERT INTO [changefeed].[outbox:dbo.EventSource] (shard_id, time_hint, AggregateId, Sequence) + VALUES (0, @TimeHint, @AggregateId, @Sequence); + END; """; var results = await connection.ExecuteAsync( From 45ca701f8827618631bc66f98c8d65ec33e5bc56 Mon Sep 17 00:00:00 2001 From: henrikvindshoj Date: Tue, 12 Nov 2024 15:59:19 +0100 Subject: [PATCH 2/3] Fixed Back_Fill_Feed test to back fill the feed table instead of the outbox --- .../Tests/Changefeed_Tests.cs | 107 ++++++++++++++---- ...obilePay.Changefeed.AcceptanceTests.csproj | 1 + 2 files changed, 86 insertions(+), 22 deletions(-) diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs index a02e628..02f4982 100644 --- a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs @@ -1,3 +1,4 @@ +using System.Data; using AutoFixture.Xunit2; using Dapper; using Microsoft.Data.SqlClient; @@ -76,7 +77,7 @@ public async Task Events_Are_Ordered_By_Insert_Order_And_Ignoring_Timestamp() timestamp = DateTimeOffset.Now; await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "3", Timestamp = timestamp}, _connection); - await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = timestamp.AddSeconds(-1)}, _connection); + await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = timestamp}, _connection); var secondPageResult = await ReadFeed(firstPageResult.Last().Ulid, 100, _connection); combinedReturnedEntries.AddRange(secondPageResult); @@ -91,49 +92,58 @@ public async Task Events_Are_Ordered_By_Insert_Order_And_Ignoring_Timestamp() } [Fact] - public async Task Back_Fill_OutBox() + public async Task Back_Fill_Feed() { // Existing events - before back fill and changefeed is started - var timestamp = DateTimeOffset.Now.AddDays(-1); - var oldEventSourceEntry1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = timestamp }; - var oldEventSourceEntry2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "1", Timestamp = timestamp }; + //var timestamp = DateTimeOffset.Now.AddDays(-1); + var oldEventSourceEntry1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = DateTimeOffset.Now.AddDays(-2) }; + var oldEventSourceEntry2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 1, Data = "1", Timestamp = DateTimeOffset.Now.AddDays(-1) }; await using var oldEventsTransaction = _connection.BeginTransaction(); await InsertIntoEventSource(oldEventSourceEntry1, _connection, oldEventsTransaction); await InsertIntoEventSource(oldEventSourceEntry2, _connection, oldEventsTransaction); await oldEventsTransaction.CommitAsync(); - // Starting to back fill outbox + // Starting to back fill feed await using var backFillTransaction = _connection.BeginTransaction(); - await InsertIntoOutbox(oldEventSourceEntry1, _connection, backFillTransaction); - await InsertIntoOutbox(oldEventSourceEntry2, _connection, backFillTransaction); + await InsertIntoFeed(oldEventSourceEntry1, _connection, backFillTransaction); + await InsertIntoFeed(oldEventSourceEntry2, _connection, backFillTransaction); await backFillTransaction.CommitAsync(); // Starting changefeed - Simulate live events at the same time as back filling is running - timestamp = DateTimeOffset.Now; - var liveEvent1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = timestamp }; - var liveEvent2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "3", Timestamp = timestamp }; + var timestamp = DateTimeOffset.Now; + var liveEvent1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 2, Data = "2", Timestamp = timestamp }; + var liveEvent2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 3, Data = "3", Timestamp = timestamp }; await InsertIntoDatabaseInTransaction(liveEvent1, _connection); await InsertIntoDatabaseInTransaction(liveEvent2, _connection); // Continue to back fill outbox after changefeed is started await using var continueBackFillTransaction = _connection.BeginTransaction(); - await InsertIntoOutbox(liveEvent1, _connection, continueBackFillTransaction); - await InsertIntoOutbox(liveEvent2, _connection, continueBackFillTransaction); + await InsertIntoFeed(liveEvent1, _connection, continueBackFillTransaction); + await InsertIntoFeed(liveEvent2, _connection, continueBackFillTransaction); await continueBackFillTransaction.CommitAsync(); - + // Running changefeed - Back filling is stopped timestamp = DateTimeOffset.Now; - var liveEvent3 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "4", Timestamp = timestamp }; - var liveEvent4 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "5", Timestamp = timestamp }; + var liveEvent3 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 4, Data = "4", Timestamp = timestamp }; + var liveEvent4 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 5, Data = "5", Timestamp = timestamp }; await InsertIntoDatabaseInTransaction(liveEvent3, _connection); await InsertIntoDatabaseInTransaction(liveEvent4, _connection); // First read determines the order, so wait until back filling is done and stopped // There should be a small overlap on back filling and live events processing to ensure all - // events are added to the outbox - var result = await ReadFeed(_startCursor, 100, _connection); + // events are added to the outbox, - var feedResult = result.ToList(); + // Note that the first page only contains the rows from the feed table. Only when the cursor is up to date + // with all rows in the feed table it will start to look into the outbox table. The feed and outbox output + // will not be combined in one read, but requires an additional read to get the last event from the outbox + var combinedReturnedEntries = new List(); + var firstPageResult = await ReadFeed(_startCursor, 100, _connection); + combinedReturnedEntries.AddRange(firstPageResult); + + var secondPageResult = await ReadFeed(firstPageResult.Last().Ulid, 100, _connection); + combinedReturnedEntries.AddRange(secondPageResult); + + var feedResult = combinedReturnedEntries.ToList(); Assert.Equal(6, feedResult.Count); Assert.Equal("0", feedResult[0].Data); Assert.Equal("1", feedResult[1].Data); @@ -150,7 +160,7 @@ public async Task Ignore_Inserting_Into_Outbox_If_It_Already_Exists() await InsertIntoDatabaseInTransaction(eventSourceDto, _connection); await using var transaction = _connection.BeginTransaction(); - await InsertIntoOutbox(eventSourceDto, _connection, transaction); + await InsertIntoFeed(eventSourceDto, _connection, transaction); await transaction.CommitAsync(); var result = await ReadFeed(_startCursor, 10, _connection); @@ -213,6 +223,57 @@ SELECT 1 transaction); } + private static async Task InsertIntoFeed( + EventSourceDto eventSourceDto, + SqlConnection connection, + SqlTransaction transaction) + { + const string insertIntoFeedStatement = + """ + INSERT INTO [changefeed].[feed:dbo.EventSource] (shard_id, ulid, AggregateId, Sequence) + SELECT 0, @Ulid, @AggregateId, @Sequence + WHERE NOT EXISTS ( + SELECT 1 + FROM [changefeed].[feed:dbo.EventSource] + WHERE AggregateId = @AggregateId + AND Sequence = @Sequence + ) + AND NOT EXISTS ( + SELECT 1 + FROM [changefeed].[outbox:dbo.EventSource] + WHERE AggregateId = @AggregateId + AND Sequence = @Sequence + ); + """; + + var results = await connection.ExecuteAsync( + insertIntoFeedStatement, + new + { + Ulid = GenerateUlidBinary(eventSourceDto.Timestamp), + AggregateId = eventSourceDto.AggregateId, + Sequence = eventSourceDto.Sequence + }, + transaction); + } + + public static byte[] GenerateUlidBinary(DateTimeOffset timestamp) + { + var ulid = Ulid.NewUlid(); + var ulidBytes = ulid.ToByteArray(); + + var timestampMs = (long)(timestamp.UtcDateTime - DateTime.UnixEpoch).TotalMilliseconds; + + ulidBytes[0] = (byte)((timestampMs >> 40) & 0xFF); + ulidBytes[1] = (byte)((timestampMs >> 32) & 0xFF); + ulidBytes[2] = (byte)((timestampMs >> 24) & 0xFF); + ulidBytes[3] = (byte)((timestampMs >> 16) & 0xFF); + ulidBytes[4] = (byte)((timestampMs >> 8) & 0xFF); + ulidBytes[5] = (byte)(timestampMs & 0xFF); + + return ulidBytes; + } + private static async Task InsertIntoEventSource( EventSourceDto eventSourceDto, SqlConnection connection, @@ -262,7 +323,9 @@ [ulid] BINARY(16) NOT NULL, [ChangefeedAcceptanceTests].[dbo].[EventSource] INNER JOIN #read AS R ON R.AggregateId = [ChangefeedAcceptanceTests].[dbo].[EventSource].AggregateId AND - R.Sequence = [ChangefeedAcceptanceTests].[dbo].[EventSource].Sequence; + R.Sequence = [ChangefeedAcceptanceTests].[dbo].[EventSource].Sequence + ORDER BY + ulid """; @@ -275,5 +338,5 @@ INNER JOIN #read AS R ON }); return results; - } + } } \ No newline at end of file diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/VippsMobilePay.Changefeed.AcceptanceTests.csproj b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/VippsMobilePay.Changefeed.AcceptanceTests.csproj index 5e03ca3..03eca85 100644 --- a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/VippsMobilePay.Changefeed.AcceptanceTests.csproj +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/VippsMobilePay.Changefeed.AcceptanceTests.csproj @@ -16,6 +16,7 @@ + runtime; build; native; contentfiles; analyzers; buildtransitive From 00805aa042607a89e655f025b5f53fbdfe3e57d9 Mon Sep 17 00:00:00 2001 From: henrikvindshoj Date: Wed, 13 Nov 2024 08:02:43 +0100 Subject: [PATCH 3/3] Fill outbox table instead of feed table to avoid ulid processing --- .../Tests/Changefeed_Tests.cs | 110 ++++++------------ 1 file changed, 36 insertions(+), 74 deletions(-) diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs index 02f4982..7762bb1 100644 --- a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs @@ -95,37 +95,46 @@ public async Task Events_Are_Ordered_By_Insert_Order_And_Ignoring_Timestamp() public async Task Back_Fill_Feed() { // Existing events - before back fill and changefeed is started - //var timestamp = DateTimeOffset.Now.AddDays(-1); - var oldEventSourceEntry1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = DateTimeOffset.Now.AddDays(-2) }; - var oldEventSourceEntry2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 1, Data = "1", Timestamp = DateTimeOffset.Now.AddDays(-1) }; - await using var oldEventsTransaction = _connection.BeginTransaction(); - await InsertIntoEventSource(oldEventSourceEntry1, _connection, oldEventsTransaction); - await InsertIntoEventSource(oldEventSourceEntry2, _connection, oldEventsTransaction); - await oldEventsTransaction.CommitAsync(); + var oldEvent0 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = DateTimeOffset.Now.AddDays(-4) }; + var oldEvent1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 1, Data = "1", Timestamp = DateTimeOffset.Now.AddDays(-3) }; + var oldEvent2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 2, Data = "2", Timestamp = DateTimeOffset.Now.AddDays(-2) }; + var oldEvent3 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 3, Data = "3", Timestamp = DateTimeOffset.Now.AddDays(-1) }; + await using var oldEventsTransaction1 = _connection.BeginTransaction(); + await InsertIntoEventSource(oldEvent0, _connection, oldEventsTransaction1); + await InsertIntoEventSource(oldEvent1, _connection, oldEventsTransaction1); + await InsertIntoEventSource(oldEvent2, _connection, oldEventsTransaction1); + await InsertIntoEventSource(oldEvent3, _connection, oldEventsTransaction1); + await oldEventsTransaction1.CommitAsync(); // Starting to back fill feed - await using var backFillTransaction = _connection.BeginTransaction(); - await InsertIntoFeed(oldEventSourceEntry1, _connection, backFillTransaction); - await InsertIntoFeed(oldEventSourceEntry2, _connection, backFillTransaction); - await backFillTransaction.CommitAsync(); + await using var backFillTransaction1 = _connection.BeginTransaction(); + await InsertIntoOutbox(oldEvent0, _connection, backFillTransaction1); + await InsertIntoOutbox(oldEvent1, _connection, backFillTransaction1); + await backFillTransaction1.CommitAsync(); + + // Continue to back fill events - before changefeed is started + await using var backFillTransaction2 = _connection.BeginTransaction(); + await InsertIntoOutbox(oldEvent2, _connection, backFillTransaction2); + await InsertIntoOutbox(oldEvent3, _connection, backFillTransaction2); + await backFillTransaction2.CommitAsync(); // Starting changefeed - Simulate live events at the same time as back filling is running var timestamp = DateTimeOffset.Now; - var liveEvent1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 2, Data = "2", Timestamp = timestamp }; - var liveEvent2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 3, Data = "3", Timestamp = timestamp }; + var liveEvent1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 4, Data = "4", Timestamp = timestamp }; + var liveEvent2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 5, Data = "5", Timestamp = timestamp }; await InsertIntoDatabaseInTransaction(liveEvent1, _connection); await InsertIntoDatabaseInTransaction(liveEvent2, _connection); - - // Continue to back fill outbox after changefeed is started + + // Continue to back fill outbox with events already processed by changefeed is started await using var continueBackFillTransaction = _connection.BeginTransaction(); - await InsertIntoFeed(liveEvent1, _connection, continueBackFillTransaction); - await InsertIntoFeed(liveEvent2, _connection, continueBackFillTransaction); + await InsertIntoOutbox(liveEvent1, _connection, continueBackFillTransaction); + await InsertIntoOutbox(liveEvent2, _connection, continueBackFillTransaction); await continueBackFillTransaction.CommitAsync(); // Running changefeed - Back filling is stopped timestamp = DateTimeOffset.Now; - var liveEvent3 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 4, Data = "4", Timestamp = timestamp }; - var liveEvent4 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 5, Data = "5", Timestamp = timestamp }; + var liveEvent3 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 6, Data = "6", Timestamp = timestamp }; + var liveEvent4 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 7, Data = "7", Timestamp = timestamp }; await InsertIntoDatabaseInTransaction(liveEvent3, _connection); await InsertIntoDatabaseInTransaction(liveEvent4, _connection); @@ -133,9 +142,11 @@ public async Task Back_Fill_Feed() // There should be a small overlap on back filling and live events processing to ensure all // events are added to the outbox, - // Note that the first page only contains the rows from the feed table. Only when the cursor is up to date + // Note that the first page only contains rows from the feed table. Only when the cursor is up to date // with all rows in the feed table it will start to look into the outbox table. The feed and outbox output - // will not be combined in one read, but requires an additional read to get the last event from the outbox + // will not be combined in one read, but requires an additional read to get the events from the outbox. + // In this case it does not matter as all events are in the outbox table and the secondPageResult will + // therefore be empty. var combinedReturnedEntries = new List(); var firstPageResult = await ReadFeed(_startCursor, 100, _connection); combinedReturnedEntries.AddRange(firstPageResult); @@ -144,13 +155,15 @@ public async Task Back_Fill_Feed() combinedReturnedEntries.AddRange(secondPageResult); var feedResult = combinedReturnedEntries.ToList(); - Assert.Equal(6, feedResult.Count); + Assert.Equal(8, feedResult.Count); Assert.Equal("0", feedResult[0].Data); Assert.Equal("1", feedResult[1].Data); Assert.Equal("2", feedResult[2].Data); Assert.Equal("3", feedResult[3].Data); Assert.Equal("4", feedResult[4].Data); Assert.Equal("5", feedResult[5].Data); + Assert.Equal("6", feedResult[6].Data); + Assert.Equal("7", feedResult[7].Data); } [Fact] @@ -160,7 +173,7 @@ public async Task Ignore_Inserting_Into_Outbox_If_It_Already_Exists() await InsertIntoDatabaseInTransaction(eventSourceDto, _connection); await using var transaction = _connection.BeginTransaction(); - await InsertIntoFeed(eventSourceDto, _connection, transaction); + await InsertIntoOutbox(eventSourceDto, _connection, transaction); await transaction.CommitAsync(); var result = await ReadFeed(_startCursor, 10, _connection); @@ -223,57 +236,6 @@ SELECT 1 transaction); } - private static async Task InsertIntoFeed( - EventSourceDto eventSourceDto, - SqlConnection connection, - SqlTransaction transaction) - { - const string insertIntoFeedStatement = - """ - INSERT INTO [changefeed].[feed:dbo.EventSource] (shard_id, ulid, AggregateId, Sequence) - SELECT 0, @Ulid, @AggregateId, @Sequence - WHERE NOT EXISTS ( - SELECT 1 - FROM [changefeed].[feed:dbo.EventSource] - WHERE AggregateId = @AggregateId - AND Sequence = @Sequence - ) - AND NOT EXISTS ( - SELECT 1 - FROM [changefeed].[outbox:dbo.EventSource] - WHERE AggregateId = @AggregateId - AND Sequence = @Sequence - ); - """; - - var results = await connection.ExecuteAsync( - insertIntoFeedStatement, - new - { - Ulid = GenerateUlidBinary(eventSourceDto.Timestamp), - AggregateId = eventSourceDto.AggregateId, - Sequence = eventSourceDto.Sequence - }, - transaction); - } - - public static byte[] GenerateUlidBinary(DateTimeOffset timestamp) - { - var ulid = Ulid.NewUlid(); - var ulidBytes = ulid.ToByteArray(); - - var timestampMs = (long)(timestamp.UtcDateTime - DateTime.UnixEpoch).TotalMilliseconds; - - ulidBytes[0] = (byte)((timestampMs >> 40) & 0xFF); - ulidBytes[1] = (byte)((timestampMs >> 32) & 0xFF); - ulidBytes[2] = (byte)((timestampMs >> 24) & 0xFF); - ulidBytes[3] = (byte)((timestampMs >> 16) & 0xFF); - ulidBytes[4] = (byte)((timestampMs >> 8) & 0xFF); - ulidBytes[5] = (byte)(timestampMs & 0xFF); - - return ulidBytes; - } - private static async Task InsertIntoEventSource( EventSourceDto eventSourceDto, SqlConnection connection,