Skip to content

Commit

Permalink
Elastic: fixed threads
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelbannov committed Feb 1, 2024
1 parent ae0155d commit 03e33b6
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 137 deletions.
10 changes: 6 additions & 4 deletions common/services/ASC.ElasticSearch/Engine/BaseIndexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public async IAsyncEnumerable<List<T>> IndexAllAsync(
Func<DateTime, List<int>> getIds,
Func<long, long, DateTime, List<T>> getData)
{
var now = DateTime.UtcNow;
DateTime lastIndexed;

await using (var webStudioDbContext = await _dbContextFactory.CreateDbContextAsync())
Expand All @@ -123,22 +122,25 @@ public async IAsyncEnumerable<List<T>> IndexAllAsync(
{
yield return getData(ids[i], ids[i + 1], lastIndexed);
}
}

public async Task OnComplete(DateTime lastModified)
{
await using (var webStudioDbContext = await _dbContextFactory.CreateDbContextAsync())
{
await webStudioDbContext.AddOrUpdateAsync(q => q.WebstudioIndex, new DbWebstudioIndex
{
IndexName = Wrapper.IndexName,
LastModified = now
LastModified = lastModified
});

await webStudioDbContext.SaveChangesAsync();
}

_logger.DebugIndexCompleted(Wrapper.IndexName);
}

public async Task ReIndrexAsync()
public async Task ReIndexAsync()
{
await ClearAsync();
}
Expand Down
87 changes: 3 additions & 84 deletions common/services/ASC.ElasticSearch/Engine/FactoryIndexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public async Task Index(List<T> data, bool immediately = true, int retry = 0)
}
else if (e.Response.HttpStatusCode == 429)
{
Thread.Sleep(60000);
await Task.Delay(60000);
if (retry < 10)
{
await Index(data.Where(r => r != null).ToList(), immediately, retry + 1);
Expand All @@ -251,87 +251,6 @@ public async Task Index(List<T> data, bool immediately = true, int retry = 0)
{
Logger.ErrorInner(inner);

if (inner.Response.HttpStatusCode is 413 or 403)
{
Logger.Error(inner.Response.HttpStatusCode.ToString());
foreach (var r in data.Where(r => r != null))
{
await Index(r, immediately);
}
}
else if (inner.Response.HttpStatusCode == 429)
{
Thread.Sleep(60000);
if (retry < 10)
{
await Index(data.Where(r => r != null).ToList(), immediately, retry + 1);
return;
}

throw;
}
}
else
{
throw;
}
}
}

protected async Task IndexAsync(List<T> data, bool immediately = true, int retry = 0)
{
var t = _serviceProvider.GetService<T>();
if (!await SupportAsync(t) || data.Count == 0)
{
return;
}

try
{
await _indexer.IndexAsync(data, immediately).ConfigureAwait(false);
}
catch (ElasticsearchClientException e)
{
Logger.ErrorIndexAsync(e);

if (e.Response != null)
{
Logger.Error(e.Response.HttpStatusCode.ToString());

if (e.Response.HttpStatusCode is 413 or 403 or 408)
{
foreach (var r in data.Where(r => r != null))
{
await Index(r, immediately);
}
}
else if (e.Response.HttpStatusCode == 429)
{
await Task.Delay(60000);
if (retry < 10)
{
await IndexAsync(data.Where(r => r != null).ToList(), immediately, retry + 1);
return;
}

throw;
}
}
}
catch (AggregateException e) //ElasticsearchClientException
{
if (e.InnerExceptions.Count == 0)
{
throw;
}

var inner = e.InnerExceptions.OfType<ElasticsearchClientException>().FirstOrDefault();


if (inner != null)
{
Logger.ErrorIndexAsync(inner);

if (inner.Response.HttpStatusCode is 413 or 403)
{
Logger.Error(inner.Response.HttpStatusCode.ToString());
Expand All @@ -345,7 +264,7 @@ protected async Task IndexAsync(List<T> data, bool immediately = true, int retry
await Task.Delay(60000);
if (retry < 10)
{
await IndexAsync(data.Where(r => r != null).ToList(), immediately, retry + 1);
await Index(data.Where(r => r != null).ToList(), immediately, retry + 1);
return;
}

Expand Down Expand Up @@ -533,7 +452,7 @@ public virtual Task IndexAllAsync()

public async Task ReIndexAsync()
{
await _indexer.ReIndrexAsync();
await _indexer.ReIndexAsync();
}

public bool Support(T t)
Expand Down
3 changes: 0 additions & 3 deletions common/services/ASC.ElasticSearch/Log/FactoryIndexerLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ internal static partial class FactoryIndexerLogger
[LoggerMessage(Level = LogLevel.Error, Message = "inner")]
public static partial void ErrorInner(this ILogger logger, Exception exception);

[LoggerMessage(Level = LogLevel.Error, Message = "IndexAsync")]
public static partial void ErrorIndexAsync(this ILogger logger, Exception exception);

[LoggerMessage(Level = LogLevel.Error, Message = "Update")]
public static partial void ErrorUpdate(this ILogger logger, Exception exception);

Expand Down
7 changes: 5 additions & 2 deletions products/ASC.Files/Core/Core/Search/FactoryIndexerFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public override async Task IndexAllAsync()
{
var j = 0;
var tasks = new List<Task>();

var now = DateTime.UtcNow;

await foreach (var data in _indexer.IndexAllAsync(GetCount, GetIds, GetData))
{
if (_settings.Threads == 1)
Expand All @@ -103,7 +104,7 @@ public override async Task IndexAllAsync()
}
else
{
tasks.Add(IndexAsync(data));
tasks.Add(Index(data));
j++;
if (j >= _settings.Threads)
{
Expand All @@ -118,6 +119,8 @@ public override async Task IndexAllAsync()
{
Task.WaitAll(tasks.ToArray());
}

await _indexer.OnComplete(now);
}
catch (Exception e)
{
Expand Down
93 changes: 49 additions & 44 deletions products/ASC.Files/Core/Core/Search/FactoryIndexerFolder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,47 @@ public FactoryIndexerFolder(

public override async Task IndexAllAsync()
{
(int, int, int) getCount(DateTime lastIndexed)
try
{
using var filesDbContext = _dbContextFactory.CreateDbContext();

var minid = Queries.FolderMinId(filesDbContext, lastIndexed);

var maxid = Queries.FolderMaxId(filesDbContext, lastIndexed);

var count = Queries.FoldersCount(filesDbContext, lastIndexed);
var j = 0;
var tasks = new List<Task>();
var now = DateTime.UtcNow;

await foreach (var data in _indexer.IndexAllAsync(GetCount, GetIds, GetData))
{
if (_settings.Threads == 1)
{
await Index(data);
}
else
{
tasks.Add(Index(data));
j++;
if (j >= _settings.Threads)
{
Task.WaitAll(tasks.ToArray());
tasks = new List<Task>();
j = 0;
}
}
}

return new(count, maxid, minid);
if (tasks.Count > 0)
{
Task.WaitAll(tasks.ToArray());
}

await _indexer.OnComplete(now);
}

List<DbFolder> getData(long start, long stop, DateTime lastIndexed)
catch (Exception e)
{
using var filesDbContext = _dbContextFactory.CreateDbContext();
return Queries.FolderData(filesDbContext, lastIndexed, start, stop).ToList();
Logger.ErrorFactoryIndexerFolder(e);
throw;
}

List<int> getIds(DateTime lastIndexed)
return;

List<int> GetIds(DateTime lastIndexed)
{
var start = 0;
var result = new List<int>();
Expand All @@ -93,39 +114,23 @@ List<int> getIds(DateTime lastIndexed)
return result;
}

try
List<DbFolder> GetData(long start, long stop, DateTime lastIndexed)
{
var j = 0;
var tasks = new List<Task>();

await foreach (var data in _indexer.IndexAllAsync(getCount, getIds, getData))
{
if (_settings.Threads == 1)
{
await Index(data);
}
else
{
tasks.Add(IndexAsync(data));
j++;
if (j >= _settings.Threads)
{
Task.WaitAll(tasks.ToArray());
tasks = new List<Task>();
j = 0;
}
}
}

if (tasks.Count > 0)
{
Task.WaitAll(tasks.ToArray());
}
using var filesDbContext = _dbContextFactory.CreateDbContext();
return Queries.FolderData(filesDbContext, lastIndexed, start, stop).ToList();
}
catch (Exception e)

(int, int, int) GetCount(DateTime lastIndexed)
{
Logger.ErrorFactoryIndexerFolder(e);
throw;
using var filesDbContext = _dbContextFactory.CreateDbContext();

var minId = Queries.FolderMinId(filesDbContext, lastIndexed);

var maxId = Queries.FolderMaxId(filesDbContext, lastIndexed);

var count = Queries.FoldersCount(filesDbContext, lastIndexed);

return new(count, maxId, minId);
}
}
}
Expand Down

0 comments on commit 03e33b6

Please sign in to comment.