Skip to content

Commit

Permalink
Tweaks to allow for the "Service Line ID" column not being present in the TPS extract file
Browse files Browse the repository at this point in the history
  • Loading branch information
hortha committed Mar 12, 2024
1 parent 7cb1b25 commit addc022
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ public void Configure(EntityTypeBuilder<TpsCsvExtractItem> builder)
builder.Property(x => x.EstablishmentNumber).HasMaxLength(4).IsFixedLength();
builder.Property(x => x.EstablishmentPostcode).HasMaxLength(10);
builder.Property(x => x.EstablishmentEmailAddress).HasMaxLength(200);
builder.Property(x => x.MemberId).IsRequired();
builder.Property(x => x.EmploymentStartDate).IsRequired();
builder.Property(x => x.EmploymentType).IsRequired();
builder.Property(x => x.WithdrawlIndicator).HasMaxLength(1).IsFixedLength();
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected override void Up(MigrationBuilder migrationBuilder)
establishment_number = table.Column<string>(type: "character(4)", fixedLength: true, maxLength: 4, nullable: true),
establishment_postcode = table.Column<string>(type: "character varying(10)", maxLength: 10, nullable: true),
establishment_email_address = table.Column<string>(type: "character varying(200)", maxLength: 200, nullable: true),
member_id = table.Column<int>(type: "integer", nullable: false),
member_id = table.Column<int>(type: "integer", nullable: true),
employment_start_date = table.Column<DateOnly>(type: "date", nullable: false),
employment_end_date = table.Column<DateOnly>(type: "date", nullable: true),
employment_type = table.Column<int>(type: "integer", nullable: false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ protected override void BuildModel(ModelBuilder modelBuilder)
.HasColumnType("character varying(200)")
.HasColumnName("member_email_address");

b.Property<int>("MemberId")
b.Property<int?>("MemberId")
.HasColumnType("integer")
.HasColumnName("member_id");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class TpsCsvExtractItem
public required string? EstablishmentNumber { get; set; }
public required string? EstablishmentPostcode { get; set; }
public required string? EstablishmentEmailAddress { get; set; }
public required int MemberId { get; set; }
public required int? MemberId { get; set; }
public required DateOnly EmploymentStartDate { get; set; }
public required DateOnly? EmploymentEndDate { get; set; }
public required EmploymentType EmploymentType { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
// Only the first pending file is imported per run. If we ever need to process more than one file, we can manually trigger this job again or add a loop here.
var tpsCsvExtractId = Guid.NewGuid();
var importJobId = await backgroundJobScheduler.Enqueue<TpsCsvExtractFileImporter>(j => j.ImportFile(tpsCsvExtractId, pendingImportFileNames[0], cancellationToken));
var archiveJobId = await backgroundJobScheduler.ContinueJobWith<ITpsExtractStorageService>(importJobId, j => j.ArchiveFile(pendingImportFileNames[0], cancellationToken));
var copyJobId = await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractFileImporter>(archiveJobId, j => j.CopyValidFormatDataToStaging(tpsCsvExtractId, cancellationToken));
var processInvalidTrnsJobId = await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractProcessor>(copyJobId, j => j.ProcessNonMatchingTrns(tpsCsvExtractId, cancellationToken));
var processInvalidEstablishmentsJobId = await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractProcessor>(processInvalidTrnsJobId, j => j.ProcessNonMatchingEstablishments(tpsCsvExtractId, cancellationToken));
var processNewEmploymentHistoryJobId = await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractProcessor>(processInvalidEstablishmentsJobId, j => j.ProcessNewEmploymentHistory(tpsCsvExtractId, cancellationToken));
var processUpdatedEmploymentHistoryJobId = await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractProcessor>(processNewEmploymentHistoryJobId, j => j.ProcessUpdatedEmploymentHistory(tpsCsvExtractId, cancellationToken));
//var archiveJobId = await backgroundJobScheduler.ContinueJobWith<ITpsExtractStorageService>(importJobId, j => j.ArchiveFile(pendingImportFileNames[0], cancellationToken));
var copyJobId = await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractFileImporter>(importJobId, j => j.CopyValidFormatDataToStaging(tpsCsvExtractId, cancellationToken));
//var processInvalidTrnsJobId = await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractProcessor>(copyJobId, j => j.ProcessNonMatchingTrns(tpsCsvExtractId, cancellationToken));
//var processInvalidEstablishmentsJobId = await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractProcessor>(processInvalidTrnsJobId, j => j.ProcessNonMatchingEstablishments(tpsCsvExtractId, cancellationToken));
//var processNewEmploymentHistoryJobId = await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractProcessor>(processInvalidEstablishmentsJobId, j => j.ProcessNewEmploymentHistory(tpsCsvExtractId, cancellationToken));
//var processUpdatedEmploymentHistoryJobId = await backgroundJobScheduler.ContinueJobWith<TpsCsvExtractProcessor>(processNewEmploymentHistoryJobId, j => j.ProcessUpdatedEmploymentHistory(tpsCsvExtractId, cancellationToken));
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

using System.Diagnostics;
using System.Globalization;
using System.Text.RegularExpressions;
using CsvHelper;
Expand All @@ -17,6 +18,7 @@ public class TpsCsvExtractFileImporter(
{
public async Task ImportFile(Guid tpsCsvExtractId, string fileName, CancellationToken cancellationToken)
{
var stopwatch = Stopwatch.StartNew();
var fileNameParts = fileName.Split("/");
var fileNameWithoutFolder = fileNameParts.Last();

Expand Down Expand Up @@ -57,7 +59,6 @@ INSERT INTO tps_csv_extracts (
establishment_number,
establishment_postcode,
establishment_email_address,
member_id,
employment_start_date,
employment_end_date,
full_or_part_time_indicator,
Expand Down Expand Up @@ -113,11 +114,6 @@ INSERT INTO tps_csv_extracts (
loadErrors = loadErrors | TpsCsvExtractItemLoadErrors.EstablishmentNumberIncorrectFormat;
}

if (row.MemberId is null || !int.TryParse(row.MemberId, out _))
{
loadErrors = loadErrors | TpsCsvExtractItemLoadErrors.MemberIdIncorrectFormat;
}

if (row.EmploymentStartDate is null || !DateOnly.TryParseExact(row.EmploymentStartDate, "dd/MM/yyyy", out _))
{
loadErrors = loadErrors | TpsCsvExtractItemLoadErrors.EmploymentStartDateIncorrectFormat;
Expand Down Expand Up @@ -148,38 +144,41 @@ INSERT INTO tps_csv_extracts (
loadErrors = loadErrors | TpsCsvExtractItemLoadErrors.GenderIncorrectFormat;
}

await writer.StartRowAsync();
await writer.WriteAsync(Guid.NewGuid(), NpgsqlDbType.Uuid);
await writer.WriteAsync(tpsCsvExtractId, NpgsqlDbType.Uuid);
await writer.WriteAsync(row.Trn, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.NationalInsuranceNumber, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.DateOfBirth, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.DateOfDeath, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.MemberPostcode, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.MemberEmailAddress, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.LocalAuthorityCode, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.EstablishmentCode, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.EstablishmentPostcode, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.EstablishmentEmailAddress, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.MemberId, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.EmploymentStartDate, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.EmploymentEndDate, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.FullOrPartTimeIndicator, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.WithdrawlIndicator, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.ExtractDate, NpgsqlDbType.Varchar);
await writer.WriteAsync(row.Gender, NpgsqlDbType.Varchar);
await writer.WriteAsync(clock.UtcNow, NpgsqlDbType.TimestampTz);
await writer.WriteAsync((int)loadErrors, NpgsqlDbType.Integer);
writer.StartRow();
writer.Write(Guid.NewGuid(), NpgsqlDbType.Uuid);
writer.Write(tpsCsvExtractId, NpgsqlDbType.Uuid);
writer.Write(row.Trn, NpgsqlDbType.Varchar);
writer.Write(row.NationalInsuranceNumber, NpgsqlDbType.Varchar);
writer.Write(row.DateOfBirth, NpgsqlDbType.Varchar);
writer.Write(row.DateOfDeath, NpgsqlDbType.Varchar);
writer.Write(row.MemberPostcode, NpgsqlDbType.Varchar);
writer.Write(row.MemberEmailAddress, NpgsqlDbType.Varchar);
writer.Write(row.LocalAuthorityCode, NpgsqlDbType.Varchar);
writer.Write(row.EstablishmentCode, NpgsqlDbType.Varchar);
writer.Write(row.EstablishmentPostcode, NpgsqlDbType.Varchar);
writer.Write(row.EstablishmentEmailAddress, NpgsqlDbType.Varchar);
writer.Write(row.EmploymentStartDate, NpgsqlDbType.Varchar);
writer.Write(row.EmploymentEndDate, NpgsqlDbType.Varchar);
writer.Write(row.FullOrPartTimeIndicator, NpgsqlDbType.Varchar);
writer.Write(row.WithdrawlIndicator, NpgsqlDbType.Varchar);
writer.Write(row.ExtractDate, NpgsqlDbType.Varchar);
writer.Write(row.Gender, NpgsqlDbType.Varchar);
writer.Write(clock.UtcNow, NpgsqlDbType.TimestampTz);
writer.Write((int)loadErrors, NpgsqlDbType.Integer);
}

await writer.CompleteAsync(cancellationToken);
await writer.CloseAsync(cancellationToken);

await transaction.CommitAsync(cancellationToken);

stopwatch.Stop();
Console.WriteLine($"ImportFile took {stopwatch.ElapsedMilliseconds}ms");
}

public async Task CopyValidFormatDataToStaging(Guid tpsCsvExtractId, CancellationToken cancellationToken)
{
var stopwatch = Stopwatch.StartNew();
using var readDbContext = dbContextFactory.CreateDbContext();
using var writeDbContext = dbContextFactory.CreateDbContext();
var connection = (NpgsqlConnection)writeDbContext.Database.GetDbConnection();
Expand All @@ -202,7 +201,6 @@ public async Task CopyValidFormatDataToStaging(Guid tpsCsvExtractId, Cancellatio
establishment_number,
establishment_postcode,
establishment_email_address,
member_id,
employment_start_date,
employment_end_date,
employment_type,
Expand All @@ -219,31 +217,33 @@ public async Task CopyValidFormatDataToStaging(Guid tpsCsvExtractId, Cancellatio

await foreach (var item in readDbContext.TpsCsvExtractLoadItems.Where(x => x.TpsCsvExtractId == tpsCsvExtractId && x.Errors == TpsCsvExtractItemLoadErrors.None).AsNoTracking().AsAsyncEnumerable())
{
await writer.StartRowAsync();
await writer.WriteAsync(Guid.NewGuid(), NpgsqlDbType.Uuid);
await writer.WriteAsync(tpsCsvExtractId, NpgsqlDbType.Uuid);
await writer.WriteAsync(item.TpsCsvExtractLoadItemId, NpgsqlDbType.Uuid);
await writer.WriteAsync(item.Trn, NpgsqlDbType.Char);
await writer.WriteAsync(item.NationalInsuranceNumber, NpgsqlDbType.Char);
await writer.WriteAsync(DateOnly.ParseExact(item.DateOfBirth!, "dd/MM/yyyy"), NpgsqlDbType.Date);
await writer.WriteAsync(!string.IsNullOrEmpty(item.DateOfDeath) ? DateOnly.ParseExact(item.DateOfDeath, "dd/MM/yyyy") : (DateOnly?)null, NpgsqlDbType.Date);
await writer.WriteAsync(item.MemberPostcode, NpgsqlDbType.Varchar);
await writer.WriteAsync(item.MemberEmailAddress, NpgsqlDbType.Varchar);
await writer.WriteAsync(item.LocalAuthorityCode, NpgsqlDbType.Char);
await writer.WriteAsync(item.EstablishmentNumber, NpgsqlDbType.Char);
await writer.WriteAsync(item.EstablishmentPostcode, NpgsqlDbType.Varchar);
await writer.WriteAsync(item.EstablishmentEmailAddress, NpgsqlDbType.Varchar);
await writer.WriteAsync(int.Parse(item.MemberId!), NpgsqlDbType.Integer);
await writer.WriteAsync(DateOnly.ParseExact(item.EmploymentStartDate!, "dd/MM/yyyy"), NpgsqlDbType.Date);
await writer.WriteAsync(!string.IsNullOrEmpty(item.EmploymentEndDate) ? DateOnly.ParseExact(item.EmploymentEndDate!, "dd/MM/yyyy") : (DateOnly?)null, NpgsqlDbType.Date);
await writer.WriteAsync((int)EmploymentTypeHelper.FromFullOrPartTimeIndicator(item.FullOrPartTimeIndicator!), NpgsqlDbType.Integer);
await writer.WriteAsync(item.WithdrawlIndicator, NpgsqlDbType.Char);
await writer.WriteAsync(DateOnly.ParseExact(item.ExtractDate!, "dd/MM/yyyy"), NpgsqlDbType.Date);
await writer.WriteAsync(item.Gender, NpgsqlDbType.Varchar);
await writer.WriteAsync(clock.UtcNow, NpgsqlDbType.TimestampTz);
writer.StartRow();
writer.Write(Guid.NewGuid(), NpgsqlDbType.Uuid);
writer.Write(tpsCsvExtractId, NpgsqlDbType.Uuid);
writer.Write(item.TpsCsvExtractLoadItemId, NpgsqlDbType.Uuid);
writer.Write(item.Trn, NpgsqlDbType.Char);
writer.Write(item.NationalInsuranceNumber, NpgsqlDbType.Char);
writer.Write(DateOnly.ParseExact(item.DateOfBirth!, "dd/MM/yyyy"), NpgsqlDbType.Date);
writer.Write(!string.IsNullOrEmpty(item.DateOfDeath) ? DateOnly.ParseExact(item.DateOfDeath, "dd/MM/yyyy") : (DateOnly?)null, NpgsqlDbType.Date);
writer.Write(item.MemberPostcode, NpgsqlDbType.Varchar);
writer.Write(item.MemberEmailAddress, NpgsqlDbType.Varchar);
writer.Write(item.LocalAuthorityCode, NpgsqlDbType.Char);
writer.Write(item.EstablishmentNumber, NpgsqlDbType.Char);
writer.Write(item.EstablishmentPostcode, NpgsqlDbType.Varchar);
writer.Write(item.EstablishmentEmailAddress, NpgsqlDbType.Varchar);
writer.Write(DateOnly.ParseExact(item.EmploymentStartDate!, "dd/MM/yyyy"), NpgsqlDbType.Date);
writer.Write(!string.IsNullOrEmpty(item.EmploymentEndDate) ? DateOnly.ParseExact(item.EmploymentEndDate!, "dd/MM/yyyy") : (DateOnly?)null, NpgsqlDbType.Date);
writer.Write((int)EmploymentTypeHelper.FromFullOrPartTimeIndicator(item.FullOrPartTimeIndicator!), NpgsqlDbType.Integer);
writer.Write(item.WithdrawlIndicator, NpgsqlDbType.Char);
writer.Write(DateOnly.ParseExact(item.ExtractDate!, "dd/MM/yyyy"), NpgsqlDbType.Date);
writer.Write(item.Gender, NpgsqlDbType.Varchar);
writer.Write(clock.UtcNow, NpgsqlDbType.TimestampTz);
}

await writer.CompleteAsync(cancellationToken);
await writer.CloseAsync(cancellationToken);

stopwatch.Stop();
Console.WriteLine($"CopyValidFormatDataToStaging took {stopwatch.ElapsedMilliseconds}ms");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class TpsCsvExtractRowRaw
[Name("Date of Death")]
[NullValues("")]
public required string? DateOfDeath { get; init; }
[Name("Postcode (Member)")]
[Name("Postcode")]
[NullValues("")]
public required string? MemberPostcode { get; init; }
[Name("Email Address (Member)")]
Expand All @@ -34,9 +34,6 @@ public class TpsCsvExtractRowRaw
[Name("Email Address (Establishment)")]
[NullValues("")]
public required string? EstablishmentEmailAddress { get; init; }
[Name("Service Line ID")]
[NullValues("")]
public required string? MemberId { get; init; }
[Name("Start Date")]
[NullValues("")]
public required string? EmploymentStartDate { get; init; }
Expand Down
Loading

0 comments on commit addc022

Please sign in to comment.