-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
henrikvindshoj
committed
Nov 7, 2024
1 parent
df33e65
commit 7627d44
Showing
8 changed files
with
379 additions
and
0 deletions.
There are no files selected for viewing
8 changes: 8 additions & 0 deletions
8
...Pay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Scripts/001CreateEventSource.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
CREATE TABLE [dbo].[EventSource] | ||
( | ||
[AggregateId] [uniqueidentifier] NOT NULL, | ||
[Sequence] [int] NOT NULL, | ||
[Data] [varchar](max) NOT NULL, | ||
[Timestamp] [datetime2](7) NOT NULL, | ||
CONSTRAINT [PK_EventSource] PRIMARY KEY ([AggregateId], [Sequence]), | ||
); |
108 changes: 108 additions & 0 deletions
108
...ePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Setup/SqlServerTestContainer.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
using Dapper; | ||
using DbUp; | ||
using DotNet.Testcontainers.Builders; | ||
using DotNet.Testcontainers.Containers; | ||
using Microsoft.Data.SqlClient; | ||
using Polly; | ||
using VippsMobilePay.Changefeed.Sql.DbUp; | ||
using Xunit; | ||
|
||
namespace VippsMobilePay.Changefeed.AcceptanceTests.Setup; | ||
|
||
public class SqlServerTestContainer : IAsyncLifetime | ||
{ | ||
public const string TableWithChangefeed = "EventSource"; | ||
public static readonly string ConnectionString = $"Server=127.0.0.1,{ContainerPort};Initial Catalog={TestDatabaseName};User Id=SA;Password={MssqlSaPassword};TrustServerCertificate=true;Encrypt=False"; | ||
|
||
private const int ContainerPort = 1433; | ||
private const string MssqlSaPassword = "Secret.00"; | ||
private const string TestDatabaseName = "ChangefeedAcceptanceTests"; | ||
private IContainer? _sqlContainer; | ||
|
||
private readonly ContainerBuilder _sqlContainerBuilder = new ContainerBuilder() | ||
.WithImage("mcr.microsoft.com/azure-sql-edge:latest") | ||
.WithPortBinding(1433, ContainerPort) | ||
.WithEnvironment("ACCEPT_EULA", "Y") | ||
.WithEnvironment("MSSQL_SA_PASSWORD", MssqlSaPassword) | ||
.WithEnvironment("MSSQL_PID", "Developer") | ||
.WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(1433)); | ||
|
||
private readonly Policy _retryPolicy = Policy | ||
.Handle<Exception>() | ||
.WaitAndRetry(10, _ => TimeSpan.FromSeconds(1)); | ||
|
||
public async Task InitializeAsync() | ||
{ | ||
if (await DbExists()) | ||
{ | ||
EnsureAndUpgradeDatabase(); | ||
return; | ||
} | ||
|
||
_sqlContainer = _sqlContainerBuilder.Build(); | ||
await _sqlContainer.StartAsync(); | ||
_retryPolicy.Execute(EnsureAndUpgradeDatabase); | ||
} | ||
|
||
public static async Task CleanupDatabase(SqlConnection connection) | ||
{ | ||
const string cleanupStatement = | ||
""" | ||
TRUNCATE TABLE [changefeed].[feed:dbo.EventSource] | ||
TRUNCATE TABLE [changefeed].[outbox:dbo.EventSource] | ||
TRUNCATE TABLE [changefeed].[state:dbo.EventSource] | ||
TRUNCATE TABLE [dbo].[EventSource] | ||
"""; | ||
|
||
await connection.ExecuteAsync(cleanupStatement); | ||
} | ||
|
||
private void EnsureAndUpgradeDatabase() | ||
{ | ||
EnsureDatabase.For.SqlDatabase(ConnectionString); | ||
|
||
var upgradeEngine = DeployChanges.To.SqlDatabase(ConnectionString) | ||
.WithScriptsEmbeddedInAssembly(typeof(SqlServerTestContainer).Assembly) | ||
.AddChangefeedMigrationSource(TableWithChangefeed, true, false) | ||
.LogToConsole() | ||
.Build(); | ||
|
||
var upgradeResult = upgradeEngine.PerformUpgrade(); | ||
|
||
if (!upgradeResult.Successful) | ||
{ | ||
throw new InvalidOperationException( | ||
"Sql database upgrade not successful", | ||
upgradeResult.Error | ||
); | ||
} | ||
} | ||
|
||
public async Task DisposeAsync() | ||
{ | ||
if (_sqlContainer is not null) | ||
{ | ||
await _sqlContainer.StopAsync(); | ||
} | ||
} | ||
|
||
private async Task<bool> DbExists() | ||
{ | ||
var connectionStringBuilder = new SqlConnectionStringBuilder(ConnectionString) | ||
{ | ||
ConnectTimeout = 2, | ||
InitialCatalog = "master" | ||
}; | ||
await using var connection = new SqlConnection(connectionStringBuilder.ConnectionString); | ||
try | ||
{ | ||
connection.Open(); | ||
connection.Close(); | ||
return true; | ||
} | ||
catch (SqlException) | ||
{ | ||
return false; | ||
} | ||
} | ||
} |
8 changes: 8 additions & 0 deletions
8
...ppsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Setup/TestCollection.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
using Xunit; | ||
|
||
namespace VippsMobilePay.Changefeed.AcceptanceTests.Setup; | ||
|
||
[CollectionDefinition(nameof(TestCollection))] | ||
public class TestCollection : ICollectionFixture<SqlServerTestContainer> | ||
{ | ||
} |
196 changes: 196 additions & 0 deletions
196
...sMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/Changefeed_Tests.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
using AutoFixture.Xunit2; | ||
using Dapper; | ||
using Microsoft.Data.SqlClient; | ||
using VippsMobilePay.Changefeed.AcceptanceTests.Setup; | ||
using Xunit; | ||
|
||
namespace VippsMobilePay.Changefeed.AcceptanceTests.Tests; | ||
|
||
[Collection(nameof(TestCollection))] | ||
public class Changefeed_Tests : IAsyncLifetime | ||
{ | ||
private readonly byte[] _startCursor = new byte[16]; | ||
private readonly SqlConnection _connection; | ||
|
||
public Changefeed_Tests() | ||
{ | ||
_connection = new SqlConnection(SqlServerTestContainer.ConnectionString); | ||
} | ||
|
||
public async Task InitializeAsync() | ||
{ | ||
await _connection.OpenAsync(); | ||
await SqlServerTestContainer.CleanupDatabase(_connection); | ||
} | ||
|
||
public async Task DisposeAsync() | ||
{ | ||
await _connection.DisposeAsync(); | ||
} | ||
|
||
[Theory, AutoData] | ||
public async Task Insert_In_Transaction_And_Read_Outside_Transaction(EventSourceDto eventSourceDto) | ||
{ | ||
await InsertIntoDatabaseInTransaction(eventSourceDto, _connection); | ||
|
||
var result = await ReadFeed(_startCursor, 10, _connection); | ||
|
||
Assert.NotNull(result); | ||
var feedResult = result.ToList(); | ||
Assert.Single(feedResult); | ||
Assert.Equal(eventSourceDto.AggregateId, feedResult.Single().AggregateId); | ||
Assert.Equal(eventSourceDto.Sequence, feedResult.Single().Sequence); | ||
Assert.Equal(eventSourceDto.Data, feedResult.Single().Data); | ||
Assert.Equal(eventSourceDto.Timestamp, feedResult.Single().Timestamp); | ||
Assert.NotEqual(_startCursor, feedResult.Single().Ulid); | ||
} | ||
|
||
[Fact] | ||
public async Task Insert_Multiple_And_Read_Pages() | ||
{ | ||
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = DateTimeOffset.Now}, _connection); | ||
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "1", Timestamp = DateTimeOffset.Now}, _connection); | ||
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = DateTimeOffset.Now}, _connection); | ||
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "3", Timestamp = DateTimeOffset.Now}, _connection); | ||
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "4", Timestamp = DateTimeOffset.Now}, _connection); | ||
|
||
var result = await ReadFeed(_startCursor, 3, _connection); | ||
var feedResult = result.ToList(); | ||
Assert.Equal(3, feedResult.Count); | ||
|
||
result = await ReadFeed(result.Last().Ulid, 3, _connection); | ||
feedResult = result.ToList(); | ||
Assert.Equal(2, feedResult.Count); | ||
} | ||
|
||
[Fact] | ||
public async Task Events_Are_Ordered_By_Insert_Order_And_Ignoring_Timestamp() | ||
{ | ||
var combinedReturnedEntries = new List<FeedResult>(); | ||
|
||
var timestamp = DateTimeOffset.Now; | ||
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = timestamp}, _connection); | ||
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "1", Timestamp = timestamp}, _connection); | ||
var firstPageResult = await ReadFeed(_startCursor, 100, _connection); | ||
combinedReturnedEntries.AddRange(firstPageResult); | ||
|
||
timestamp = DateTimeOffset.Now; | ||
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "3", Timestamp = timestamp}, _connection); | ||
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = timestamp.AddSeconds(-1)}, _connection); | ||
var secondPageResult = await ReadFeed(firstPageResult.Last().Ulid, 100, _connection); | ||
combinedReturnedEntries.AddRange(secondPageResult); | ||
|
||
Assert.Equal(4, combinedReturnedEntries.Count); | ||
Assert.Equal("0", combinedReturnedEntries[0].Data); | ||
Assert.Equal("1", combinedReturnedEntries[1].Data); | ||
Assert.Equal("3", combinedReturnedEntries[2].Data); | ||
Assert.Equal("2", combinedReturnedEntries[3].Data); | ||
|
||
var thirdPageResult = await ReadFeed(secondPageResult.Last().Ulid, 100, _connection); | ||
Assert.Empty(thirdPageResult); | ||
} | ||
|
||
private static async Task InsertIntoDatabaseInTransaction( | ||
EventSourceDto eventSourceDto, | ||
SqlConnection connection) | ||
{ | ||
await using var transaction = connection.BeginTransaction(); | ||
await InsertIntoDatabase(eventSourceDto, connection, transaction); | ||
await transaction.CommitAsync(); | ||
} | ||
|
||
private static async Task InsertIntoDatabase( | ||
EventSourceDto eventSourceDto, | ||
SqlConnection connection, | ||
SqlTransaction transaction) | ||
{ | ||
await InsertIntoOutbox(eventSourceDto, connection, transaction); | ||
await InsertIntoEventSource(eventSourceDto, connection, transaction); | ||
} | ||
|
||
private static async Task InsertIntoOutbox( | ||
EventSourceDto eventSourceDto, | ||
SqlConnection connection, | ||
SqlTransaction transaction) | ||
{ | ||
const string insertIntoOutboxStatement = | ||
""" | ||
INSERT INTO [changefeed].[outbox:dbo.EventSource] (shard_id, time_hint, AggregateId, Sequence) | ||
VALUES (0, @TimeHint, @AggregateId, @Sequence); | ||
"""; | ||
|
||
var results = await connection.ExecuteAsync( | ||
insertIntoOutboxStatement, | ||
new | ||
{ | ||
TimeHint = eventSourceDto.Timestamp, | ||
AggregateId = eventSourceDto.AggregateId, | ||
Sequence = eventSourceDto.Sequence | ||
}, | ||
transaction); | ||
} | ||
|
||
private static async Task InsertIntoEventSource( | ||
EventSourceDto eventSourceDto, | ||
SqlConnection connection, | ||
SqlTransaction transaction) | ||
{ | ||
const string insertIntoEventSourceStatement = | ||
""" | ||
INSERT INTO [EventSource] (AggregateId, Sequence, Data, Timestamp) | ||
VALUES (@AggregateId, @Sequence, @Data, @Timestamp); | ||
"""; | ||
|
||
var results = await connection.ExecuteAsync( | ||
insertIntoEventSourceStatement, | ||
new | ||
{ | ||
AggregateId = eventSourceDto.AggregateId, | ||
Sequence = eventSourceDto.Sequence, | ||
Data = eventSourceDto.Data, | ||
Timestamp = eventSourceDto.Timestamp | ||
}, | ||
transaction); | ||
} | ||
|
||
private async Task<IEnumerable<FeedResult>> ReadFeed( | ||
byte[] cursor, | ||
int pageSize, | ||
SqlConnection connection) | ||
{ | ||
const string readEventSourceStatement = | ||
""" | ||
DECLARE @shard_id INT = 0; | ||
CREATE TABLE #read ( | ||
[ulid] BINARY(16) NOT NULL, | ||
[AggregateId] [uniqueidentifier] NOT NULL, | ||
[Sequence] [int] NOT NULL); | ||
EXEC [changefeed].[read_feed:dbo.EventSource] @shard_id = @shard_id, @cursor = @cursor, @pagesize = @pagesize; | ||
SELECT | ||
[ChangefeedAcceptanceTests].[dbo].[EventSource].[AggregateId], | ||
[ChangefeedAcceptanceTests].[dbo].[EventSource].[Sequence], | ||
[Data], | ||
[Timestamp], | ||
[ulid] | ||
FROM | ||
[ChangefeedAcceptanceTests].[dbo].[EventSource] | ||
INNER JOIN #read AS R ON | ||
R.AggregateId = [ChangefeedAcceptanceTests].[dbo].[EventSource].AggregateId AND | ||
R.Sequence = [ChangefeedAcceptanceTests].[dbo].[EventSource].Sequence; | ||
"""; | ||
|
||
|
||
var results = await connection.QueryAsync<FeedResult>( | ||
readEventSourceStatement, | ||
new | ||
{ | ||
cursor = cursor, | ||
pagesize = pageSize | ||
}); | ||
|
||
return results; | ||
} | ||
} |
9 changes: 9 additions & 0 deletions
9
...ppsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/EventSourceDto.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
namespace VippsMobilePay.Changefeed.AcceptanceTests.Tests; | ||
|
||
public record EventSourceDto | ||
{ | ||
public required Guid AggregateId { get; set; } | ||
public required int Sequence { get; set; } | ||
public required string Data { get; set; } | ||
public required DateTimeOffset Timestamp { get; set; } | ||
} |
6 changes: 6 additions & 0 deletions
6
...t/VippsMobilePay.Changefeed/VippsMobilePay.Changefeed.AcceptanceTests/Tests/FeedResult.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
namespace VippsMobilePay.Changefeed.AcceptanceTests.Tests; | ||
|
||
public record FeedResult : EventSourceDto | ||
{ | ||
public required byte[] Ulid { get; set; } | ||
} |
38 changes: 38 additions & 0 deletions
38
...ippsMobilePay.Changefeed.AcceptanceTests/VippsMobilePay.Changefeed.AcceptanceTests.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<TargetFramework>net8.0</TargetFramework> | ||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.1" /> | ||
<PackageReference Include="AutoFixture.Xunit2" Version="4.18.1" /> | ||
<PackageReference Include="Dapper" Version="2.1.35" /> | ||
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.10" /> | ||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" /> | ||
<PackageReference Include="NSubstitute" Version="5.1.0" /> | ||
<PackageReference Include="Polly" Version="8.4.2" /> | ||
<PackageReference Include="Testcontainers" Version="3.10.0" /> | ||
<PackageReference Include="xunit" Version="2.9.2" /> | ||
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2"> | ||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||
<PrivateAssets>all</PrivateAssets> | ||
</PackageReference> | ||
<PackageReference Include="coverlet.collector" Version="6.0.2"> | ||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||
<PrivateAssets>all</PrivateAssets> | ||
</PackageReference> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<EmbeddedResource Include="Scripts\*.sql" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\VippsMobilePay.Changefeed.Sql.DbUp\VippsMobilePay.Changefeed.Sql.DbUp.csproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
Oops, something went wrong.