Skip to content

Commit

Permalink
bugs on efcoresave
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Royer committed Nov 25, 2024
1 parent b296c1c commit 730b30b
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 220 deletions.
165 changes: 83 additions & 82 deletions src/Paillave.EntityFrameworkCoreExtension/EfSave/EfSaveEngine.cs
Original file line number Diff line number Diff line change
@@ -1,114 +1,115 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;
using Paillave.EntityFrameworkCoreExtension.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Paillave.EntityFrameworkCoreExtension.EfSave
namespace Paillave.EntityFrameworkCoreExtension.EfSave;
public class EfSaveEngine<T> where T : class
{
public class EfSaveEngine<T> where T : class
private readonly Expression<Func<T, T, bool>> _findConditionExpression;
private readonly List<PropertyInfo> _keyPropertyInfos;
private readonly DbContext _context;
private readonly CancellationToken _cancellationToken;
public EfSaveEngine(DbContext context, CancellationToken cancellationToken, params Expression<Func<T, object>>[] pivotKeys)
{
private Expression<Func<T, T, bool>> _findConditionExpression;
private List<PropertyInfo> _keyPropertyInfos;
private DbContext _context;
private readonly CancellationToken _cancellationToken;
public EfSaveEngine(DbContext context, CancellationToken cancellationToken, params Expression<Func<T, object>>[] pivotKeys)
this._cancellationToken = cancellationToken;
_context = context;
var entityType = context.Model.FindEntityType(typeof(T)) ?? throw new InvalidOperationException("DbContext does not contain EntitySet for Type: " + typeof(T).Name);
_keyPropertyInfos = entityType.GetProperties()
.Where(i => !i.IsShadowProperty() && i.IsPrimaryKey())
.Where(i => i.PropertyInfo != null)
.Select(i => i.PropertyInfo!)
.ToList();
List<List<PropertyInfo>> propertyInfosForPivot;
if ((pivotKeys?.Length ?? 0) == 0)
propertyInfosForPivot = new List<List<PropertyInfo>> { entityType.FindPrimaryKey().Properties
.Where(i => i.PropertyInfo != null)
.Select(i => i.PropertyInfo!)
.ToList() };
else
propertyInfosForPivot = pivotKeys.Select(pivotKey => KeyDefinitionExtractor.GetKeys(pivotKey))
.ToList();

_findConditionExpression = CreateFindConditionExpression(propertyInfosForPivot);
}
private Expression<Func<T, T, bool>> CreateFindConditionExpression(List<List<PropertyInfo>> propertyInfosForPivotSet)
{
ParameterExpression leftParam = Expression.Parameter(typeof(T), "i");
ParameterExpression rightParam = Expression.Parameter(typeof(T), "rightParam");
Expression predicateBody = null;
foreach (var propertyInfosForPivot in propertyInfosForPivotSet)
{
this._cancellationToken = cancellationToken;
_context = context;
var entityType = context.Model.FindEntityType(typeof(T));
if (entityType == null)
throw new InvalidOperationException("DbContext does not contain EntitySet for Type: " + typeof(T).Name);
_keyPropertyInfos = entityType.GetProperties().Where(i => !i.IsShadowProperty() && i.IsPrimaryKey()).Select(i => i.PropertyInfo).ToList();
List<List<PropertyInfo>> propertyInfosForPivot;
if ((pivotKeys?.Length ?? 0) == 0)
propertyInfosForPivot = new List<List<PropertyInfo>> { entityType.FindPrimaryKey().Properties.Select(i => i.PropertyInfo).ToList() };
var pivotPartExpression = CreatePivotPartExpression(propertyInfosForPivot, leftParam, rightParam);
if (predicateBody == null)
predicateBody = pivotPartExpression;
else
propertyInfosForPivot = pivotKeys.Select(pivotKey => KeyDefinitionExtractor.GetKeys(pivotKey)).ToList();

_findConditionExpression = CreateFindConditionExpression(propertyInfosForPivot);
predicateBody = Expression.OrElse(predicateBody, pivotPartExpression);
}
private Expression<Func<T, T, bool>> CreateFindConditionExpression(List<List<PropertyInfo>> propertyInfosForPivotSet)
return Expression.Lambda<Func<T, T, bool>>(predicateBody, new[] { leftParam, rightParam });
}
private Expression CreatePivotPartExpression(List<PropertyInfo> propertyInfosForPivot, ParameterExpression leftParam, ParameterExpression rightParam)
{
Expression predicatePivotPart = null;
foreach (var propertyInfoForPivot in propertyInfosForPivot)
{
ParameterExpression leftParam = Expression.Parameter(typeof(T), "i");
ParameterExpression rightParam = Expression.Parameter(typeof(T), "rightParam");
Expression predicateBody = null;
foreach (var propertyInfosForPivot in propertyInfosForPivotSet)
{
var pivotPartExpression = CreatePivotPartExpression(propertyInfosForPivot, leftParam, rightParam);
if (predicateBody == null)
predicateBody = pivotPartExpression;
else
predicateBody = Expression.OrElse(predicateBody, pivotPartExpression);
}
return Expression.Lambda<Func<T, T, bool>>(predicateBody, new[] { leftParam, rightParam });
var equalityExpression = CreateEqualityExpression(propertyInfoForPivot, leftParam, rightParam);
if (predicatePivotPart == null)
predicatePivotPart = equalityExpression;
else
predicatePivotPart = Expression.AndAlso(predicatePivotPart, equalityExpression);
}
private Expression CreatePivotPartExpression(List<PropertyInfo> propertyInfosForPivot, ParameterExpression leftParam, ParameterExpression rightParam)
return predicatePivotPart;
}
private Expression CreateEqualityExpression(PropertyInfo propertyInfo, ParameterExpression leftParam, ParameterExpression rightParam)
{
Expression leftValue = Expression.Property(leftParam, propertyInfo);
Expression rightValue = Expression.Property(rightParam, propertyInfo);
return Expression.Equal(leftValue, rightValue);
}
public async Task SaveAsync(IList<T> entities, bool doNotUpdateIfExists = false, bool insertOnly = false)
{
var contextSet = _context.Set<T>();
foreach (var entity in entities)
{
Expression predicatePivotPart = null;
foreach (var propertyInfoForPivot in propertyInfosForPivot)
if (_cancellationToken.IsCancellationRequested)
{
var equalityExpression = CreateEqualityExpression(propertyInfoForPivot, leftParam, rightParam);
if (predicatePivotPart == null)
predicatePivotPart = equalityExpression;
else
predicatePivotPart = Expression.AndAlso(predicatePivotPart, equalityExpression);
return;
}
return predicatePivotPart;
if (insertOnly)
contextSet.Add(entity);
else
InsertOrUpdateEntity(doNotUpdateIfExists, contextSet, entity);
}
private Expression CreateEqualityExpression(PropertyInfo propertyInfo, ParameterExpression leftParam, ParameterExpression rightParam)
await _context.SaveChangesAsync(_cancellationToken);
}

private void InsertOrUpdateEntity(bool doNotUpdateIfExists, DbSet<T> contextSet, T entity)
{
var entityCondition = _findConditionExpression.ApplyPartialLeft(entity);
var existingEntity = contextSet.AsNoTracking().FirstOrDefault(entityCondition);
if (this._context is MultiTenantDbContext mtCtx)
{
Expression leftValue = Expression.Property(leftParam, propertyInfo);
Expression rightValue = Expression.Property(rightParam, propertyInfo);
return Expression.Equal(leftValue, rightValue);
mtCtx.UpdateEntityForMultiTenancy(entity);
}
public async Task SaveAsync(IList<T> entities, bool doNotUpdateIfExists = false, bool insertOnly = false)
if (existingEntity == null)
{
var contextSet = _context.Set<T>();
foreach (var entity in entities)
{
if (_cancellationToken.IsCancellationRequested)
{
return;
}
if (insertOnly)
contextSet.Add(entity);
else
InsertOrUpdateEntity(doNotUpdateIfExists, contextSet, entity);
}
await _context.SaveChangesAsync(_cancellationToken);
_context.Entry(entity).State = EntityState.Added;
}

private void InsertOrUpdateEntity(bool doNotUpdateIfExists, DbSet<T> contextSet, T entity)
else
{
var expr3 = _findConditionExpression.ApplyPartialLeft(entity);
T existingEntity = contextSet.AsNoTracking().FirstOrDefault(expr3);
if (existingEntity != null)
{
if (!doNotUpdateIfExists)
{
foreach (var keyPropertyInfo in _keyPropertyInfos)
{
object val = keyPropertyInfo.GetValue(existingEntity);
keyPropertyInfo.SetValue(entity, val);
}
// contextSet.Update(entity);
}
}
if (this._context is MultiTenantDbContext mtCtx)
foreach (var keyPropertyInfo in _keyPropertyInfos)
{
mtCtx.UpdateEntityForMultiTenancy(entity);
var val = keyPropertyInfo.GetValue(existingEntity);
keyPropertyInfo.SetValue(entity, val);
}
contextSet.Update(entity);
if (existingEntity == null)
if (!doNotUpdateIfExists)
{
_context.Entry(entity).State = EntityState.Added;
contextSet.Update(entity);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/SharedSettings.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
<Version>2.1.35-beta</Version>
<Version>2.1.36-beta</Version>
<PackageIcon>NugetIcon.png</PackageIcon>
<PackageReadmeFile>README.md</PackageReadmeFile>
<Authors>Stéphane Royer</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public class TestDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer(@"Server=localhost,1433;Database=TestEtl;user=TestEtl;password=TestEtl.TestEtl;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer(@"Server=localhost,1433;Initial Catalog=TestEtl;Persist Security Info=False;User ID=SA;Password=<YourStrong@Passw0rd>;MultipleActiveResultSets=True;Encrypt=True;TrustServerCertificate=true;Connection Timeout=300;");
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
SicavCode,SicavName,SicavType,PortfolioCode,PortfolioName
S1,Sicav1,UCITS,P1,Portfolio1
S2,Sicav2,AIFM,P2,Portfolio2
S1,Sicav1,UCITS,P3,Portfolio3
S1,Sicav1AA,UCITS,P1,Portfolio1
S2,Sicav2AA,AIFM,P2,Portfolio2
S1,Sicav1AA,UCITS,P3,Portfolio3
64 changes: 32 additions & 32 deletions src/Tutorials/Paillave.Etl.Samples/Program.cs
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
using Paillave.Etl.Core;
using Paillave.Etl.FileSystem;
using Paillave.Etl.GraphApi;
var graphApiConnectionParameters = new GraphApiAdapterConnectionParameters
{
};
var executionOptions = new ExecutionOptions<string>
{
Connectors = new FileValueConnectors()
.Register(new GraphApiMailFileValueProvider("GRAPHIN", "GraphIn", "GraphIn", graphApiConnectionParameters,
new GraphApiMailAdapterProviderParameters
{
AttachmentNamePattern = "*",
SetToReadIfBatchDeletion = true,
Folder="Boîte de réception"
}))
.Register(new GraphApiMailFileValueProcessor("GRAPHOUT", "GraphOut", "GraphOut", graphApiConnectionParameters,
new GraphApiAdapterProcessorParameters
{
Body = "Body",
Subject = "Subject",
}))
.Register(new FileSystemFileValueProvider("IN", "Input", Path.Combine(Environment.CurrentDirectory, "InputFiles"), "*"))
.Register(new FileSystemFileValueProcessor("OUT1", "Output", Path.Combine(Environment.CurrentDirectory, "Output")))
.Register(new FileSystemFileValueProcessor("OUT2", "Output", Path.Combine(Environment.CurrentDirectory, "Output", "B"))),
};
// using Paillave.Etl.Core;
// using Paillave.Etl.FileSystem;
// using Paillave.Etl.GraphApi;
// var graphApiConnectionParameters = new GraphApiAdapterConnectionParameters
// {
// };
// var executionOptions = new ExecutionOptions<string>
// {
// Connectors = new FileValueConnectors()
// .Register(new GraphApiMailFileValueProvider("GRAPHIN", "GraphIn", "GraphIn", graphApiConnectionParameters,
// new GraphApiMailAdapterProviderParameters
// {
// AttachmentNamePattern = "*",
// SetToReadIfBatchDeletion = true,
// Folder="Boîte de réception"
// }))
// .Register(new GraphApiMailFileValueProcessor("GRAPHOUT", "GraphOut", "GraphOut", graphApiConnectionParameters,
// new GraphApiAdapterProcessorParameters
// {
// Body = "Body",
// Subject = "Subject",
// }))
// .Register(new FileSystemFileValueProvider("IN", "Input", Path.Combine(Environment.CurrentDirectory, "InputFiles"), "*"))
// .Register(new FileSystemFileValueProcessor("OUT1", "Output", Path.Combine(Environment.CurrentDirectory, "Output")))
// .Register(new FileSystemFileValueProcessor("OUT2", "Output", Path.Combine(Environment.CurrentDirectory, "Output", "B"))),
// };

var result = await StreamProcessRunner.CreateAndExecuteAsync("", triggerStream =>
{
triggerStream.FromConnector("get input", "IN")
// .ToConnector("write output", "S3OUT")
.ToConnector("write output2", "GRAPHOUT");
}, executionOptions);
// var result = await StreamProcessRunner.CreateAndExecuteAsync("", triggerStream =>
// {
// triggerStream.FromConnector("get input", "IN")
// // .ToConnector("write output", "S3OUT")
// .ToConnector("write output2", "GRAPHOUT");
// }, executionOptions);
17 changes: 11 additions & 6 deletions src/Tutorials/Paillave.Etl.Samples/Program2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@

namespace Paillave.Etl.Samples
{
class Program2
class Program
{
static async Task Main2(string[] args)
static async Task Main(string[] args)
{
await ImportAndCreateFileWithConfigAsync(args);
await ImportAndCreateFileAsync(args);
// await ImportAndCreateFileWithConfigAsync(args);
}
private static ConfigurationFileValueConnectorParser CreateConfigurationFileValueConnectorParser() => new ConfigurationFileValueConnectorParser(
new FileSystemProviderProcessorAdapter(),
Expand Down Expand Up @@ -58,14 +59,16 @@ static async Task ImportAndCreateFileAsync(string[] args)
structure.OpenEstimatedExecutionPlan();

ITraceReporter traceReporter = new AdvancedConsoleExecutionDisplay();
var dataAccess= new DataAccess.TestDbContext();
await dataAccess.Database.EnsureCreatedAsync();
var executionOptions = new ExecutionOptions<string[]>
{
Connectors = new FileValueConnectors()
.Register(new FileSystemFileValueProvider("PTF", "Portfolios", Path.Combine(Environment.CurrentDirectory, "InputFiles"), "*.Portfolios.csv"))
.Register(new FileSystemFileValueProvider("POS", "Positions", Path.Combine(Environment.CurrentDirectory, "InputFiles"), "*.Positions.csv"))
.Register(new FileSystemFileValueProcessor("OUT", "Result", Path.Combine(Environment.CurrentDirectory, "InputFiles"))),
Resolver = new SimpleDependencyResolver()
.Register(new DataAccess.TestDbContext()),
.Register(dataAccess),
TraceProcessDefinition = traceReporter.TraceProcessDefinition,
};
traceReporter.Initialize(structure);
Expand All @@ -82,12 +85,14 @@ static async Task ImportAndCreateFileAsync(string[] args)
static async Task ImportAndCreateFileWithConfigAsync(string[] args)
{
var processRunner = StreamProcessRunner.Create<string[]>(TestImport2.Import);
var dataAccess= new DataAccess.TestDbContext();
await dataAccess.Database.EnsureCreatedAsync();
var executionOptions = new ExecutionOptions<string[]>
{
Connectors = CreateConfigurationFileValueConnectorParser()
.GetConnectors(File.ReadAllText(Path.Combine(Environment.CurrentDirectory, "connectorsConfig.json"))),
.GetConnectors(File.ReadAllText(Path.Combine(Environment.CurrentDirectory, "connectorsLocalConfig.json"))),
Resolver = new SimpleDependencyResolver()
.Register(new DataAccess.TestDbContext())
.Register(dataAccess)
};
var res = await processRunner.ExecuteAsync(args, executionOptions);
}
Expand Down
Loading

0 comments on commit 730b30b

Please sign in to comment.