diff --git a/src/Dfc.CourseDirectory.Core/DataStore/Sql/Queries/GetExclusiveLock.cs b/src/Dfc.CourseDirectory.Core/DataStore/Sql/Queries/GetExclusiveLock.cs new file mode 100644 index 000000000..decd951f3 --- /dev/null +++ b/src/Dfc.CourseDirectory.Core/DataStore/Sql/Queries/GetExclusiveLock.cs @@ -0,0 +1,8 @@ +namespace Dfc.CourseDirectory.Core.DataStore.Sql.Queries +{ + public class GetExclusiveLock : ISqlQuery + { + public string Name { get; set; } + public int TimeoutMilliseconds { get; set; } + } +} diff --git a/src/Dfc.CourseDirectory.Core/DataStore/Sql/QueryHandlers/GetExclusiveLockHandler.cs b/src/Dfc.CourseDirectory.Core/DataStore/Sql/QueryHandlers/GetExclusiveLockHandler.cs new file mode 100644 index 000000000..6a5cd54ec --- /dev/null +++ b/src/Dfc.CourseDirectory.Core/DataStore/Sql/QueryHandlers/GetExclusiveLockHandler.cs @@ -0,0 +1,40 @@ +using System.Data; +using System.Data.SqlClient; +using System.Threading.Tasks; +using Dapper; +using Dfc.CourseDirectory.Core.DataStore.Sql.Queries; + +namespace Dfc.CourseDirectory.Core.DataStore.Sql.QueryHandlers +{ + public class GetExclusiveLockHandler : ISqlQueryHandler + { + public async Task Execute(SqlTransaction transaction, GetExclusiveLock query) + { + var param = new DynamicParameters(new + { + Resource = query.Name, + LockMode = "Exclusive", + LockTimeout = query.TimeoutMilliseconds + }); + + param.Add(name: "@RetVal", dbType: DbType.Int32, direction: ParameterDirection.ReturnValue); + + await transaction.Connection.ExecuteAsync( + "sp_getapplock", + param, + transaction, + commandTimeout: 0, + commandType: CommandType.StoredProcedure); + + var result = param.Get("@RetVal"); + + // See https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-getapplock-transact-sql?view=sql-server-ver15#return-code-values + return result switch + { + 0 => true, + 1 => true, + _ => false + }; + } + } +} diff --git a/src/Dfc.CourseDirectory.Core/SqlDataSync.cs b/src/Dfc.CourseDirectory.Core/SqlDataSync.cs index 5b9b0571c..b0aadbb24 100644 --- a/src/Dfc.CourseDirectory.Core/SqlDataSync.cs +++ b/src/Dfc.CourseDirectory.Core/SqlDataSync.cs @@ -9,6 +9,7 @@ using Dfc.CourseDirectory.Core.DataStore.Sql.Queries; using Dfc.CourseDirectory.Core.Models; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; namespace Dfc.CourseDirectory.Core { @@ -16,13 +17,16 @@ public class SqlDataSync { private readonly IServiceScopeFactory _serviceScopeFactory; private readonly ICosmosDbQueryDispatcher _cosmosDbQueryDispatcher; + private readonly ILogger _logger; public SqlDataSync( IServiceScopeFactory serviceScopeFactory, - ICosmosDbQueryDispatcher cosmosDbQueryDispatcher) + ICosmosDbQueryDispatcher cosmosDbQueryDispatcher, + ILogger logger) { _serviceScopeFactory = serviceScopeFactory; _cosmosDbQueryDispatcher = cosmosDbQueryDispatcher; + _logger = logger; } public async Task SyncAll() @@ -33,29 +37,37 @@ public async Task SyncAll() await SyncAllApprenticeships(); } - public Task SyncAllApprenticeships() => _cosmosDbQueryDispatcher.ExecuteQuery( - new ProcessAllApprenticeships() - { - ProcessChunk = SyncApprenticeships - }); + public Task SyncAllApprenticeships() => WithExclusiveSqlLock( + nameof(SyncAllApprenticeships), + () => _cosmosDbQueryDispatcher.ExecuteQuery( + new ProcessAllApprenticeships() + { + ProcessChunk = SyncApprenticeships + })); - public Task SyncAllCourses() => _cosmosDbQueryDispatcher.ExecuteQuery( - new ProcessAllCourses() - { - ProcessChunk = SyncCourses - }); + public Task SyncAllCourses() => WithExclusiveSqlLock( + nameof(SyncAllCourses), + () =>_cosmosDbQueryDispatcher.ExecuteQuery( + new ProcessAllCourses() + { + ProcessChunk = SyncCourses + })); - public Task SyncAllProviders() => _cosmosDbQueryDispatcher.ExecuteQuery( - new ProcessAllProviders() - { - ProcessChunk = SyncProviders - }); + public Task SyncAllProviders() => WithExclusiveSqlLock( + nameof(SyncAllProviders), + () => _cosmosDbQueryDispatcher.ExecuteQuery( + new ProcessAllProviders() + { + ProcessChunk = SyncProviders + })); - public Task SyncAllVenues() => _cosmosDbQueryDispatcher.ExecuteQuery( - new ProcessAllVenues() - { - ProcessChunk = SyncVenues - }); + public Task SyncAllVenues() => WithExclusiveSqlLock( + nameof(SyncAllVenues), + () => _cosmosDbQueryDispatcher.ExecuteQuery( + new ProcessAllVenues() + { + ProcessChunk = SyncVenues + })); public Task SyncApprenticeship(Apprenticeship apprenticeship) => SyncApprenticeships(new[] { apprenticeship }); @@ -224,6 +236,32 @@ public Task SyncVenues(IEnumerable venues) => WithSqlDispatcher(dispatche }) })); + private async Task WithExclusiveSqlLock(string lockName, Func action) + { + // Grab an exclusive lock inside a transaction that spans the entire duration of `action`'s execution. + // ISqlQueryDispatcher owns the transaction; Dispose()ing the scope Dispose()s the transaction too. + // Note that commiting this transaction is not necessary. + // If the lock cannot be grabbed immediately then log & bail. + + using var scope = _serviceScopeFactory.CreateScope(); + + var sqlDispatcher = scope.ServiceProvider.GetRequiredService(); + + var result = await sqlDispatcher.ExecuteQuery(new GetExclusiveLock() + { + Name = lockName, + TimeoutMilliseconds = 0 // Return immediately if lock cannot be acquired + }); + + if (!result) + { + _logger.LogWarning($"Failed to acquire exclusive lock: '{lockName}'."); + return; + } + + await action(); + } + private async Task WithSqlDispatcher(Func action) { using (var scope = _serviceScopeFactory.CreateScope()) diff --git a/tests/Dfc.CourseDirectory.Core.Tests/SqlDataSyncTests.cs b/tests/Dfc.CourseDirectory.Core.Tests/SqlDataSyncTests.cs index 7682c925f..f7a939fb0 100644 --- a/tests/Dfc.CourseDirectory.Core.Tests/SqlDataSyncTests.cs +++ b/tests/Dfc.CourseDirectory.Core.Tests/SqlDataSyncTests.cs @@ -7,6 +7,7 @@ using Dfc.CourseDirectory.Core.Models; using Dfc.CourseDirectory.Testing; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; using OneOf.Types; using Xunit; @@ -70,7 +71,8 @@ public async Task SyncProvider_UpsertsProvider() var sqlDataSync = new SqlDataSync( Fixture.Services.GetRequiredService(), - CosmosDbQueryDispatcher.Object); + CosmosDbQueryDispatcher.Object, + new NullLogger()); // Act await sqlDataSync.SyncProvider(provider); @@ -121,7 +123,8 @@ public async Task SyncVenue_UpsertsVenue() var sqlDataSync = new SqlDataSync( Fixture.Services.GetRequiredService(), - CosmosDbQueryDispatcher.Object); + CosmosDbQueryDispatcher.Object, + new NullLogger()); // Act await sqlDataSync.SyncVenue(venue); @@ -212,7 +215,8 @@ public async Task SyncCourse_UpsertsCourse() var sqlDataSync = new SqlDataSync( Fixture.Services.GetRequiredService(), - CosmosDbQueryDispatcher.Object); + CosmosDbQueryDispatcher.Object, + new NullLogger()); // Act await sqlDataSync.SyncCourse(course); @@ -324,7 +328,8 @@ public async Task SyncApprenticeship_UpsertsApprenticeship() var sqlDataSync = new SqlDataSync( Fixture.Services.GetRequiredService(), - CosmosDbQueryDispatcher.Object); + CosmosDbQueryDispatcher.Object, + new NullLogger()); // Act await sqlDataSync.SyncApprenticeship(apprenticeship); diff --git a/tests/Dfc.CourseDirectory.Testing/DatabaseFixture.cs b/tests/Dfc.CourseDirectory.Testing/DatabaseFixture.cs index 13715c675..2be15a2f5 100644 --- a/tests/Dfc.CourseDirectory.Testing/DatabaseFixture.cs +++ b/tests/Dfc.CourseDirectory.Testing/DatabaseFixture.cs @@ -66,6 +66,7 @@ public static void ConfigureServices(IServiceCollection services) services.AddSingleton(); services.Decorate(); services.AddTransient(); + services.AddLogging(); } public void Dispose()