From 0d6984de620ee6a0fe7cd6716e8a2787e69c20b1 Mon Sep 17 00:00:00 2001 From: Andrew Horth Date: Mon, 18 Mar 2024 22:25:37 +0000 Subject: [PATCH] Extend establishment refresh job to also upated person_employments for an updated establishments --- .../Jobs/RefreshEstablishmentsJob.cs | 64 +------- .../Establishments/EstablishmentRefresher.cs | 69 ++++++++ .../ServiceCollectionExtensions.cs | 1 + .../WorkforceData/TpsCsvExtractProcessor.cs | 151 +++++++++++++++++- .../UpdatedPersonEmploymentEstablishment.cs | 12 ++ .../EstablishmentRefresherTests.cs} | 19 ++- .../TpsCsvExtractProcessorTests.cs | 21 +++ 7 files changed, 265 insertions(+), 72 deletions(-) create mode 100644 TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Establishments/EstablishmentRefresher.cs create mode 100644 TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/WorkforceData/UpdatedPersonEmploymentEstablishment.cs rename TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/{Jobs/RefreshEstablishmentsJobTests.cs => Services/Establishments/EstablishmentRefresherTests.cs} (94%) diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Jobs/RefreshEstablishmentsJob.cs b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Jobs/RefreshEstablishmentsJob.cs index 072cedfa21..17b4b72298 100644 --- a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Jobs/RefreshEstablishmentsJob.cs +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Jobs/RefreshEstablishmentsJob.cs @@ -1,68 +1,14 @@ -using TeachingRecordSystem.Core.DataStore.Postgres; +using TeachingRecordSystem.Core.Jobs.Scheduling; using TeachingRecordSystem.Core.Services.Establishments; +using TeachingRecordSystem.Core.Services.WorkforceData; namespace TeachingRecordSystem.Core.Jobs; -public class RefreshEstablishmentsJob(TrsDbContext dbContext, IEstablishmentMasterDataService establishmentMasterDataService) +public class RefreshEstablishmentsJob(IBackgroundJobScheduler backgroundJobScheduler) { public async Task ExecuteAsync(CancellationToken cancellationToken) { - int i = 0; - await foreach (var establishment in establishmentMasterDataService.GetEstablishments()) - { - var existingEstablishment = await dbContext.Establishments.SingleOrDefaultAsync(e => e.Urn == establishment.Urn); - if (existingEstablishment == null) - { - dbContext.Establishments.Add(new() - { - EstablishmentId = Guid.NewGuid(), - Urn = establishment.Urn, - LaCode = establishment.LaCode, - LaName = establishment.LaName, - EstablishmentNumber = establishment.EstablishmentNumber, - EstablishmentName = establishment.EstablishmentName, - EstablishmentTypeCode = establishment.EstablishmentTypeCode, - EstablishmentTypeName = establishment.EstablishmentTypeName, - EstablishmentTypeGroupCode = establishment.EstablishmentTypeGroupCode, - EstablishmentTypeGroupName = establishment.EstablishmentTypeGroupName, - EstablishmentStatusCode = establishment.EstablishmentStatusCode, - EstablishmentStatusName = establishment.EstablishmentStatusName, - Street = establishment.Street, - Locality = establishment.Locality, - Address3 = establishment.Address3, - Town = establishment.Town, - County = establishment.County, - Postcode = establishment.Postcode - }); - } - else - { - existingEstablishment.LaCode = establishment.LaCode; - existingEstablishment.LaName = establishment.LaName; - existingEstablishment.EstablishmentName = establishment.EstablishmentName; - existingEstablishment.EstablishmentTypeCode = establishment.EstablishmentTypeCode; - existingEstablishment.EstablishmentTypeName = establishment.EstablishmentTypeName; - existingEstablishment.EstablishmentTypeGroupCode = establishment.EstablishmentTypeGroupCode; - existingEstablishment.EstablishmentTypeGroupName = establishment.EstablishmentTypeGroupName; - existingEstablishment.EstablishmentStatusCode = establishment.EstablishmentStatusCode; - existingEstablishment.EstablishmentStatusName = establishment.EstablishmentStatusName; - existingEstablishment.Street = establishment.Street; - existingEstablishment.Locality = establishment.Locality; - existingEstablishment.Address3 = establishment.Address3; - existingEstablishment.Town = establishment.Town; - existingEstablishment.County = establishment.County; - existingEstablishment.Postcode = establishment.Postcode; - } - - if (++i % 2000 == 0) - { - await dbContext.SaveChangesAsync(cancellationToken); - } - } - - if (dbContext.ChangeTracker.HasChanges()) - { - await dbContext.SaveChangesAsync(cancellationToken); - } + var refreshJobId = await backgroundJobScheduler.Enqueue(j => j.RefreshEstablishments(cancellationToken)); + await backgroundJobScheduler.ContinueJobWith(refreshJobId, j => j.UpdateLatestEstablishmentVersions(cancellationToken)); } } diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Establishments/EstablishmentRefresher.cs b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Establishments/EstablishmentRefresher.cs new file mode 100644 index 0000000000..4913e4d6cc --- /dev/null +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Establishments/EstablishmentRefresher.cs @@ -0,0 +1,69 @@ +using TeachingRecordSystem.Core.DataStore.Postgres; + +namespace TeachingRecordSystem.Core.Services.Establishments; + +public class EstablishmentRefresher( + TrsDbContext dbContext, + IEstablishmentMasterDataService establishmentMasterDataService) +{ + public async Task RefreshEstablishments(CancellationToken cancellationToken) + { + int i = 0; + await foreach (var establishment in establishmentMasterDataService.GetEstablishments()) + { + var existingEstablishment = await dbContext.Establishments.SingleOrDefaultAsync(e => e.Urn == establishment.Urn); + if (existingEstablishment == null) + { + dbContext.Establishments.Add(new() + { + EstablishmentId = Guid.NewGuid(), + Urn = establishment.Urn, + LaCode = establishment.LaCode, + LaName = establishment.LaName, + EstablishmentNumber = establishment.EstablishmentNumber, + EstablishmentName = establishment.EstablishmentName, + EstablishmentTypeCode = establishment.EstablishmentTypeCode, + EstablishmentTypeName = establishment.EstablishmentTypeName, + EstablishmentTypeGroupCode = establishment.EstablishmentTypeGroupCode, + EstablishmentTypeGroupName = establishment.EstablishmentTypeGroupName, + EstablishmentStatusCode = establishment.EstablishmentStatusCode, + EstablishmentStatusName = establishment.EstablishmentStatusName, + Street = establishment.Street, + Locality = establishment.Locality, + Address3 = establishment.Address3, + Town = establishment.Town, + County = establishment.County, + Postcode = establishment.Postcode + }); + } + else + { + existingEstablishment.LaCode = establishment.LaCode; + existingEstablishment.LaName = establishment.LaName; + existingEstablishment.EstablishmentName = establishment.EstablishmentName; + existingEstablishment.EstablishmentTypeCode = establishment.EstablishmentTypeCode; + existingEstablishment.EstablishmentTypeName = establishment.EstablishmentTypeName; + existingEstablishment.EstablishmentTypeGroupCode = establishment.EstablishmentTypeGroupCode; + existingEstablishment.EstablishmentTypeGroupName = establishment.EstablishmentTypeGroupName; + existingEstablishment.EstablishmentStatusCode = establishment.EstablishmentStatusCode; + existingEstablishment.EstablishmentStatusName = establishment.EstablishmentStatusName; + existingEstablishment.Street = establishment.Street; + existingEstablishment.Locality = establishment.Locality; + existingEstablishment.Address3 = establishment.Address3; + existingEstablishment.Town = establishment.Town; + existingEstablishment.County = establishment.County; + existingEstablishment.Postcode = establishment.Postcode; + } + + if (++i % 2000 == 0) + { + await dbContext.SaveChangesAsync(cancellationToken); + } + } + + if (dbContext.ChangeTracker.HasChanges()) + { + await dbContext.SaveChangesAsync(cancellationToken); + } + } +} diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Establishments/ServiceCollectionExtensions.cs b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Establishments/ServiceCollectionExtensions.cs index 2eeaefc909..6908b69239 100644 --- a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Establishments/ServiceCollectionExtensions.cs +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Establishments/ServiceCollectionExtensions.cs @@ -17,6 +17,7 @@ public static IHostApplicationBuilder AddGias(this IHostApplicationBuilder build .ValidateOnStart(); builder.Services + .AddSingleton() .AddSingleton() .AddHttpClient((sp, httpClient) => { diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/WorkforceData/TpsCsvExtractProcessor.cs b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/WorkforceData/TpsCsvExtractProcessor.cs index 872194d964..7dc474c245 100644 --- a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/WorkforceData/TpsCsvExtractProcessor.cs +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/WorkforceData/TpsCsvExtractProcessor.cs @@ -51,7 +51,7 @@ WITH unique_establishments AS ( establishment_name, establishment_type_code, postcode, - ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number ORDER BY translate(establishment_status_code::text, '1234', '1324')) as row_number + ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number, CASE WHEN establishment_number IS NULL THEN postcode ELSE NULL END ORDER BY translate(establishment_status_code::text, '1234', '1324')) as row_number FROM establishments) e WHERE @@ -114,7 +114,7 @@ WITH unique_establishments AS ( establishment_name, establishment_type_code, postcode, - ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number ORDER BY translate(establishment_status_code::text, '1234', '1324')) as row_number + ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number, CASE WHEN establishment_number IS NULL THEN postcode ELSE NULL END ORDER BY translate(establishment_status_code::text, '1234', '1324'), urn desc) as row_number FROM establishments) e WHERE @@ -240,7 +240,7 @@ WITH unique_establishments AS ( establishment_name, establishment_type_code, postcode, - ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number ORDER BY translate(establishment_status_code::text, '1234', '1324')) as row_number + ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number, CASE WHEN establishment_number IS NULL THEN postcode ELSE NULL END ORDER BY translate(establishment_status_code::text, '1234', '1324'), urn desc) as row_number FROM establishments) e WHERE @@ -407,4 +407,149 @@ async Task SaveChanges() } } } + + public async Task UpdateLatestEstablishmentVersions(CancellationToken cancellationToken) + { + using var readDbContext = dbContextFactory.CreateDbContext(); + readDbContext.Database.SetCommandTimeout(600); + using var writeDbContext = dbContextFactory.CreateDbContext(); + var connection = (NpgsqlConnection)writeDbContext.Database.GetDbConnection(); + await connection.OpenAsync(CancellationToken.None); + + FormattableString querySql = + $""" + WITH unique_establishments AS ( + SELECT + establishment_id, + la_code, + establishment_number, + establishment_name, + establishment_type_code, + postcode + FROM + (SELECT + establishment_id, + la_code, + establishment_number, + establishment_name, + establishment_type_code, + postcode, + ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number, CASE WHEN establishment_number IS NULL THEN postcode ELSE NULL END ORDER BY translate(establishment_status_code::text, '1234', '1324'), urn desc) as row_number + FROM + establishments) e + WHERE + e.row_number = 1 + ), + establishment_changes AS ( + SELECT + * + FROM + person_employments pe + WHERE + NOT EXISTS (SELECT + 1 + FROM + unique_establishments e + WHERE + e.establishment_id = pe.establishment_id) + ) + select + ec.person_employment_id, + ec.person_id, + ec.establishment_id as current_establishment_id, + ec.start_date, + ec.end_date, + ec.employment_type, + ue.establishment_id + from + establishment_changes ec + JOIN + establishments e ON e.establishment_id = ec.establishment_id + JOIN + unique_establishments ue ON ue.la_code = e.la_code + AND (ue.establishment_number = e.establishment_number + OR (e.establishment_type_code = '29' + AND ue.postcode = e.postcode + AND NOT EXISTS (SELECT + 1 + FROM + unique_establishments e2 + WHERE + e2.la_code = e.la_code + AND e2.establishment_number = e.establishment_number))) + """; + + var batchCommands = new List(); + + await foreach (var item in readDbContext.Database.SqlQuery(querySql).AsAsyncEnumerable()) + { + var updatePersonEmploymentsCommand = new NpgsqlBatchCommand( + $""" + UPDATE + person_employments + SET + establishment_id = '{item.EstablishmentId}' + WHERE + person_employment_id = '{item.PersonEmploymentId}' + """); + + batchCommands.Add(updatePersonEmploymentsCommand); + writeDbContext.AddEvent(new PersonEmploymentUpdatedEvent + { + EventId = Guid.NewGuid(), + PersonId = item.PersonEmploymentId, + PersonEmployment = new() + { + PersonEmploymentId = item.PersonEmploymentId, + PersonId = item.PersonId, + EstablishmentId = item.EstablishmentId, + StartDate = item.StartDate, + EndDate = item.EndDate, + EmploymentType = item.EmploymentType + }, + OldPersonEmployment = new() + { + PersonEmploymentId = item.PersonEmploymentId, + PersonId = item.PersonId, + EstablishmentId = item.CurrentEstablishmentId, + StartDate = item.StartDate, + EndDate = item.EndDate, + EmploymentType = item.EmploymentType + }, + Changes = PersonEmploymentUpdatedEventChanges.EstablishmentId, + CreatedUtc = clock.UtcNow, + RaisedBy = DataStore.Postgres.Models.SystemUser.SystemUserId + }); + + if (batchCommands.Count == 50) + { + await SaveChanges(); + } + } + + if (batchCommands.Any()) + { + await SaveChanges(); + } + + async Task SaveChanges() + { + if (writeDbContext.ChangeTracker.HasChanges()) + { + await writeDbContext.SaveChangesAsync(cancellationToken); + } + + if (batchCommands.Count > 0) + { + using var batch = new NpgsqlBatch(connection); + foreach (var command in batchCommands) + { + batch.BatchCommands.Add(command); + } + + await batch.ExecuteNonQueryAsync(cancellationToken); + batchCommands.Clear(); + } + } + } } diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/WorkforceData/UpdatedPersonEmploymentEstablishment.cs b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/WorkforceData/UpdatedPersonEmploymentEstablishment.cs new file mode 100644 index 0000000000..adef4fdc72 --- /dev/null +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/WorkforceData/UpdatedPersonEmploymentEstablishment.cs @@ -0,0 +1,12 @@ +namespace TeachingRecordSystem.Core.Services.WorkforceData; + +public record UpdatedPersonEmploymentEstablishment +{ + public required Guid PersonEmploymentId { get; init; } + public required Guid PersonId { get; init; } + public required Guid CurrentEstablishmentId { get; init; } + public required DateOnly StartDate { get; init; } + public required DateOnly? EndDate { get; init; } + public required EmploymentType EmploymentType { get; init; } + public required Guid EstablishmentId { get; init; } +} diff --git a/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Jobs/RefreshEstablishmentsJobTests.cs b/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/Establishments/EstablishmentRefresherTests.cs similarity index 94% rename from TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Jobs/RefreshEstablishmentsJobTests.cs rename to TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/Establishments/EstablishmentRefresherTests.cs index c690d17101..d702b0cfbb 100644 --- a/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Jobs/RefreshEstablishmentsJobTests.cs +++ b/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/Establishments/EstablishmentRefresherTests.cs @@ -1,15 +1,14 @@ using Microsoft.PowerPlatform.Dataverse.Client; using TeachingRecordSystem.Core.Dqt; -using TeachingRecordSystem.Core.Jobs; using TeachingRecordSystem.Core.Services.Establishments; using TeachingRecordSystem.Core.Services.TrsDataSync; using Establishment = TeachingRecordSystem.Core.Models.Establishment; -namespace TeachingRecordSystem.Core.Tests.Jobs; +namespace TeachingRecordSystem.Core.Tests.Services.Establishments; -public class RefreshEstablishmentsJobTests +public class EstablishmentRefresherTests { - public RefreshEstablishmentsJobTests( + public EstablishmentRefresherTests( DbFixture dbFixture, IOrganizationServiceAsync2 organizationService, ReferenceDataCache referenceDataCache, @@ -36,7 +35,7 @@ public RefreshEstablishmentsJobTests( } [Fact] - public Task ExecuteAsync_WhenCalledforNewUrn_AddsNewEstablishments() => + public Task RefreshEstablishments_WhenCalledforNewUrn_AddsNewEstablishments() => DbFixture.WithDbContext(async dbContext => { // Arrange @@ -87,12 +86,12 @@ public Task ExecuteAsync_WhenCalledforNewUrn_AddsNewEstablishments() => .Setup(s => s.GetEstablishments()) .Returns(establishments.ToAsyncEnumerable()); - var job = new RefreshEstablishmentsJob( + var establishmentRefresher = new EstablishmentRefresher( dbContext, establishmentMasterDataService); // Act - await job.ExecuteAsync(CancellationToken.None); + await establishmentRefresher.RefreshEstablishments(CancellationToken.None); // Assert var establishmentsActual = await dbContext.Establishments.Where(e => e.Urn == establishment1.Urn || e.Urn == establishment2.Urn).OrderBy(e => e.Urn).ToListAsync(); @@ -140,7 +139,7 @@ public Task ExecuteAsync_WhenCalledforNewUrn_AddsNewEstablishments() => }); [Fact] - public Task ExecuteAsync_WhenCalledForExistingUrn_UpdatesEstablishment() => + public Task RefreshEstablishments_WhenCalledForExistingUrn_UpdatesEstablishment() => DbFixture.WithDbContext(async dbContext => { // Arrange @@ -196,12 +195,12 @@ public Task ExecuteAsync_WhenCalledForExistingUrn_UpdatesEstablishment() => .Setup(s => s.GetEstablishments()) .Returns(establishments.ToAsyncEnumerable()); - var job = new RefreshEstablishmentsJob( + var establishmentRefresher = new EstablishmentRefresher( dbContext, establishmentMasterDataService); // Act - await job.ExecuteAsync(CancellationToken.None); + await establishmentRefresher.RefreshEstablishments(CancellationToken.None); // Assert var urnEstablishments = await dbContext.Establishments.Where(e => e.Urn == dbEstablishment.Urn).ToListAsync(); diff --git a/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/WorkforceData/TpsCsvExtractProcessorTests.cs b/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/WorkforceData/TpsCsvExtractProcessorTests.cs index 99c88fa135..6e9e7037b8 100644 --- a/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/WorkforceData/TpsCsvExtractProcessorTests.cs +++ b/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/WorkforceData/TpsCsvExtractProcessorTests.cs @@ -218,6 +218,27 @@ public async Task ProcessUpdatedEmploymentHistory_WhenCalledWithUpdatedEmploymen Assert.All(items, i => Assert.Equal(TpsCsvExtractItemResult.ValidNoChange, i.Result)); } + [Fact] + public async Task UpdateLatestEstablishmentVersions_WithEstablishmentChangingUrn_UpdatesPersonEmploymentRecord() + { + // Arrange + var person = await TestData.CreatePerson(); + var establishment1 = await TestData.CreateEstablishment(localAuthorityCode: "127", establishmentNumber: "1238", establishmentStatusCode: 2); // Closed + var establishment2 = await TestData.CreateEstablishment(localAuthorityCode: "127", establishmentNumber: "1238", establishmentStatusCode: 1); // Open + var existingPersonEmployment = await TestData.CreatePersonEmployment(person.PersonId, establishment1.EstablishmentId, new DateOnly(2023, 02, 02), EmploymentType.FullTime); + + // Act + var processor = new TpsCsvExtractProcessor( + TestData.DbContextFactory, + TestData.Clock); + await processor.UpdateLatestEstablishmentVersions(CancellationToken.None); + + // Assert + using var dbContext = TestData.DbContextFactory.CreateDbContext(); + var updatedPersonEmployment = await dbContext.PersonEmployments.SingleAsync(e => e.PersonEmploymentId == existingPersonEmployment.PersonEmploymentId); + Assert.Equal(establishment2.EstablishmentId, updatedPersonEmployment.EstablishmentId); + } + private DbFixture DbFixture { get; } private TestData TestData { get; }