-
Notifications
You must be signed in to change notification settings - Fork 107
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
ed9fbe4
commit 70e3741
Showing
10 changed files
with
181 additions
and
80 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,8 @@ | |
using System.Globalization; | ||
using System.Linq.Expressions; | ||
using Microsoft.EntityFrameworkCore; | ||
using Microsoft.EntityFrameworkCore.Query.Internal; | ||
using Stl.Fusion.EntityFramework.Internal; | ||
using Stl.Internal; | ||
using Stl.Multitenancy; | ||
using Stl.Net; | ||
|
||
|
@@ -13,8 +13,8 @@ public interface IDbEntityResolver<TKey, TDbEntity> | |
where TKey : notnull | ||
where TDbEntity : class | ||
{ | ||
public Func<Expression, Expression> KeyExtractorExpressionBuilder { get; } | ||
public Func<TDbEntity, TKey> KeyExtractor { get; } | ||
Func<TDbEntity, TKey> KeyExtractor { get; init; } | ||
Expression<Func<TDbEntity, TKey>> KeyExtractorExpression { get; init; } | ||
|
||
Task<TDbEntity?> Get(Symbol tenantId, TKey key, CancellationToken cancellationToken = default); | ||
} | ||
|
@@ -37,10 +37,10 @@ public record Options | |
{ | ||
public static Options Default { get; set; } = new(); | ||
|
||
public string? KeyPropertyName { get; init; } | ||
public Func<Expression, Expression>? KeyExtractorExpressionBuilder { get; init; } | ||
public Func<IQueryable<TDbEntity>, IQueryable<TDbEntity>> QueryTransformer { get; init; } = q => q; | ||
public Expression<Func<TDbEntity, TKey>>? KeyExtractor { get; init; } | ||
public Expression<Func<IQueryable<TDbEntity>, IQueryable<TDbEntity>>>? QueryTransformer { get; init; } | ||
public Action<Dictionary<TKey, TDbEntity>> PostProcessor { get; init; } = _ => { }; | ||
public int BatchSize { get; init; } = 8; | ||
public Action<BatchProcessor<TKey, TDbEntity?>>? ConfigureBatchProcessor { get; init; } | ||
public TimeSpan? Timeout { get; init; } = TimeSpan.FromSeconds(1); | ||
public IRetryDelayer RetryDelayer { get; init; } = new RetryDelayer() { | ||
|
@@ -49,38 +49,55 @@ public record Options | |
}; | ||
} | ||
|
||
protected static MethodInfo ContainsMethod { get; } = typeof(HashSet<TKey>).GetMethod(nameof(HashSet<TKey>.Contains))!; | ||
// ReSharper disable once StaticMemberInGenericType | ||
protected static MethodInfo DbContextSetMethod { get; } = typeof(DbContext) | ||
.GetMethods(BindingFlags.Public | BindingFlags.Instance) | ||
.Single(m => m.Name == nameof(DbContext.Set) && m.IsGenericMethod && m.GetParameters().Length == 0) | ||
Check warning on line 55 in src/Stl.Fusion.EntityFramework/DbEntityResolver.cs GitHub Actions / build
|
||
.MakeGenericMethod(typeof(TDbEntity)); | ||
protected static MethodInfo QueryableWhereMethod { get; } | ||
= new Func<IQueryable<TDbEntity>, Expression<Func<TDbEntity, bool>>, IQueryable<TDbEntity>>(Queryable.Where).Method; | ||
|
||
private ConcurrentDictionary<Symbol, BatchProcessor<TKey, TDbEntity?>>? _batchProcessors; | ||
private ITransientErrorDetector<TDbContext>? _transientErrorDetector; | ||
|
||
protected Options Settings { get; } | ||
protected string KeyPropertyName { get; init; } | ||
protected (Func<TDbContext, TKey[], IAsyncEnumerable<TDbEntity>> Query, int BatchSize)[] Queries { get; init; } | ||
|
||
public Func<Expression, Expression> KeyExtractorExpressionBuilder { get; init; } | ||
public Func<TDbEntity, TKey> KeyExtractor { get; init; } | ||
public Expression<Func<TDbEntity, TKey>> KeyExtractorExpression { get; init; } | ||
public ITransientErrorDetector<TDbContext> TransientErrorDetector => | ||
_transientErrorDetector ??= Services.GetRequiredService<ITransientErrorDetector<TDbContext>>(); | ||
|
||
public DbEntityResolver(Options settings, IServiceProvider services) : base(services) | ||
{ | ||
Settings = settings; | ||
if (settings.KeyPropertyName == null) { | ||
var keyExtractor = Settings.KeyExtractor; | ||
if (keyExtractor == null) { | ||
var dummyTenant = TenantRegistry.IsSingleTenant ? Tenant.Default : Tenant.Dummy; | ||
using var dbContext = CreateDbContext(dummyTenant); | ||
KeyPropertyName = dbContext.Model | ||
var keyPropertyName = dbContext.Model | ||
.FindEntityType(typeof(TDbEntity))! | ||
.FindPrimaryKey()! | ||
.Properties.Single().Name; | ||
|
||
var pEntity = Expression.Parameter(typeof(TDbEntity), "e"); | ||
var eBody = Expression.PropertyOrField(pEntity, keyPropertyName); | ||
keyExtractor = Expression.Lambda<Func<TDbEntity, TKey>>(eBody, pEntity); | ||
} | ||
else | ||
KeyPropertyName = settings.KeyPropertyName; | ||
KeyExtractorExpressionBuilder = settings.KeyExtractorExpressionBuilder | ||
?? (eEntity => Expression.PropertyOrField(eEntity, KeyPropertyName)); | ||
var pEntity = Expression.Parameter(typeof(TDbEntity), "e"); | ||
var eBody = KeyExtractorExpressionBuilder(pEntity); | ||
KeyExtractor = (Func<TDbEntity, TKey>) Expression.Lambda(eBody, pEntity).Compile(); | ||
KeyExtractorExpression = keyExtractor; | ||
KeyExtractor = keyExtractor.Compile(); | ||
_batchProcessors = new(); | ||
|
||
var buffer = ArrayBuffer<(Func<TDbContext, TKey[], IAsyncEnumerable<TDbEntity>>, int)>.Lease(false); | ||
try { | ||
for (var batchSize = 2; batchSize < Settings.BatchSize; batchSize *= 2) | ||
buffer.Add((CreateCompiledQuery(batchSize), batchSize)); | ||
buffer.Add((CreateCompiledQuery(Settings.BatchSize), Settings.BatchSize)); | ||
Queries = buffer.ToArray(); | ||
} | ||
finally { | ||
buffer.Release(); | ||
} | ||
} | ||
|
||
public async ValueTask DisposeAsync() | ||
|
@@ -102,11 +119,66 @@ await batchProcessors.Values | |
|
||
// Protected methods | ||
|
||
protected Func<TDbContext, TKey[], IAsyncEnumerable<TDbEntity>> CreateCompiledQuery(int batchSize) | ||
{ | ||
var pDbContext = Expression.Parameter(typeof(TDbContext), "dbContext"); | ||
var pKeys = new ParameterExpression[batchSize]; | ||
for (var i = 0; i < batchSize; i++) | ||
pKeys[i] = Expression.Parameter(typeof(TKey), $"key{i.ToString(CultureInfo.InvariantCulture)}"); | ||
var pEntity = Expression.Parameter(typeof(TDbEntity), "e"); | ||
|
||
// entity.Key expression | ||
var eKey = KeyExtractorExpression.Body.Replace(KeyExtractorExpression.Parameters[0], pEntity); | ||
|
||
// .Where predicate expression | ||
var ePredicate = (Expression?)null; | ||
for (var i = 0; i < batchSize; i++) { | ||
var eCondition = Expression.Equal(eKey, pKeys[i]); | ||
ePredicate = ePredicate == null ? eCondition : Expression.OrElse(ePredicate, eCondition); | ||
} | ||
var lPredicate = Expression.Lambda<Func<TDbEntity, bool>>(ePredicate!, pEntity); | ||
|
||
// dbContext.Set<TDbEntity>().Where(...) | ||
var eEntitySet = Expression.Call(pDbContext, DbContextSetMethod); | ||
var eWhere = Expression.Call(null, QueryableWhereMethod, eEntitySet, Expression.Quote(lPredicate)); | ||
|
||
// Applying QueryTransformer | ||
var qt = Settings.QueryTransformer; | ||
var eBody = qt == null | ||
? eWhere | ||
: qt.Body.Replace(qt.Parameters[0], eWhere); | ||
|
||
// Creating compiled query | ||
var lambdaParameters = new ParameterExpression[batchSize + 1]; | ||
lambdaParameters[0] = pDbContext; | ||
pKeys.CopyTo(lambdaParameters, 1); | ||
var lambda = Expression.Lambda(eBody, lambdaParameters); | ||
var query = new CompiledAsyncEnumerableQuery<TDbContext, TDbEntity>(lambda); | ||
|
||
// Locating query.Execute methods | ||
var mExecute = query.GetType() | ||
.GetMethods() | ||
.SingleOrDefault(m => m.Name == nameof(query.Execute) | ||
&& m.IsGenericMethod | ||
&& m.GetGenericArguments().Length == batchSize) | ||
?.MakeGenericMethod(pKeys.Select(p => p.Type).ToArray()); | ||
if (mExecute == null) | ||
throw Errors.BatchSizeIsTooLarge(); | ||
|
||
// Creating compiled query invoker | ||
var pAllKeys = Expression.Parameter(typeof(TKey[])); | ||
var eDbContext = Enumerable.Range(0, 1).Select(_ => (Expression)pDbContext); | ||
var eAllKeys = Enumerable.Range(0, batchSize).Select(i => Expression.ArrayIndex(pAllKeys, Expression.Constant(i))); | ||
var eExecuteCall = Expression.Call(Expression.Constant(query), mExecute, eDbContext.Concat(eAllKeys)); | ||
return (Func<TDbContext, TKey[], IAsyncEnumerable<TDbEntity>>)Expression.Lambda(eExecuteCall, pDbContext, pAllKeys).Compile(); | ||
} | ||
|
||
protected BatchProcessor<TKey, TDbEntity?> GetBatchProcessor(Symbol tenantId) | ||
{ | ||
var batchProcessors = _batchProcessors; | ||
if (batchProcessors == null) | ||
throw Stl.Internal.Errors.AlreadyDisposed(GetType()); | ||
|
||
return batchProcessors.GetOrAdd(tenantId, | ||
static (tenantId1, self) => self.CreateBatchProcessor(tenantId1), this); | ||
} | ||
|
@@ -115,11 +187,14 @@ await batchProcessors.Values | |
{ | ||
var tenant = TenantRegistry.Get(tenantId); | ||
var batchProcessor = new BatchProcessor<TKey, TDbEntity?> { | ||
MaxBatchSize = 16, | ||
BatchSize = Settings.BatchSize, | ||
ConcurrencyLevel = 1, | ||
Implementation = (batch, cancellationToken) => ProcessBatch(tenant, batch, cancellationToken), | ||
}; | ||
Settings.ConfigureBatchProcessor?.Invoke(batchProcessor); | ||
if (batchProcessor.BatchSize != Settings.BatchSize) | ||
throw Errors.BatchSizeCannotBeChanged(); | ||
|
||
return batchProcessor; | ||
} | ||
|
||
|
@@ -138,41 +213,43 @@ protected virtual async Task ProcessBatch( | |
List<BatchItem<TKey, TDbEntity?>> batch, | ||
CancellationToken cancellationToken) | ||
{ | ||
if (batch.Count == 0) | ||
return; | ||
|
||
using var activity = StartProcessBatchActivity(tenant, batch.Count); | ||
var (query, batchSize) = Queries.First(q => q.BatchSize >= batch.Count); | ||
for (var tryIndex = 0;; tryIndex++) { | ||
var dbContext = CreateDbContext(tenant); | ||
await using var _ = dbContext.ConfigureAwait(false); | ||
try { | ||
var keys = new HashSet<TKey>(); | ||
foreach (var item in batch) { | ||
var keys = new TKey[batchSize]; | ||
var i = 0; | ||
foreach (var item in batch) | ||
if (!item.TryCancel()) | ||
keys.Add(item.Input); | ||
} | ||
var pEntity = Expression.Parameter(typeof(TDbEntity), "e"); | ||
var eKey = KeyExtractorExpressionBuilder(pEntity); | ||
var eBody = Expression.Call(Expression.Constant(keys), ContainsMethod, eKey); | ||
var eLambda = (Expression<Func<TDbEntity, bool>>) Expression.Lambda(eBody, pEntity); | ||
var query = Settings.QueryTransformer(dbContext.Set<TDbEntity>().Where(eLambda)); | ||
keys[i++] = item.Input; | ||
var lastKey = keys[i - 1]; | ||
for (; i < batchSize; i++) | ||
keys[i] = lastKey; | ||
|
||
Dictionary<TKey, TDbEntity>? entities; | ||
var entities = new Dictionary<TKey, TDbEntity>(); | ||
if (Settings.Timeout is { } timeout) { | ||
using var cts = new CancellationTokenSource(timeout); | ||
using var linkedCts = cancellationToken.LinkWith(cts.Token); | ||
try { | ||
entities = await query | ||
.ToDictionaryAsync(KeyExtractor, linkedCts.Token) | ||
.ConfigureAwait(false); | ||
var result = query.Invoke(dbContext, keys); | ||
await foreach (var e in result.WithCancellation(cancellationToken).ConfigureAwait(false)) | ||
entities.Add(KeyExtractor.Invoke(e), e); | ||
} | ||
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) { | ||
throw new TimeoutException(); | ||
} | ||
} | ||
else { | ||
entities = await query | ||
.ToDictionaryAsync(KeyExtractor, cancellationToken) | ||
.ConfigureAwait(false); | ||
var result = query.Invoke(dbContext, keys); | ||
await foreach (var e in result.WithCancellation(cancellationToken).ConfigureAwait(false)) | ||
entities.Add(KeyExtractor.Invoke(e), e); | ||
} | ||
Settings.PostProcessor(entities); | ||
Settings.PostProcessor.Invoke(entities); | ||
|
||
foreach (var item in batch) { | ||
var entity = entities.GetValueOrDefault(item.Input); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
using System.Linq.Expressions; | ||
using Microsoft.EntityFrameworkCore.Query; | ||
|
||
namespace Stl.Fusion.EntityFramework.Internal; | ||
|
||
public static class ExpressionExt | ||
{ | ||
public static Expression Replace(this Expression source, | ||
Expression from, Expression to) | ||
=> new ReplacingExpressionVisitor(new[] { from }, new [] { to }).Visit(source); | ||
|
||
public static Expression Replace(this Expression source, | ||
Expression from1, Expression to1, | ||
Expression from2, Expression to2) | ||
=> new ReplacingExpressionVisitor(new[] { from1, from2 }, new [] { to1, to2 }).Visit(source); | ||
|
||
public static Expression Replace(this Expression source, | ||
Expression from1, Expression to1, | ||
Expression from2, Expression to2, | ||
Expression from3, Expression to3) | ||
=> new ReplacingExpressionVisitor(new[] { from1, from2, from3 }, new [] { to1, to2, to3 }).Visit(source); | ||
} |
Oops, something went wrong.