From 730b30b04b6bba6bfb97d98ae98d7f0343a21f39 Mon Sep 17 00:00:00 2001 From: Stephane Royer Date: Mon, 25 Nov 2024 19:02:49 +0100 Subject: [PATCH] bugs on efcoresave --- .../EfSave/EfSaveEngine.cs | 165 +++++++-------- src/SharedSettings.props | 2 +- .../DataAccess/TestDbContext.cs | 2 +- .../InputFiles/Input01.Portfolios.csv | 6 +- src/Tutorials/Paillave.Etl.Samples/Program.cs | 64 +++--- .../Paillave.Etl.Samples/Program2.cs | 17 +- .../Paillave.Etl.Samples/TestImport2.cs | 193 +++++++++--------- .../connectorsLocalConfig.json | 21 ++ 8 files changed, 250 insertions(+), 220 deletions(-) create mode 100644 src/Tutorials/Paillave.Etl.Samples/connectorsLocalConfig.json diff --git a/src/Paillave.EntityFrameworkCoreExtension/EfSave/EfSaveEngine.cs b/src/Paillave.EntityFrameworkCoreExtension/EfSave/EfSaveEngine.cs index 1494879a..c5751dde 100644 --- a/src/Paillave.EntityFrameworkCoreExtension/EfSave/EfSaveEngine.cs +++ b/src/Paillave.EntityFrameworkCoreExtension/EfSave/EfSaveEngine.cs @@ -1,114 +1,115 @@ using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Metadata; using Paillave.EntityFrameworkCoreExtension.Core; using System; using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; using System.Reflection; -using System.Text; using System.Threading; using System.Threading.Tasks; -namespace Paillave.EntityFrameworkCoreExtension.EfSave +namespace Paillave.EntityFrameworkCoreExtension.EfSave; +public class EfSaveEngine where T : class { - public class EfSaveEngine where T : class + private readonly Expression> _findConditionExpression; + private readonly List _keyPropertyInfos; + private readonly DbContext _context; + private readonly CancellationToken _cancellationToken; + public EfSaveEngine(DbContext context, CancellationToken cancellationToken, params Expression>[] pivotKeys) { - private Expression> _findConditionExpression; - private List _keyPropertyInfos; - private DbContext _context; - private readonly CancellationToken _cancellationToken; - public EfSaveEngine(DbContext context, CancellationToken cancellationToken, params Expression>[] pivotKeys) + this._cancellationToken = cancellationToken; + _context = context; + var entityType = context.Model.FindEntityType(typeof(T)) ?? throw new InvalidOperationException("DbContext does not contain EntitySet for Type: " + typeof(T).Name); + _keyPropertyInfos = entityType.GetProperties() + .Where(i => !i.IsShadowProperty() && i.IsPrimaryKey()) + .Where(i => i.PropertyInfo != null) + .Select(i => i.PropertyInfo!) + .ToList(); + List> propertyInfosForPivot; + if ((pivotKeys?.Length ?? 0) == 0) + propertyInfosForPivot = new List> { entityType.FindPrimaryKey().Properties + .Where(i => i.PropertyInfo != null) + .Select(i => i.PropertyInfo!) + .ToList() }; + else + propertyInfosForPivot = pivotKeys.Select(pivotKey => KeyDefinitionExtractor.GetKeys(pivotKey)) + .ToList(); + + _findConditionExpression = CreateFindConditionExpression(propertyInfosForPivot); + } + private Expression> CreateFindConditionExpression(List> propertyInfosForPivotSet) + { + ParameterExpression leftParam = Expression.Parameter(typeof(T), "i"); + ParameterExpression rightParam = Expression.Parameter(typeof(T), "rightParam"); + Expression predicateBody = null; + foreach (var propertyInfosForPivot in propertyInfosForPivotSet) { - this._cancellationToken = cancellationToken; - _context = context; - var entityType = context.Model.FindEntityType(typeof(T)); - if (entityType == null) - throw new InvalidOperationException("DbContext does not contain EntitySet for Type: " + typeof(T).Name); - _keyPropertyInfos = entityType.GetProperties().Where(i => !i.IsShadowProperty() && i.IsPrimaryKey()).Select(i => i.PropertyInfo).ToList(); - List> propertyInfosForPivot; - if ((pivotKeys?.Length ?? 0) == 0) - propertyInfosForPivot = new List> { entityType.FindPrimaryKey().Properties.Select(i => i.PropertyInfo).ToList() }; + var pivotPartExpression = CreatePivotPartExpression(propertyInfosForPivot, leftParam, rightParam); + if (predicateBody == null) + predicateBody = pivotPartExpression; else - propertyInfosForPivot = pivotKeys.Select(pivotKey => KeyDefinitionExtractor.GetKeys(pivotKey)).ToList(); - - _findConditionExpression = CreateFindConditionExpression(propertyInfosForPivot); + predicateBody = Expression.OrElse(predicateBody, pivotPartExpression); } - private Expression> CreateFindConditionExpression(List> propertyInfosForPivotSet) + return Expression.Lambda>(predicateBody, new[] { leftParam, rightParam }); + } + private Expression CreatePivotPartExpression(List propertyInfosForPivot, ParameterExpression leftParam, ParameterExpression rightParam) + { + Expression predicatePivotPart = null; + foreach (var propertyInfoForPivot in propertyInfosForPivot) { - ParameterExpression leftParam = Expression.Parameter(typeof(T), "i"); - ParameterExpression rightParam = Expression.Parameter(typeof(T), "rightParam"); - Expression predicateBody = null; - foreach (var propertyInfosForPivot in propertyInfosForPivotSet) - { - var pivotPartExpression = CreatePivotPartExpression(propertyInfosForPivot, leftParam, rightParam); - if (predicateBody == null) - predicateBody = pivotPartExpression; - else - predicateBody = Expression.OrElse(predicateBody, pivotPartExpression); - } - return Expression.Lambda>(predicateBody, new[] { leftParam, rightParam }); + var equalityExpression = CreateEqualityExpression(propertyInfoForPivot, leftParam, rightParam); + if (predicatePivotPart == null) + predicatePivotPart = equalityExpression; + else + predicatePivotPart = Expression.AndAlso(predicatePivotPart, equalityExpression); } - private Expression CreatePivotPartExpression(List propertyInfosForPivot, ParameterExpression leftParam, ParameterExpression rightParam) + return predicatePivotPart; + } + private Expression CreateEqualityExpression(PropertyInfo propertyInfo, ParameterExpression leftParam, ParameterExpression rightParam) + { + Expression leftValue = Expression.Property(leftParam, propertyInfo); + Expression rightValue = Expression.Property(rightParam, propertyInfo); + return Expression.Equal(leftValue, rightValue); + } + public async Task SaveAsync(IList entities, bool doNotUpdateIfExists = false, bool insertOnly = false) + { + var contextSet = _context.Set(); + foreach (var entity in entities) { - Expression predicatePivotPart = null; - foreach (var propertyInfoForPivot in propertyInfosForPivot) + if (_cancellationToken.IsCancellationRequested) { - var equalityExpression = CreateEqualityExpression(propertyInfoForPivot, leftParam, rightParam); - if (predicatePivotPart == null) - predicatePivotPart = equalityExpression; - else - predicatePivotPart = Expression.AndAlso(predicatePivotPart, equalityExpression); + return; } - return predicatePivotPart; + if (insertOnly) + contextSet.Add(entity); + else + InsertOrUpdateEntity(doNotUpdateIfExists, contextSet, entity); } - private Expression CreateEqualityExpression(PropertyInfo propertyInfo, ParameterExpression leftParam, ParameterExpression rightParam) + await _context.SaveChangesAsync(_cancellationToken); + } + + private void InsertOrUpdateEntity(bool doNotUpdateIfExists, DbSet contextSet, T entity) + { + var entityCondition = _findConditionExpression.ApplyPartialLeft(entity); + var existingEntity = contextSet.AsNoTracking().FirstOrDefault(entityCondition); + if (this._context is MultiTenantDbContext mtCtx) { - Expression leftValue = Expression.Property(leftParam, propertyInfo); - Expression rightValue = Expression.Property(rightParam, propertyInfo); - return Expression.Equal(leftValue, rightValue); + mtCtx.UpdateEntityForMultiTenancy(entity); } - public async Task SaveAsync(IList entities, bool doNotUpdateIfExists = false, bool insertOnly = false) + if (existingEntity == null) { - var contextSet = _context.Set(); - foreach (var entity in entities) - { - if (_cancellationToken.IsCancellationRequested) - { - return; - } - if (insertOnly) - contextSet.Add(entity); - else - InsertOrUpdateEntity(doNotUpdateIfExists, contextSet, entity); - } - await _context.SaveChangesAsync(_cancellationToken); + _context.Entry(entity).State = EntityState.Added; } - - private void InsertOrUpdateEntity(bool doNotUpdateIfExists, DbSet contextSet, T entity) + else { - var expr3 = _findConditionExpression.ApplyPartialLeft(entity); - T existingEntity = contextSet.AsNoTracking().FirstOrDefault(expr3); - if (existingEntity != null) - { - if (!doNotUpdateIfExists) - { - foreach (var keyPropertyInfo in _keyPropertyInfos) - { - object val = keyPropertyInfo.GetValue(existingEntity); - keyPropertyInfo.SetValue(entity, val); - } - // contextSet.Update(entity); - } - } - if (this._context is MultiTenantDbContext mtCtx) + foreach (var keyPropertyInfo in _keyPropertyInfos) { - mtCtx.UpdateEntityForMultiTenancy(entity); + var val = keyPropertyInfo.GetValue(existingEntity); + keyPropertyInfo.SetValue(entity, val); } - contextSet.Update(entity); - if (existingEntity == null) + if (!doNotUpdateIfExists) { - _context.Entry(entity).State = EntityState.Added; + contextSet.Update(entity); } } } diff --git a/src/SharedSettings.props b/src/SharedSettings.props index e119d351..2f7ff0ae 100644 --- a/src/SharedSettings.props +++ b/src/SharedSettings.props @@ -2,7 +2,7 @@ latest enable - 2.1.35-beta + 2.1.36-beta NugetIcon.png README.md Stéphane Royer diff --git a/src/Tutorials/Paillave.Etl.Samples/DataAccess/TestDbContext.cs b/src/Tutorials/Paillave.Etl.Samples/DataAccess/TestDbContext.cs index 4deff300..ecee310a 100644 --- a/src/Tutorials/Paillave.Etl.Samples/DataAccess/TestDbContext.cs +++ b/src/Tutorials/Paillave.Etl.Samples/DataAccess/TestDbContext.cs @@ -6,7 +6,7 @@ public class TestDbContext : DbContext { protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { - optionsBuilder.UseSqlServer(@"Server=localhost,1433;Database=TestEtl;user=TestEtl;password=TestEtl.TestEtl;MultipleActiveResultSets=True"); + optionsBuilder.UseSqlServer(@"Server=localhost,1433;Initial Catalog=TestEtl;Persist Security Info=False;User ID=SA;Password=;MultipleActiveResultSets=True;Encrypt=True;TrustServerCertificate=true;Connection Timeout=300;"); } protected override void OnModelCreating(ModelBuilder modelBuilder) { diff --git a/src/Tutorials/Paillave.Etl.Samples/InputFiles/Input01.Portfolios.csv b/src/Tutorials/Paillave.Etl.Samples/InputFiles/Input01.Portfolios.csv index dd7d72a7..3b1b0e5b 100644 --- a/src/Tutorials/Paillave.Etl.Samples/InputFiles/Input01.Portfolios.csv +++ b/src/Tutorials/Paillave.Etl.Samples/InputFiles/Input01.Portfolios.csv @@ -1,4 +1,4 @@ SicavCode,SicavName,SicavType,PortfolioCode,PortfolioName -S1,Sicav1,UCITS,P1,Portfolio1 -S2,Sicav2,AIFM,P2,Portfolio2 -S1,Sicav1,UCITS,P3,Portfolio3 \ No newline at end of file +S1,Sicav1AA,UCITS,P1,Portfolio1 +S2,Sicav2AA,AIFM,P2,Portfolio2 +S1,Sicav1AA,UCITS,P3,Portfolio3 \ No newline at end of file diff --git a/src/Tutorials/Paillave.Etl.Samples/Program.cs b/src/Tutorials/Paillave.Etl.Samples/Program.cs index 6bdd7967..fdbf6c97 100644 --- a/src/Tutorials/Paillave.Etl.Samples/Program.cs +++ b/src/Tutorials/Paillave.Etl.Samples/Program.cs @@ -1,33 +1,33 @@ -using Paillave.Etl.Core; -using Paillave.Etl.FileSystem; -using Paillave.Etl.GraphApi; -var graphApiConnectionParameters = new GraphApiAdapterConnectionParameters -{ -}; -var executionOptions = new ExecutionOptions -{ - Connectors = new FileValueConnectors() - .Register(new GraphApiMailFileValueProvider("GRAPHIN", "GraphIn", "GraphIn", graphApiConnectionParameters, - new GraphApiMailAdapterProviderParameters - { - AttachmentNamePattern = "*", - SetToReadIfBatchDeletion = true, - Folder="Boîte de réception" - })) - .Register(new GraphApiMailFileValueProcessor("GRAPHOUT", "GraphOut", "GraphOut", graphApiConnectionParameters, - new GraphApiAdapterProcessorParameters - { - Body = "Body", - Subject = "Subject", - })) - .Register(new FileSystemFileValueProvider("IN", "Input", Path.Combine(Environment.CurrentDirectory, "InputFiles"), "*")) - .Register(new FileSystemFileValueProcessor("OUT1", "Output", Path.Combine(Environment.CurrentDirectory, "Output"))) - .Register(new FileSystemFileValueProcessor("OUT2", "Output", Path.Combine(Environment.CurrentDirectory, "Output", "B"))), -}; +// using Paillave.Etl.Core; +// using Paillave.Etl.FileSystem; +// using Paillave.Etl.GraphApi; +// var graphApiConnectionParameters = new GraphApiAdapterConnectionParameters +// { +// }; +// var executionOptions = new ExecutionOptions +// { +// Connectors = new FileValueConnectors() +// .Register(new GraphApiMailFileValueProvider("GRAPHIN", "GraphIn", "GraphIn", graphApiConnectionParameters, +// new GraphApiMailAdapterProviderParameters +// { +// AttachmentNamePattern = "*", +// SetToReadIfBatchDeletion = true, +// Folder="Boîte de réception" +// })) +// .Register(new GraphApiMailFileValueProcessor("GRAPHOUT", "GraphOut", "GraphOut", graphApiConnectionParameters, +// new GraphApiAdapterProcessorParameters +// { +// Body = "Body", +// Subject = "Subject", +// })) +// .Register(new FileSystemFileValueProvider("IN", "Input", Path.Combine(Environment.CurrentDirectory, "InputFiles"), "*")) +// .Register(new FileSystemFileValueProcessor("OUT1", "Output", Path.Combine(Environment.CurrentDirectory, "Output"))) +// .Register(new FileSystemFileValueProcessor("OUT2", "Output", Path.Combine(Environment.CurrentDirectory, "Output", "B"))), +// }; -var result = await StreamProcessRunner.CreateAndExecuteAsync("", triggerStream => -{ - triggerStream.FromConnector("get input", "IN") - // .ToConnector("write output", "S3OUT") - .ToConnector("write output2", "GRAPHOUT"); -}, executionOptions); +// var result = await StreamProcessRunner.CreateAndExecuteAsync("", triggerStream => +// { +// triggerStream.FromConnector("get input", "IN") +// // .ToConnector("write output", "S3OUT") +// .ToConnector("write output2", "GRAPHOUT"); +// }, executionOptions); diff --git a/src/Tutorials/Paillave.Etl.Samples/Program2.cs b/src/Tutorials/Paillave.Etl.Samples/Program2.cs index a8236adb..5bc7ae3b 100644 --- a/src/Tutorials/Paillave.Etl.Samples/Program2.cs +++ b/src/Tutorials/Paillave.Etl.Samples/Program2.cs @@ -13,11 +13,12 @@ namespace Paillave.Etl.Samples { - class Program2 + class Program { - static async Task Main2(string[] args) + static async Task Main(string[] args) { - await ImportAndCreateFileWithConfigAsync(args); + await ImportAndCreateFileAsync(args); + // await ImportAndCreateFileWithConfigAsync(args); } private static ConfigurationFileValueConnectorParser CreateConfigurationFileValueConnectorParser() => new ConfigurationFileValueConnectorParser( new FileSystemProviderProcessorAdapter(), @@ -58,6 +59,8 @@ static async Task ImportAndCreateFileAsync(string[] args) structure.OpenEstimatedExecutionPlan(); ITraceReporter traceReporter = new AdvancedConsoleExecutionDisplay(); + var dataAccess= new DataAccess.TestDbContext(); + await dataAccess.Database.EnsureCreatedAsync(); var executionOptions = new ExecutionOptions { Connectors = new FileValueConnectors() @@ -65,7 +68,7 @@ static async Task ImportAndCreateFileAsync(string[] args) .Register(new FileSystemFileValueProvider("POS", "Positions", Path.Combine(Environment.CurrentDirectory, "InputFiles"), "*.Positions.csv")) .Register(new FileSystemFileValueProcessor("OUT", "Result", Path.Combine(Environment.CurrentDirectory, "InputFiles"))), Resolver = new SimpleDependencyResolver() - .Register(new DataAccess.TestDbContext()), + .Register(dataAccess), TraceProcessDefinition = traceReporter.TraceProcessDefinition, }; traceReporter.Initialize(structure); @@ -82,12 +85,14 @@ static async Task ImportAndCreateFileAsync(string[] args) static async Task ImportAndCreateFileWithConfigAsync(string[] args) { var processRunner = StreamProcessRunner.Create(TestImport2.Import); + var dataAccess= new DataAccess.TestDbContext(); + await dataAccess.Database.EnsureCreatedAsync(); var executionOptions = new ExecutionOptions { Connectors = CreateConfigurationFileValueConnectorParser() - .GetConnectors(File.ReadAllText(Path.Combine(Environment.CurrentDirectory, "connectorsConfig.json"))), + .GetConnectors(File.ReadAllText(Path.Combine(Environment.CurrentDirectory, "connectorsLocalConfig.json"))), Resolver = new SimpleDependencyResolver() - .Register(new DataAccess.TestDbContext()) + .Register(dataAccess) }; var res = await processRunner.ExecuteAsync(args, executionOptions); } diff --git a/src/Tutorials/Paillave.Etl.Samples/TestImport2.cs b/src/Tutorials/Paillave.Etl.Samples/TestImport2.cs index 606610c7..069f085a 100644 --- a/src/Tutorials/Paillave.Etl.Samples/TestImport2.cs +++ b/src/Tutorials/Paillave.Etl.Samples/TestImport2.cs @@ -1,5 +1,6 @@ using Paillave.Etl.Core; using Paillave.Etl.EntityFrameworkCore; +using Paillave.Etl.Samples.DataAccess; using Paillave.Etl.TextFile; namespace Paillave.Etl.Samples @@ -8,6 +9,7 @@ public static class TestImport2 { public static void Import(ISingleStream contextStream) { + contextStream.EfCoreSelect("erre", (o, i) => o.Set().Select(i => new Portfolio { InternalCode = i.InternalCode, Name = i.Name, SicavId = i.SicavId })); var portfolioFileStream = contextStream .FromConnector("Get portfolio files", "PTF") .CrossApplyTextFile("Parse portfolio file", FlatFileDefinition.Create(i => new @@ -20,20 +22,20 @@ public static void Import(ISingleStream contextStream) }).IsColumnSeparated(',')) .SetForCorrelation("Correlate portfolio row"); - var positionFileStream = contextStream - .FromConnector("Get position files", "POS") - .CrossApplyTextFile("Parse position file", FlatFileDefinition.Create(i => new - { - PortfolioCode = i.ToColumn("PortfolioCode"), - SecurityCode = i.ToColumn("SecurityCode"), - Isin = i.ToColumn("Isin"), - SecurityName = i.ToColumn("SecurityName"), - SecurityClass = i.ToColumn("SecurityClass"), - Issuer = i.ToColumn("Issuer"), - Date = i.ToDateColumn("Date", "yyyyMMdd"), - Value = i.ToNumberColumn("Value", "."), - }).IsColumnSeparated(',')) - .SetForCorrelation("Correlate position row"); + // var positionFileStream = contextStream + // .FromConnector("Get position files", "POS") + // .CrossApplyTextFile("Parse position file", FlatFileDefinition.Create(i => new + // { + // PortfolioCode = i.ToColumn("PortfolioCode"), + // SecurityCode = i.ToColumn("SecurityCode"), + // Isin = i.ToColumn("Isin"), + // SecurityName = i.ToColumn("SecurityName"), + // SecurityClass = i.ToColumn("SecurityClass"), + // Issuer = i.ToColumn("Issuer"), + // Date = i.ToDateColumn("Date", "yyyyMMdd"), + // Value = i.ToNumberColumn("Value", "."), + // }).IsColumnSeparated(',')) + // .SetForCorrelation("Correlate position row"); var sicavStream = portfolioFileStream .Distinct("Distinct sicav", i => i.SicavCode, true) @@ -45,91 +47,92 @@ public static void Import(ISingleStream contextStream) }) .EfCoreSave("Save sicav", o => o .SeekOn(i => i.InternalCode) - .DoNotUpdateIfExists()); + // .DoNotUpdateIfExists() + .WithMode(SaveMode.EntityFrameworkCore)); - var portfolioStream = portfolioFileStream - .Distinct("Distinct portfolio", i => i.PortfolioCode, true) - .CorrelateToSingle("Get related sicav and create portfolio", sicavStream, (row, sicav) => new DataAccess.Portfolio - { - InternalCode = row.PortfolioCode, - Name = row.PortfolioName, - SicavId = sicav.Id - }) - .EfCoreSave("Save portfolio", o => o - .SeekOn(i => i.InternalCode) - .DoNotUpdateIfExists()); + // var portfolioStream = portfolioFileStream + // .Distinct("Distinct portfolio", i => i.PortfolioCode, true) + // .CorrelateToSingle("Get related sicav and create portfolio", sicavStream, (row, sicav) => new DataAccess.Portfolio + // { + // InternalCode = row.PortfolioCode, + // Name = row.PortfolioName, + // SicavId = sicav.Id + // }) + // .EfCoreSave("Save portfolio", o => o + // .SeekOn(i => i.InternalCode) + // .DoNotUpdateIfExists()); - var compositionStream = positionFileStream - .Distinct("Distinct compositions", i => new { i.PortfolioCode, i.Date }, true) - .Lookup("Get related portfolio", - portfolioStream, - i => i.PortfolioCode, - i => i.InternalCode, - (row, portfolio) => new - { - Portfolio = portfolio, - Composition = new DataAccess.Composition - { - Date = row.Date, - PortfolioId = portfolio.Id - } - }) - .EfCoreSave("Save composition", o => o - .Entity(i => i.Composition) - .SeekOn(i => new { i.Date, i.PortfolioId }) - .DoNotUpdateIfExists() - .Output((i, e) => new - { - i.Portfolio, - Composition = e - })); + // var compositionStream = positionFileStream + // .Distinct("Distinct compositions", i => new { i.PortfolioCode, i.Date }, true) + // .Lookup("Get related portfolio", + // portfolioStream, + // i => i.PortfolioCode, + // i => i.InternalCode, + // (row, portfolio) => new + // { + // Portfolio = portfolio, + // Composition = new DataAccess.Composition + // { + // Date = row.Date, + // PortfolioId = portfolio.Id + // } + // }) + // .EfCoreSave("Save composition", o => o + // .Entity(i => i.Composition) + // .SeekOn(i => new { i.Date, i.PortfolioId }) + // .DoNotUpdateIfExists() + // .Output((i, e) => new + // { + // i.Portfolio, + // Composition = e + // })); - var securityStream = positionFileStream - .Distinct("Distinct securities", i => i.SecurityCode) - .Select("Create security", i => - { - if (string.IsNullOrWhiteSpace(i.SecurityClass)) - { - return new DataAccess.Equity - { - InternalCode = i.SecurityCode, - Name = i.SecurityName, - Isin = i.Isin, - Issuer = i.Issuer - } as DataAccess.Security; - } - return new DataAccess.ShareClass - { - InternalCode = i.SecurityCode, - Name = i.SecurityName, - Isin = i.Isin, - Class = i.SecurityClass - } as DataAccess.Security; - }) - .EfCoreSave("Save security", o => o - .SeekOn(i => i.Isin) - .AlternativelySeekOn(i => i.InternalCode) - .DoNotUpdateIfExists()); + // var securityStream = positionFileStream + // .Distinct("Distinct securities", i => i.SecurityCode) + // .Select("Create security", i => + // { + // if (string.IsNullOrWhiteSpace(i.SecurityClass)) + // { + // return new DataAccess.Equity + // { + // InternalCode = i.SecurityCode, + // Name = i.SecurityName, + // Isin = i.Isin, + // Issuer = i.Issuer + // } as DataAccess.Security; + // } + // return new DataAccess.ShareClass + // { + // InternalCode = i.SecurityCode, + // Name = i.SecurityName, + // Isin = i.Isin, + // Class = i.SecurityClass + // } as DataAccess.Security; + // }) + // .EfCoreSave("Save security", o => o + // .SeekOn(i => i.Isin) + // .AlternativelySeekOn(i => i.InternalCode) + // .DoNotUpdateIfExists()); - positionFileStream - .CorrelateToSingle("Get related security", securityStream, (row, security) => new { Row = row, SecurityId = security.Id }) - .CorrelateToSingle("Get related composition and create position", compositionStream, (row, composition) => new DataAccess.Position - { - Value = row.Row.Value, - SecurityId = row.SecurityId, - CompositionId = composition.Composition.Id - }) - .Distinct("Distinct positions", i => new { i.CompositionId, i.SecurityId }, o => o.ForProperty(i => i.Value, DistinctAggregator.Sum)) - .EfCoreSave("Save position") - .CorrelateToSingle("Get position portfolio", compositionStream, (p, c) => new { c.Portfolio.InternalCode, c.Composition.Date, p.Value }) - .Distinct("Aggregate position per portfolio", i => new { i.InternalCode, i.Date }, o => o.ForProperty(i => i.Value, DistinctAggregator.Sum)) - .ToTextFileValue("Export portfolios weight", "PortfoliosWeight.csv", FlatFileDefinition.Create(i => new - { - InternalCode = i.ToColumn("Code"), - Date = i.ToDateColumn("Date", "ddd dd MMM, yyyy"), - Value = i.ToNumberColumn("Weight", "."), - }).IsColumnSeparated(',')) - .ToConnector("Save portfolios position file", "OUT"); + // positionFileStream + // .CorrelateToSingle("Get related security", securityStream, (row, security) => new { Row = row, SecurityId = security.Id }) + // .CorrelateToSingle("Get related composition and create position", compositionStream, (row, composition) => new DataAccess.Position + // { + // Value = row.Row.Value, + // SecurityId = row.SecurityId, + // CompositionId = composition.Composition.Id + // }) + // .Distinct("Distinct positions", i => new { i.CompositionId, i.SecurityId }, o => o.ForProperty(i => i.Value, DistinctAggregator.Sum)) + // .EfCoreSave("Save position") + // .CorrelateToSingle("Get position portfolio", compositionStream, (p, c) => new { c.Portfolio.InternalCode, c.Composition.Date, p.Value }) + // .Distinct("Aggregate position per portfolio", i => new { i.InternalCode, i.Date }, o => o.ForProperty(i => i.Value, DistinctAggregator.Sum)) + // .ToTextFileValue("Export portfolios weight", "PortfoliosWeight.csv", FlatFileDefinition.Create(i => new + // { + // InternalCode = i.ToColumn("Code"), + // Date = i.ToDateColumn("Date", "ddd dd MMM, yyyy"), + // Value = i.ToNumberColumn("Weight", "."), + // }).IsColumnSeparated(',')) + // .ToConnector("Save portfolios position file", "OUT"); } } } \ No newline at end of file diff --git a/src/Tutorials/Paillave.Etl.Samples/connectorsLocalConfig.json b/src/Tutorials/Paillave.Etl.Samples/connectorsLocalConfig.json new file mode 100644 index 00000000..18665e68 --- /dev/null +++ b/src/Tutorials/Paillave.Etl.Samples/connectorsLocalConfig.json @@ -0,0 +1,21 @@ +{ + "$schema": "./connectorsConfigSchema.json", + "inputFilesForSomePartnerFTP": { + "Type": "FileSystem", + "Connection": { + "RootFolder": "InputFiles" + }, + "Providers": { + "POS": { + "FileNamePattern": "*.Positions.csv" + }, + "PTF": { + "FileNamePattern": "*.Portfolios.csv" + } + }, + "Processors": { + "OUT": { + } + } + } +} \ No newline at end of file