Skip to content

Commit

Permalink
Prevent multiple instances of SyncAll* functions running concurrently
Browse files Browse the repository at this point in the history
Should fix PTCD-378 issues running in prod.
  • Loading branch information
gunndabad committed Sep 28, 2020
1 parent 0049165 commit 06099d4
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Dfc.CourseDirectory.Core.DataStore.Sql.Queries
{
public class GetExclusiveLock : ISqlQuery<bool>
{
public string Name { get; set; }
public int TimeoutMilliseconds { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using Dfc.CourseDirectory.Core.DataStore.Sql.Queries;

namespace Dfc.CourseDirectory.Core.DataStore.Sql.QueryHandlers
{
public class GetExclusiveLockHandler : ISqlQueryHandler<GetExclusiveLock, bool>
{
public async Task<bool> Execute(SqlTransaction transaction, GetExclusiveLock query)
{
var param = new DynamicParameters(new
{
Resource = query.Name,
LockMode = "Exclusive",
LockTimeout = query.TimeoutMilliseconds
});

param.Add(name: "@RetVal", dbType: DbType.Int32, direction: ParameterDirection.ReturnValue);

await transaction.Connection.ExecuteAsync(
"sp_getapplock",
param,
transaction,
commandTimeout: 0,
commandType: CommandType.StoredProcedure);

var result = param.Get<int>("@RetVal");

// See https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-getapplock-transact-sql?view=sql-server-ver15#return-code-values
return result switch
{
0 => true,
1 => true,
_ => false
};
}
}
}
80 changes: 59 additions & 21 deletions src/Dfc.CourseDirectory.Core/SqlDataSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@
using Dfc.CourseDirectory.Core.DataStore.Sql.Queries;
using Dfc.CourseDirectory.Core.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Dfc.CourseDirectory.Core
{
public class SqlDataSync
{
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ICosmosDbQueryDispatcher _cosmosDbQueryDispatcher;
private readonly ILogger _logger;

public SqlDataSync(
IServiceScopeFactory serviceScopeFactory,
ICosmosDbQueryDispatcher cosmosDbQueryDispatcher)
ICosmosDbQueryDispatcher cosmosDbQueryDispatcher,
ILogger<SqlDataSync> logger)
{
_serviceScopeFactory = serviceScopeFactory;
_cosmosDbQueryDispatcher = cosmosDbQueryDispatcher;
_logger = logger;
}

public async Task SyncAll()
Expand All @@ -33,29 +37,37 @@ public async Task SyncAll()
await SyncAllApprenticeships();
}

public Task SyncAllApprenticeships() => _cosmosDbQueryDispatcher.ExecuteQuery(
new ProcessAllApprenticeships()
{
ProcessChunk = SyncApprenticeships
});
public Task SyncAllApprenticeships() => WithExclusiveSqlLock(
nameof(SyncAllApprenticeships),
() => _cosmosDbQueryDispatcher.ExecuteQuery(
new ProcessAllApprenticeships()
{
ProcessChunk = SyncApprenticeships
}));

public Task SyncAllCourses() => _cosmosDbQueryDispatcher.ExecuteQuery(
new ProcessAllCourses()
{
ProcessChunk = SyncCourses
});
public Task SyncAllCourses() => WithExclusiveSqlLock(
nameof(SyncAllCourses),
() =>_cosmosDbQueryDispatcher.ExecuteQuery(
new ProcessAllCourses()
{
ProcessChunk = SyncCourses
}));

public Task SyncAllProviders() => _cosmosDbQueryDispatcher.ExecuteQuery(
new ProcessAllProviders()
{
ProcessChunk = SyncProviders
});
public Task SyncAllProviders() => WithExclusiveSqlLock(
nameof(SyncAllProviders),
() => _cosmosDbQueryDispatcher.ExecuteQuery(
new ProcessAllProviders()
{
ProcessChunk = SyncProviders
}));

public Task SyncAllVenues() => _cosmosDbQueryDispatcher.ExecuteQuery(
new ProcessAllVenues()
{
ProcessChunk = SyncVenues
});
public Task SyncAllVenues() => WithExclusiveSqlLock(
nameof(SyncAllVenues),
() => _cosmosDbQueryDispatcher.ExecuteQuery(
new ProcessAllVenues()
{
ProcessChunk = SyncVenues
}));

public Task SyncApprenticeship(Apprenticeship apprenticeship) => SyncApprenticeships(new[] { apprenticeship });

Expand Down Expand Up @@ -224,6 +236,32 @@ public Task SyncVenues(IEnumerable<Venue> venues) => WithSqlDispatcher(dispatche
})
}));

private async Task WithExclusiveSqlLock(string lockName, Func<Task> action)
{
// Grab an exclusive lock inside a transaction that spans the entire duration of `action`'s execution.
// ISqlQueryDispatcher owns the transaction; Dispose()ing the scope Dispose()s the transaction too.
// Note that commiting this transaction is not necessary.
// If the lock cannot be grabbed immediately then log & bail.

using var scope = _serviceScopeFactory.CreateScope();

var sqlDispatcher = scope.ServiceProvider.GetRequiredService<ISqlQueryDispatcher>();

var result = await sqlDispatcher.ExecuteQuery(new GetExclusiveLock()
{
Name = lockName,
TimeoutMilliseconds = 0 // Return immediately if lock cannot be acquired
});

if (!result)
{
_logger.LogWarning($"Failed to acquire exclusive lock: '{lockName}'.");
return;
}

await action();
}

private async Task WithSqlDispatcher(Func<ISqlQueryDispatcher, Task> action)
{
using (var scope = _serviceScopeFactory.CreateScope())
Expand Down
13 changes: 9 additions & 4 deletions tests/Dfc.CourseDirectory.Core.Tests/SqlDataSyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Dfc.CourseDirectory.Core.Models;
using Dfc.CourseDirectory.Testing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using OneOf.Types;
using Xunit;

Expand Down Expand Up @@ -70,7 +71,8 @@ public async Task SyncProvider_UpsertsProvider()

var sqlDataSync = new SqlDataSync(
Fixture.Services.GetRequiredService<IServiceScopeFactory>(),
CosmosDbQueryDispatcher.Object);
CosmosDbQueryDispatcher.Object,
new NullLogger<SqlDataSync>());

// Act
await sqlDataSync.SyncProvider(provider);
Expand Down Expand Up @@ -121,7 +123,8 @@ public async Task SyncVenue_UpsertsVenue()

var sqlDataSync = new SqlDataSync(
Fixture.Services.GetRequiredService<IServiceScopeFactory>(),
CosmosDbQueryDispatcher.Object);
CosmosDbQueryDispatcher.Object,
new NullLogger<SqlDataSync>());

// Act
await sqlDataSync.SyncVenue(venue);
Expand Down Expand Up @@ -212,7 +215,8 @@ public async Task SyncCourse_UpsertsCourse()

var sqlDataSync = new SqlDataSync(
Fixture.Services.GetRequiredService<IServiceScopeFactory>(),
CosmosDbQueryDispatcher.Object);
CosmosDbQueryDispatcher.Object,
new NullLogger<SqlDataSync>());

// Act
await sqlDataSync.SyncCourse(course);
Expand Down Expand Up @@ -324,7 +328,8 @@ public async Task SyncApprenticeship_UpsertsApprenticeship()

var sqlDataSync = new SqlDataSync(
Fixture.Services.GetRequiredService<IServiceScopeFactory>(),
CosmosDbQueryDispatcher.Object);
CosmosDbQueryDispatcher.Object,
new NullLogger<SqlDataSync>());

// Act
await sqlDataSync.SyncApprenticeship(apprenticeship);
Expand Down
1 change: 1 addition & 0 deletions tests/Dfc.CourseDirectory.Testing/DatabaseFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public static void ConfigureServices(IServiceCollection services)
services.AddSingleton<SqlQuerySpy>();
services.Decorate<ISqlQueryDispatcher, SqlQuerySpyDecorator>();
services.AddTransient<SqlDataSync>();
services.AddLogging();
}

public void Dispose()
Expand Down

0 comments on commit 06099d4

Please sign in to comment.