Skip to content

Commit

Permalink
Merge pull request #12 from AElfProject/feature/bulk-update
Browse files Browse the repository at this point in the history
feat: add bulk update function
  • Loading branch information
AElfBourneShi authored Dec 7, 2023
2 parents fcebcdd + e985d8d commit 80f7347
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,74 @@ public async Task UpdateAsync(TEntity model, string collectionName = null,
$"Update Document failed at index {indexName} id {(model == null ? "" : model.Id.ToString())} : {result.ServerError.Error.Reason}");
}

public async Task UpdateManyAsync(List<TEntity> list, string collectionName = null,
CancellationToken cancellationToken = default)
{
var indexNames = await GetFullCollectionNameAsync(collectionName, list);
var client = await GetElasticsearchClientAsync(cancellationToken);
var isSharding = _shardingKeyProvider.IsShardingCollection();
if (!isSharding)
{
await BulkUpdateAsync(client, indexNames, list, isSharding, cancellationToken);
return;
}

var bulkUpdateTaskList = new List<Task>();
bulkUpdateTaskList.Add(BulkUpdateAsync(client, indexNames, list, isSharding, cancellationToken));
var routeKeyTaskList =
await GetBulkUpdateCollectionRouteKeyTasksAsync(isSharding, list, indexNames, cancellationToken);
if (routeKeyTaskList.Count > 0)
{
bulkUpdateTaskList.AddRange(routeKeyTaskList);
}
await Task.WhenAll(bulkUpdateTaskList.ToArray());
}

private async Task BulkUpdateAsync(IElasticClient client, List<string> indexNames, List<TEntity> list, bool isSharding,
CancellationToken cancellationToken = default)
{
var response = new BulkResponse();
var currentIndexName = indexNames[0];
var bulk = new BulkRequest(currentIndexName)
{
Operations = new List<IBulkOperation>(),
Refresh = _elasticsearchOptions.Refresh
};
for (int i = 0; i < list.Count; i++)
{
if (isSharding && (currentIndexName != indexNames[i]))
{
response = await client.BulkAsync(bulk, cancellationToken);
if (!response.IsValid)
{
throw new ElasticsearchException(
$"Bulk Update Document failed at index {indexNames} :{response.ServerError.Error.Reason}");
}

currentIndexName = indexNames[i];

bulk = new BulkRequest(currentIndexName)
{
Operations = new List<IBulkOperation>(),
Refresh = _elasticsearchOptions.Refresh
};
}
var updateOperation = new BulkUpdateOperation<TEntity,TEntity>(new Id(list[i]))
{
Doc = list[i],
Index = currentIndexName
};
bulk.Operations.Add(updateOperation);
}

response = await client.BulkAsync(bulk, cancellationToken);
if (!response.IsValid)
{
throw new ElasticsearchException(
$"Bulk Update Document failed at index {indexNames} :{response.ServerError.Error.Reason}");
}
}

public async Task DeleteAsync(TKey id, string collectionName = null, CancellationToken cancellationToken = default)
{
var indexName = await GetFullCollectionNameByIdAsync(id);
Expand Down Expand Up @@ -453,6 +521,69 @@ private async Task BulkAddRouteKey(IElasticClient client, List<TEntity> modelLis
}
}

private async Task<List<Task>> GetBulkUpdateCollectionRouteKeyTasksAsync(bool isSharding, List<TEntity> modelList,
List<string> fullCollectionNameList, CancellationToken cancellationToken = default)
{
var collectionRouteKeys = await _collectionRouteKeyProvider.GetCollectionRouteKeyItemsAsync();
if (collectionRouteKeys != null && collectionRouteKeys.Any() && isSharding)
{
var routeKeyTaskList = new List<Task>();
var client = await GetElasticsearchClientAsync(cancellationToken);
foreach (var collectionRouteKey in collectionRouteKeys)
{
routeKeyTaskList.Add(BulkUpdateRouteKey(client, modelList, collectionRouteKey, fullCollectionNameList,
cancellationToken));
}

return routeKeyTaskList;
}

return new List<Task>();
}

private async Task BulkUpdateRouteKey(IElasticClient client, List<TEntity> modelList,
CollectionRouteKeyItem<TEntity> collectionRouteKey, List<string> fullCollectionNameList,
CancellationToken cancellationToken)
{
var collectionRouteKeyIndexName =
IndexNameHelper.GetCollectionRouteKeyIndexName(typeof(TEntity), collectionRouteKey.FieldName,
_aelfEntityMappingOptions.CollectionPrefix);
var collectionRouteKeyBulk = new BulkRequest(collectionRouteKeyIndexName)
{
Operations = new List<IBulkOperation>(),
Refresh = _elasticsearchOptions.Refresh
};
int indexNameCount = 0;
foreach (var item in modelList)
{
// var value = item.GetType().GetProperty(collectionRouteKey.FieldName)?.GetValue(item);
var value = collectionRouteKey.GetRouteKeyValueFunc(item);
string indexName = IndexNameHelper.RemoveCollectionPrefix(fullCollectionNameList[indexNameCount],
_aelfEntityMappingOptions.CollectionPrefix);
var collectionRouteKeyIndexModel = new RouteKeyCollection()
{
Id = item.Id.ToString(),
CollectionName = indexName,
// SearchKey = Convert.ChangeType(value, collectionRouteKey.FieldValueType)
CollectionRouteKey = value?.ToString()
};
var updateOperation = new BulkUpdateOperation<RouteKeyCollection,RouteKeyCollection>(new Id(collectionRouteKeyIndexModel))
{
Doc = collectionRouteKeyIndexModel,
Index = collectionRouteKeyIndexName
};
collectionRouteKeyBulk.Operations.Add(updateOperation);
indexNameCount++;
}

var response = await client.BulkAsync(collectionRouteKeyBulk, cancellationToken);
if (!response.IsValid)
{
throw new ElasticsearchException(
$"Bulk Update Document failed at index {collectionRouteKeyIndexName} :{response.ServerError.Error.Reason}");
}
}

private async Task<List<Task>> GetBulkDeleteCollectionRouteKeyTasksAsync(bool isSharding, List<TEntity> modelList,
CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public interface IEntityMappingBasicRepository<TEntity, TKey> where TEntity : cl
Task AddOrUpdateManyAsync(List<TEntity> list, string collectionName = null, CancellationToken cancellationToken = default);

Task UpdateAsync(TEntity model, string collectionName = null, CancellationToken cancellationToken = default);

Task UpdateManyAsync(List<TEntity> list, string collectionName = null, CancellationToken cancellationToken = default);

Task DeleteAsync(TKey id, string collectionName = null, CancellationToken cancellationToken = default);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public async Task AddOrUpdateManyAsyncTest()
}

[Fact]
public async Task UpdateAsync()
public async Task UpdateAsyncTest()
{
var blockIndex = new BlockIndex
{
Expand All @@ -179,6 +179,91 @@ public async Task UpdateAsync()
Assert.True(results.First().LogEventCount == blockIndex.LogEventCount);
await _elasticsearchRepository.DeleteAsync(blockIndex);
}

[Fact]
public async Task UpdateManyAsyncTest()
{
var blockIndex1 = new BlockIndex
{
Id = "block001",
BlockHash = "BlockHash001",
BlockHeight = 10,
BlockTime = DateTime.Now.AddDays(-8),
LogEventCount = 10,
ChainId = "AELF"
};
var blockIndex2 = new BlockIndex
{
Id = "block002",
BlockHash = "BlockHash002",
BlockHeight = 20,
BlockTime = DateTime.Now.AddDays(-8),
LogEventCount = 10,
ChainId = "AELF"
};
var blockIndex3 = new BlockIndex
{
Id = "block003",
BlockHash = "BlockHash003",
BlockHeight = 30,
BlockTime = DateTime.Now.AddDays(-8),
LogEventCount = 10,
ChainId = "AELF"
};
var bulkList = new List<BlockIndex> {blockIndex1, blockIndex2, blockIndex3};
await _elasticsearchRepository.AddOrUpdateManyAsync(bulkList);

var queryable = await _elasticsearchRepository.GetQueryableAsync();
Expression<Func<BlockIndex, bool>> expression = p =>
p.ChainId == blockIndex1.ChainId && p.BlockHeight == blockIndex1.BlockHeight && p.Id == blockIndex1.Id;
var results = queryable.Where(expression).ToList();
results.ShouldNotBeNull();
results.First().BlockHash.ShouldBe(blockIndex1.BlockHash);
results.First().LogEventCount.ShouldBe(blockIndex1.LogEventCount);

queryable = await _elasticsearchRepository.GetQueryableAsync();
expression = p =>
p.ChainId == blockIndex2.ChainId && p.BlockHeight == blockIndex2.BlockHeight && p.Id == blockIndex2.Id;
results = queryable.Where(expression).ToList();
results.ShouldNotBeNull();
results.First().BlockHash.ShouldBe(blockIndex2.BlockHash);
results.First().LogEventCount.ShouldBe(blockIndex2.LogEventCount);

queryable = await _elasticsearchRepository.GetQueryableAsync();
expression = p =>
p.ChainId == blockIndex3.ChainId && p.BlockHeight == blockIndex3.BlockHeight && p.Id == blockIndex3.Id;
results = queryable.Where(expression).ToList();
results.ShouldNotBeNull();
results.First().BlockHash.ShouldBe(blockIndex3.BlockHash);
results.First().LogEventCount.ShouldBe(blockIndex3.LogEventCount);

blockIndex1.LogEventCount = 100;
blockIndex2.LogEventCount = 200;
blockIndex3.LogEventCount = 300;
var bulkUpdateList = new List<BlockIndex> {blockIndex1, blockIndex2, blockIndex3};
await _elasticsearchRepository.UpdateManyAsync(bulkUpdateList);

queryable = await _elasticsearchRepository.GetQueryableAsync();
expression = p =>
p.ChainId == blockIndex1.ChainId && p.BlockHeight == blockIndex1.BlockHeight && p.Id == blockIndex1.Id;
results = queryable.Where(expression).ToList();
results.ShouldNotBeNull();
results.First().LogEventCount.ShouldBe(100);

queryable = await _elasticsearchRepository.GetQueryableAsync();
expression = p =>
p.ChainId == blockIndex2.ChainId && p.BlockHeight == blockIndex2.BlockHeight && p.Id == blockIndex2.Id;
results = queryable.Where(expression).ToList();
results.ShouldNotBeNull();
results.First().LogEventCount.ShouldBe(200);

queryable = await _elasticsearchRepository.GetQueryableAsync();
expression = p =>
p.ChainId == blockIndex3.ChainId && p.BlockHeight == blockIndex3.BlockHeight && p.Id == blockIndex3.Id;
results = queryable.Where(expression).ToList();
results.ShouldNotBeNull();
results.First().LogEventCount.ShouldBe(300);
}

[Fact]
public async Task DeleteAsyncByEntityTest()
Expand Down

0 comments on commit 80f7347

Please sign in to comment.