Skip to content

Commit

Permalink
Refresh workforce data with "most open" version of establishment (#1248)
Browse files Browse the repository at this point in the history
  • Loading branch information
hortha authored Mar 19, 2024
1 parent 1c2e862 commit 7ca558f
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,68 +1,14 @@
using TeachingRecordSystem.Core.DataStore.Postgres;
using TeachingRecordSystem.Core.Jobs.Scheduling;
using TeachingRecordSystem.Core.Services.Establishments;
using TeachingRecordSystem.Core.Services.WorkforceData;

namespace TeachingRecordSystem.Core.Jobs;

public class RefreshEstablishmentsJob(TrsDbContext dbContext, IEstablishmentMasterDataService establishmentMasterDataService)
public class RefreshEstablishmentsJob(IBackgroundJobScheduler backgroundJobScheduler)
{
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
int i = 0;
await foreach (var establishment in establishmentMasterDataService.GetEstablishments())
{
var existingEstablishment = await dbContext.Establishments.SingleOrDefaultAsync(e => e.Urn == establishment.Urn);
if (existingEstablishment == null)
{
dbContext.Establishments.Add(new()
{
EstablishmentId = Guid.NewGuid(),
Urn = establishment.Urn,
LaCode = establishment.LaCode,
LaName = establishment.LaName,
EstablishmentNumber = establishment.EstablishmentNumber,
EstablishmentName = establishment.EstablishmentName,
EstablishmentTypeCode = establishment.EstablishmentTypeCode,
EstablishmentTypeName = establishment.EstablishmentTypeName,
EstablishmentTypeGroupCode = establishment.EstablishmentTypeGroupCode,
EstablishmentTypeGroupName = establishment.EstablishmentTypeGroupName,
EstablishmentStatusCode = establishment.EstablishmentStatusCode,
EstablishmentStatusName = establishment.EstablishmentStatusName,
Street = establishment.Street,
Locality = establishment.Locality,
Address3 = establishment.Address3,
Town = establishment.Town,
County = establishment.County,
Postcode = establishment.Postcode
});
}
else
{
existingEstablishment.LaCode = establishment.LaCode;
existingEstablishment.LaName = establishment.LaName;
existingEstablishment.EstablishmentName = establishment.EstablishmentName;
existingEstablishment.EstablishmentTypeCode = establishment.EstablishmentTypeCode;
existingEstablishment.EstablishmentTypeName = establishment.EstablishmentTypeName;
existingEstablishment.EstablishmentTypeGroupCode = establishment.EstablishmentTypeGroupCode;
existingEstablishment.EstablishmentTypeGroupName = establishment.EstablishmentTypeGroupName;
existingEstablishment.EstablishmentStatusCode = establishment.EstablishmentStatusCode;
existingEstablishment.EstablishmentStatusName = establishment.EstablishmentStatusName;
existingEstablishment.Street = establishment.Street;
existingEstablishment.Locality = establishment.Locality;
existingEstablishment.Address3 = establishment.Address3;
existingEstablishment.Town = establishment.Town;
existingEstablishment.County = establishment.County;
existingEstablishment.Postcode = establishment.Postcode;
}

if (++i % 2000 == 0)
{
await dbContext.SaveChangesAsync(cancellationToken);
}
}

if (dbContext.ChangeTracker.HasChanges())
{
await dbContext.SaveChangesAsync(cancellationToken);
}
var refreshJobId = await backgroundJobScheduler.Enqueue<EstablishmentRefresher>(j => j.RefreshEstablishments(cancellationToken));
await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractProcessor>(refreshJobId, j => j.UpdateLatestEstablishmentVersions(cancellationToken));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using TeachingRecordSystem.Core.DataStore.Postgres;

namespace TeachingRecordSystem.Core.Services.Establishments;

public class EstablishmentRefresher(
TrsDbContext dbContext,
IEstablishmentMasterDataService establishmentMasterDataService)
{
public async Task RefreshEstablishments(CancellationToken cancellationToken)
{
int i = 0;
await foreach (var establishment in establishmentMasterDataService.GetEstablishments())
{
var existingEstablishment = await dbContext.Establishments.SingleOrDefaultAsync(e => e.Urn == establishment.Urn);
if (existingEstablishment == null)
{
dbContext.Establishments.Add(new()
{
EstablishmentId = Guid.NewGuid(),
Urn = establishment.Urn,
LaCode = establishment.LaCode,
LaName = establishment.LaName,
EstablishmentNumber = establishment.EstablishmentNumber,
EstablishmentName = establishment.EstablishmentName,
EstablishmentTypeCode = establishment.EstablishmentTypeCode,
EstablishmentTypeName = establishment.EstablishmentTypeName,
EstablishmentTypeGroupCode = establishment.EstablishmentTypeGroupCode,
EstablishmentTypeGroupName = establishment.EstablishmentTypeGroupName,
EstablishmentStatusCode = establishment.EstablishmentStatusCode,
EstablishmentStatusName = establishment.EstablishmentStatusName,
Street = establishment.Street,
Locality = establishment.Locality,
Address3 = establishment.Address3,
Town = establishment.Town,
County = establishment.County,
Postcode = establishment.Postcode
});
}
else
{
existingEstablishment.LaCode = establishment.LaCode;
existingEstablishment.LaName = establishment.LaName;
existingEstablishment.EstablishmentName = establishment.EstablishmentName;
existingEstablishment.EstablishmentTypeCode = establishment.EstablishmentTypeCode;
existingEstablishment.EstablishmentTypeName = establishment.EstablishmentTypeName;
existingEstablishment.EstablishmentTypeGroupCode = establishment.EstablishmentTypeGroupCode;
existingEstablishment.EstablishmentTypeGroupName = establishment.EstablishmentTypeGroupName;
existingEstablishment.EstablishmentStatusCode = establishment.EstablishmentStatusCode;
existingEstablishment.EstablishmentStatusName = establishment.EstablishmentStatusName;
existingEstablishment.Street = establishment.Street;
existingEstablishment.Locality = establishment.Locality;
existingEstablishment.Address3 = establishment.Address3;
existingEstablishment.Town = establishment.Town;
existingEstablishment.County = establishment.County;
existingEstablishment.Postcode = establishment.Postcode;
}

if (++i % 2000 == 0)
{
await dbContext.SaveChangesAsync(cancellationToken);
}
}

if (dbContext.ChangeTracker.HasChanges())
{
await dbContext.SaveChangesAsync(cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public static IHostApplicationBuilder AddGias(this IHostApplicationBuilder build
.ValidateOnStart();

builder.Services
.AddSingleton<EstablishmentRefresher>()
.AddSingleton<IEstablishmentMasterDataService, CsvDownloadEstablishmentMasterDataService>()
.AddHttpClient<IEstablishmentMasterDataService, CsvDownloadEstablishmentMasterDataService>((sp, httpClient) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ WITH unique_establishments AS (
establishment_name,
establishment_type_code,
postcode,
ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number ORDER BY translate(establishment_status_code::text, '1234', '1324')) as row_number
ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number, CASE WHEN establishment_number IS NULL THEN postcode ELSE NULL END ORDER BY translate(establishment_status_code::text, '1234', '1324'), urn desc) as row_number
FROM
establishments) e
WHERE
Expand Down Expand Up @@ -114,7 +114,7 @@ WITH unique_establishments AS (
establishment_name,
establishment_type_code,
postcode,
ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number ORDER BY translate(establishment_status_code::text, '1234', '1324')) as row_number
ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number, CASE WHEN establishment_number IS NULL THEN postcode ELSE NULL END ORDER BY translate(establishment_status_code::text, '1234', '1324'), urn desc) as row_number
FROM
establishments) e
WHERE
Expand Down Expand Up @@ -240,7 +240,7 @@ WITH unique_establishments AS (
establishment_name,
establishment_type_code,
postcode,
ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number ORDER BY translate(establishment_status_code::text, '1234', '1324')) as row_number
ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number, CASE WHEN establishment_number IS NULL THEN postcode ELSE NULL END ORDER BY translate(establishment_status_code::text, '1234', '1324'), urn desc) as row_number
FROM
establishments) e
WHERE
Expand Down Expand Up @@ -407,4 +407,149 @@ async Task SaveChanges()
}
}
}

public async Task UpdateLatestEstablishmentVersions(CancellationToken cancellationToken)
{
using var readDbContext = dbContextFactory.CreateDbContext();
readDbContext.Database.SetCommandTimeout(600);
using var writeDbContext = dbContextFactory.CreateDbContext();
var connection = (NpgsqlConnection)writeDbContext.Database.GetDbConnection();
await connection.OpenAsync(CancellationToken.None);

FormattableString querySql =
$"""
WITH unique_establishments AS (
SELECT
establishment_id,
la_code,
establishment_number,
establishment_name,
establishment_type_code,
postcode
FROM
(SELECT
establishment_id,
la_code,
establishment_number,
establishment_name,
establishment_type_code,
postcode,
ROW_NUMBER() OVER (PARTITION BY la_code, establishment_number, CASE WHEN establishment_number IS NULL THEN postcode ELSE NULL END ORDER BY translate(establishment_status_code::text, '1234', '1324'), urn desc) as row_number
FROM
establishments) e
WHERE
e.row_number = 1
),
establishment_changes AS (
SELECT
*
FROM
person_employments pe
WHERE
NOT EXISTS (SELECT
1
FROM
unique_establishments e
WHERE
e.establishment_id = pe.establishment_id)
)
select
ec.person_employment_id,
ec.person_id,
ec.establishment_id as current_establishment_id,
ec.start_date,
ec.end_date,
ec.employment_type,
ue.establishment_id
from
establishment_changes ec
JOIN
establishments e ON e.establishment_id = ec.establishment_id
JOIN
unique_establishments ue ON ue.la_code = e.la_code
AND (ue.establishment_number = e.establishment_number
OR (e.establishment_type_code = '29'
AND ue.postcode = e.postcode
AND NOT EXISTS (SELECT
1
FROM
unique_establishments e2
WHERE
e2.la_code = e.la_code
AND e2.establishment_number = e.establishment_number)))
""";

var batchCommands = new List<NpgsqlBatchCommand>();

await foreach (var item in readDbContext.Database.SqlQuery<UpdatedPersonEmploymentEstablishment>(querySql).AsAsyncEnumerable())
{
var updatePersonEmploymentsCommand = new NpgsqlBatchCommand(
$"""
UPDATE
person_employments
SET
establishment_id = '{item.EstablishmentId}'
WHERE
person_employment_id = '{item.PersonEmploymentId}'
""");

batchCommands.Add(updatePersonEmploymentsCommand);
writeDbContext.AddEvent(new PersonEmploymentUpdatedEvent
{
EventId = Guid.NewGuid(),
PersonId = item.PersonEmploymentId,
PersonEmployment = new()
{
PersonEmploymentId = item.PersonEmploymentId,
PersonId = item.PersonId,
EstablishmentId = item.EstablishmentId,
StartDate = item.StartDate,
EndDate = item.EndDate,
EmploymentType = item.EmploymentType
},
OldPersonEmployment = new()
{
PersonEmploymentId = item.PersonEmploymentId,
PersonId = item.PersonId,
EstablishmentId = item.CurrentEstablishmentId,
StartDate = item.StartDate,
EndDate = item.EndDate,
EmploymentType = item.EmploymentType
},
Changes = PersonEmploymentUpdatedEventChanges.EstablishmentId,
CreatedUtc = clock.UtcNow,
RaisedBy = DataStore.Postgres.Models.SystemUser.SystemUserId
});

if (batchCommands.Count == 50)
{
await SaveChanges();
}
}

if (batchCommands.Any())
{
await SaveChanges();
}

async Task SaveChanges()
{
if (writeDbContext.ChangeTracker.HasChanges())
{
await writeDbContext.SaveChangesAsync(cancellationToken);
}

if (batchCommands.Count > 0)
{
using var batch = new NpgsqlBatch(connection);
foreach (var command in batchCommands)
{
batch.BatchCommands.Add(command);
}

await batch.ExecuteNonQueryAsync(cancellationToken);
batchCommands.Clear();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace TeachingRecordSystem.Core.Services.WorkforceData;

public record UpdatedPersonEmploymentEstablishment
{
public required Guid PersonEmploymentId { get; init; }
public required Guid PersonId { get; init; }
public required Guid CurrentEstablishmentId { get; init; }
public required DateOnly StartDate { get; init; }
public required DateOnly? EndDate { get; init; }
public required EmploymentType EmploymentType { get; init; }
public required Guid EstablishmentId { get; init; }
}
Loading

0 comments on commit 7ca558f

Please sign in to comment.