diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Scripts/001CreateEventSource.sql b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Scripts/001CreateEventSource.sql new file mode 100644 index 0000000..84cf7fd --- /dev/null +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Scripts/001CreateEventSource.sql @@ -0,0 +1,8 @@ +CREATE TABLE [dbo].[EventSource] +( + [AggregateId] [uniqueidentifier] NOT NULL, + [Sequence] [int] NOT NULL, + [Data] [varchar](max) NOT NULL, + [Timestamp] [datetime2](7) NOT NULL, + CONSTRAINT [PK_EventSource] PRIMARY KEY ([AggregateId], [Sequence]), +); \ No newline at end of file diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Setup/SqlServerTestContainer.cs b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Setup/SqlServerTestContainer.cs new file mode 100644 index 0000000..2934426 --- /dev/null +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Setup/SqlServerTestContainer.cs @@ -0,0 +1,108 @@ +using Dapper; +using DbUp; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; +using Microsoft.Data.SqlClient; +using Polly; +using VippsMobilePay.Changefeed.Sql.DbUp; +using Xunit; + +namespace VippsMobilePay.Changefeed.AcceptanceTests.Setup; + +public class SqlServerTestContainer : IAsyncLifetime +{ + public const string TableWithChangefeed = "EventSource"; + public static readonly string ConnectionString = $"Server=127.0.0.1,{ContainerPort};Initial Catalog={TestDatabaseName};User Id=SA;Password={MssqlSaPassword};TrustServerCertificate=true;Encrypt=False"; + + private const int ContainerPort = 1433; + private const string MssqlSaPassword = "Secret.00"; + private const string TestDatabaseName = "ChangefeedAcceptanceTests"; + private IContainer? _sqlContainer; + + private readonly ContainerBuilder _sqlContainerBuilder = new ContainerBuilder() + .WithImage("mcr.microsoft.com/azure-sql-edge:latest") + .WithPortBinding(1433, ContainerPort) + .WithEnvironment("ACCEPT_EULA", "Y") + .WithEnvironment("MSSQL_SA_PASSWORD", MssqlSaPassword) + .WithEnvironment("MSSQL_PID", "Developer") + .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(1433)); + + private readonly Policy _retryPolicy = Policy + .Handle() + .WaitAndRetry(10, _ => TimeSpan.FromSeconds(1)); + + public async Task InitializeAsync() + { + if (await DbExists()) + { + EnsureAndUpgradeDatabase(); + return; + } + + _sqlContainer = _sqlContainerBuilder.Build(); + await _sqlContainer.StartAsync(); + _retryPolicy.Execute(EnsureAndUpgradeDatabase); + } + + public static async Task CleanupDatabase(SqlConnection connection) + { + const string cleanupStatement = + """ + TRUNCATE TABLE [changefeed].[feed:dbo.EventSource] + TRUNCATE TABLE [changefeed].[outbox:dbo.EventSource] + TRUNCATE TABLE [changefeed].[state:dbo.EventSource] + TRUNCATE TABLE [dbo].[EventSource] + """; + + await connection.ExecuteAsync(cleanupStatement); + } + + private void EnsureAndUpgradeDatabase() + { + EnsureDatabase.For.SqlDatabase(ConnectionString); + + var upgradeEngine = DeployChanges.To.SqlDatabase(ConnectionString) + .WithScriptsEmbeddedInAssembly(typeof(SqlServerTestContainer).Assembly) + .AddChangefeedMigrationSource(TableWithChangefeed, true, false) + .LogToConsole() + .Build(); + + var upgradeResult = upgradeEngine.PerformUpgrade(); + + if (!upgradeResult.Successful) + { + throw new InvalidOperationException( + "Sql database upgrade not successful", + upgradeResult.Error + ); + } + } + + public async Task DisposeAsync() + { + if (_sqlContainer is not null) + { + await _sqlContainer.StopAsync(); + } + } + + private async Task DbExists() + { + var connectionStringBuilder = new SqlConnectionStringBuilder(ConnectionString) + { + ConnectTimeout = 2, + InitialCatalog = "master" + }; + await using var connection = new SqlConnection(connectionStringBuilder.ConnectionString); + try + { + connection.Open(); + connection.Close(); + return true; + } + catch (SqlException) + { + return false; + } + } +} \ No newline at end of file diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Setup/TestCollection.cs b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Setup/TestCollection.cs new file mode 100644 index 0000000..8d4e94d --- /dev/null +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Setup/TestCollection.cs @@ -0,0 +1,8 @@ +using Xunit; + +namespace VippsMobilePay.Changefeed.AcceptanceTests.Setup; + +[CollectionDefinition(nameof(TestCollection))] +public class TestCollection : ICollectionFixture +{ +} \ No newline at end of file diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs new file mode 100644 index 0000000..89b8e4e --- /dev/null +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs @@ -0,0 +1,196 @@ +using AutoFixture.Xunit2; +using Dapper; +using Microsoft.Data.SqlClient; +using VippsMobilePay.Changefeed.AcceptanceTests.Setup; +using Xunit; + +namespace VippsMobilePay.Changefeed.AcceptanceTests.Tests; + +[Collection(nameof(TestCollection))] +public class Changefeed_Tests : IAsyncLifetime +{ + private readonly byte[] _startCursor = new byte[16]; + private readonly SqlConnection _connection; + + public Changefeed_Tests() + { + _connection = new SqlConnection(SqlServerTestContainer.ConnectionString); + } + + public async Task InitializeAsync() + { + await _connection.OpenAsync(); + await SqlServerTestContainer.CleanupDatabase(_connection); + } + + public async Task DisposeAsync() + { + await _connection.DisposeAsync(); + } + + [Theory, AutoData] + public async Task Insert_In_Transaction_And_Read_Outside_Transaction(EventSourceDto eventSourceDto) + { + await InsertIntoDatabaseInTransaction(eventSourceDto, _connection); + + var result = await ReadFeed(_startCursor, 10, _connection); + + Assert.NotNull(result); + var feedResult = result.ToList(); + Assert.Single(feedResult); + Assert.Equal(eventSourceDto.AggregateId, feedResult.Single().AggregateId); + Assert.Equal(eventSourceDto.Sequence, feedResult.Single().Sequence); + Assert.Equal(eventSourceDto.Data, feedResult.Single().Data); + Assert.Equal(eventSourceDto.Timestamp, feedResult.Single().Timestamp); + Assert.NotEqual(_startCursor, feedResult.Single().Ulid); + } + + [Fact] + public async Task Insert_Multiple_And_Read_Pages() + { + await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = DateTimeOffset.Now}, _connection); + await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "1", Timestamp = DateTimeOffset.Now}, _connection); + await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = DateTimeOffset.Now}, _connection); + await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "3", Timestamp = DateTimeOffset.Now}, _connection); + await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "4", Timestamp = DateTimeOffset.Now}, _connection); + + var result = await ReadFeed(_startCursor, 3, _connection); + var feedResult = result.ToList(); + Assert.Equal(3, feedResult.Count); + + result = await ReadFeed(result.Last().Ulid, 3, _connection); + feedResult = result.ToList(); + Assert.Equal(2, feedResult.Count); + } + + [Fact] + public async Task Events_Are_Ordered_By_Insert_Order_And_Ignoring_Timestamp() + { + var combinedReturnedEntries = new List(); + + var timestamp = DateTimeOffset.Now; + await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = timestamp}, _connection); + await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "1", Timestamp = timestamp}, _connection); + var firstPageResult = await ReadFeed(_startCursor, 100, _connection); + combinedReturnedEntries.AddRange(firstPageResult); + + timestamp = DateTimeOffset.Now; + await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "3", Timestamp = timestamp}, _connection); + await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = timestamp.AddSeconds(-1)}, _connection); + var secondPageResult = await ReadFeed(firstPageResult.Last().Ulid, 100, _connection); + combinedReturnedEntries.AddRange(secondPageResult); + + Assert.Equal(4, combinedReturnedEntries.Count); + Assert.Equal("0", combinedReturnedEntries[0].Data); + Assert.Equal("1", combinedReturnedEntries[1].Data); + Assert.Equal("3", combinedReturnedEntries[2].Data); + Assert.Equal("2", combinedReturnedEntries[3].Data); + + var thirdPageResult = await ReadFeed(secondPageResult.Last().Ulid, 100, _connection); + Assert.Empty(thirdPageResult); + } + + private static async Task InsertIntoDatabaseInTransaction( + EventSourceDto eventSourceDto, + SqlConnection connection) + { + await using var transaction = connection.BeginTransaction(); + await InsertIntoDatabase(eventSourceDto, connection, transaction); + await transaction.CommitAsync(); + } + + private static async Task InsertIntoDatabase( + EventSourceDto eventSourceDto, + SqlConnection connection, + SqlTransaction transaction) + { + await InsertIntoOutbox(eventSourceDto, connection, transaction); + await InsertIntoEventSource(eventSourceDto, connection, transaction); + } + + private static async Task InsertIntoOutbox( + EventSourceDto eventSourceDto, + SqlConnection connection, + SqlTransaction transaction) + { + const string insertIntoOutboxStatement = + """ + INSERT INTO [changefeed].[outbox:dbo.EventSource] (shard_id, time_hint, AggregateId, Sequence) + VALUES (0, @TimeHint, @AggregateId, @Sequence); + """; + + var results = await connection.ExecuteAsync( + insertIntoOutboxStatement, + new + { + TimeHint = eventSourceDto.Timestamp, + AggregateId = eventSourceDto.AggregateId, + Sequence = eventSourceDto.Sequence + }, + transaction); + } + + private static async Task InsertIntoEventSource( + EventSourceDto eventSourceDto, + SqlConnection connection, + SqlTransaction transaction) + { + const string insertIntoEventSourceStatement = + """ + INSERT INTO [EventSource] (AggregateId, Sequence, Data, Timestamp) + VALUES (@AggregateId, @Sequence, @Data, @Timestamp); + """; + + var results = await connection.ExecuteAsync( + insertIntoEventSourceStatement, + new + { + AggregateId = eventSourceDto.AggregateId, + Sequence = eventSourceDto.Sequence, + Data = eventSourceDto.Data, + Timestamp = eventSourceDto.Timestamp + }, + transaction); + } + + private async Task> ReadFeed( + byte[] cursor, + int pageSize, + SqlConnection connection) + { + const string readEventSourceStatement = + """ + DECLARE @shard_id INT = 0; + + CREATE TABLE #read ( + [ulid] BINARY(16) NOT NULL, + [AggregateId] [uniqueidentifier] NOT NULL, + [Sequence] [int] NOT NULL); + + EXEC [changefeed].[read_feed:dbo.EventSource] @shard_id = @shard_id, @cursor = @cursor, @pagesize = @pagesize; + + SELECT + [ChangefeedAcceptanceTests].[dbo].[EventSource].[AggregateId], + [ChangefeedAcceptanceTests].[dbo].[EventSource].[Sequence], + [Data], + [Timestamp], + [ulid] + FROM + [ChangefeedAcceptanceTests].[dbo].[EventSource] + INNER JOIN #read AS R ON + R.AggregateId = [ChangefeedAcceptanceTests].[dbo].[EventSource].AggregateId AND + R.Sequence = [ChangefeedAcceptanceTests].[dbo].[EventSource].Sequence; + """; + + + var results = await connection.QueryAsync( + readEventSourceStatement, + new + { + cursor = cursor, + pagesize = pageSize + }); + + return results; + } +} \ No newline at end of file diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/EventSourceDto.cs b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/EventSourceDto.cs new file mode 100644 index 0000000..3c3f6f6 --- /dev/null +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/EventSourceDto.cs @@ -0,0 +1,9 @@ +namespace VippsMobilePay.Changefeed.AcceptanceTests.Tests; + +public record EventSourceDto +{ + public required Guid AggregateId { get; set; } + public required int Sequence { get; set; } + public required string Data { get; set; } + public required DateTimeOffset Timestamp { get; set; } +} \ No newline at end of file diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/FeedResult.cs b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/FeedResult.cs new file mode 100644 index 0000000..ac6f279 --- /dev/null +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/FeedResult.cs @@ -0,0 +1,6 @@ +namespace VippsMobilePay.Changefeed.AcceptanceTests.Tests; + +public record FeedResult : EventSourceDto +{ + public required byte[] Ulid { get; set; } +} \ No newline at end of file diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/VippsMobilePay.Changefeed.AcceptanceTests.csproj b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/VippsMobilePay.Changefeed.AcceptanceTests.csproj new file mode 100644 index 0000000..5e03ca3 --- /dev/null +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/VippsMobilePay.Changefeed.AcceptanceTests.csproj @@ -0,0 +1,38 @@ + + + + net8.0 + true + enable + enable + + + + + + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + + + diff --git a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.sln b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.sln index 4c5c2e5..5105128 100644 --- a/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.sln +++ b/dotnet/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.sln @@ -2,6 +2,8 @@ Microsoft Visual Studio Solution File, Format Version 12.00 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "VippsMobilePay.Changefeed.Sql.DbUp", "VippsMobilePay.Changefeed.Sql.DbUp\VippsMobilePay.Changefeed.Sql.DbUp.csproj", "{6B32D6B1-B910-4F86-839C-D9AE0F91E0BD}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "VippsMobilePay.Changefeed.AcceptanceTests", "VippsMobilePay.Changefeed.AcceptanceTests\VippsMobilePay.Changefeed.AcceptanceTests.csproj", "{8FD45217-742E-48B3-8119-AAF6088B0C7C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -12,5 +14,9 @@ Global {6B32D6B1-B910-4F86-839C-D9AE0F91E0BD}.Debug|Any CPU.Build.0 = Debug|Any CPU {6B32D6B1-B910-4F86-839C-D9AE0F91E0BD}.Release|Any CPU.ActiveCfg = Release|Any CPU {6B32D6B1-B910-4F86-839C-D9AE0F91E0BD}.Release|Any CPU.Build.0 = Release|Any CPU + {8FD45217-742E-48B3-8119-AAF6088B0C7C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8FD45217-742E-48B3-8119-AAF6088B0C7C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8FD45217-742E-48B3-8119-AAF6088B0C7C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8FD45217-742E-48B3-8119-AAF6088B0C7C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal