diff --git a/Adaptors/MongoDB/src/Common/IMongoDataModelMapping.cs b/Adaptors/MongoDB/src/Common/IMongoDataModelMapping.cs index adbe8207b..7a3399a48 100644 --- a/Adaptors/MongoDB/src/Common/IMongoDataModelMapping.cs +++ b/Adaptors/MongoDB/src/Common/IMongoDataModelMapping.cs @@ -28,4 +28,7 @@ public interface IMongoDataModelMapping Task InitializeIndexesAsync(IClientSessionHandle sessionHandle, IMongoCollection collection, Options.MongoDB options); + + Task ShardCollectionAsync(IClientSessionHandle sessionHandle, + Options.MongoDB options); } diff --git a/Adaptors/MongoDB/src/Common/MongoCollectionProvider.cs b/Adaptors/MongoDB/src/Common/MongoCollectionProvider.cs index 83d86eddf..578bcdffa 100644 --- a/Adaptors/MongoDB/src/Common/MongoCollectionProvider.cs +++ b/Adaptors/MongoDB/src/Common/MongoCollectionProvider.cs @@ -167,9 +167,34 @@ await Task.Delay(1000 * indexRetry, } } + if (options.Sharding) + { + for (var indexRetry = 1; indexRetry < options.MaxRetries; indexRetry++) + { + lastException = null; + try + { + await model.ShardCollectionAsync(session, + options) + .ConfigureAwait(false); + break; + } + catch (Exception ex) + { + lastException = ex; + logger.LogDebug(ex, + "Retrying to shard {CollectionName} collection", + model.CollectionName); + await Task.Delay(1000 * indexRetry, + cancellationToken) + .ConfigureAwait(false); + } + } + } + if (lastException is not null) { - throw new TimeoutException($"Init Indexes for {model.CollectionName}: Max retries reached", + throw new TimeoutException($"Init Index or shard for {model.CollectionName}: Max retries reached", lastException); } diff --git a/Adaptors/MongoDB/src/Object/ObjectDataModelMapping.cs b/Adaptors/MongoDB/src/Object/ObjectDataModelMapping.cs index 5b0950cea..fa9560aeb 100644 --- a/Adaptors/MongoDB/src/Object/ObjectDataModelMapping.cs +++ b/Adaptors/MongoDB/src/Object/ObjectDataModelMapping.cs @@ -78,4 +78,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle, indexModels) .ConfigureAwait(false); } + + public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, + Options.MongoDB options) + => Task.CompletedTask; } diff --git a/Adaptors/MongoDB/src/Options/MongoDB.cs b/Adaptors/MongoDB/src/Options/MongoDB.cs index 67bc737d8..dcbf8dfc7 100644 --- a/Adaptors/MongoDB/src/Options/MongoDB.cs +++ b/Adaptors/MongoDB/src/Options/MongoDB.cs @@ -60,6 +60,11 @@ public class MongoDB public QueueStorage QueueStorage { get; set; } = new(); - public int MaxConnectionPoolSize { get; set; } = 500; + public int MaxConnectionPoolSize { get; set; } = 500; + public TimeSpan ServerSelectionTimeout { get; set; } = TimeSpan.FromMinutes(2); + + public bool Sharding { get; set; } + + public string AuthSource { get; set; } = ""; } diff --git a/Adaptors/MongoDB/src/ServiceCollectionExt.cs b/Adaptors/MongoDB/src/ServiceCollectionExt.cs index c555bcfdc..9fa9e90a7 100644 --- a/Adaptors/MongoDB/src/ServiceCollectionExt.cs +++ b/Adaptors/MongoDB/src/ServiceCollectionExt.cs @@ -184,6 +184,11 @@ public static IServiceCollection AddMongoClient(this IServiceCollection services mongoOptions.DatabaseName); } + if (!string.IsNullOrEmpty(mongoOptions.AuthSource)) + { + connectionString = $"{connectionString}?authSource={mongoOptions.AuthSource}"; + } + var settings = MongoClientSettings.FromUrl(new MongoUrl(connectionString)); settings.AllowInsecureTls = mongoOptions.AllowInsecureTls; settings.UseTls = mongoOptions.Tls; diff --git a/Adaptors/MongoDB/src/Table/DataModel/Auth/AuthDataModelMapping.cs b/Adaptors/MongoDB/src/Table/DataModel/Auth/AuthDataModelMapping.cs index 0c7211992..bae6c44a5 100644 --- a/Adaptors/MongoDB/src/Table/DataModel/Auth/AuthDataModelMapping.cs +++ b/Adaptors/MongoDB/src/Table/DataModel/Auth/AuthDataModelMapping.cs @@ -77,4 +77,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle, indexModels) .ConfigureAwait(false); } + + public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, + Options.MongoDB options) + => Task.CompletedTask; } diff --git a/Adaptors/MongoDB/src/Table/DataModel/Auth/RoleDataModelMapping.cs b/Adaptors/MongoDB/src/Table/DataModel/Auth/RoleDataModelMapping.cs index b194cca3a..5da24a1cc 100644 --- a/Adaptors/MongoDB/src/Table/DataModel/Auth/RoleDataModelMapping.cs +++ b/Adaptors/MongoDB/src/Table/DataModel/Auth/RoleDataModelMapping.cs @@ -73,4 +73,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle, indexModels) .ConfigureAwait(false); } + + public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, + Options.MongoDB options) + => Task.CompletedTask; } diff --git a/Adaptors/MongoDB/src/Table/DataModel/Auth/UserDataModelMapping.cs b/Adaptors/MongoDB/src/Table/DataModel/Auth/UserDataModelMapping.cs index 2836d843a..56d22fcbd 100644 --- a/Adaptors/MongoDB/src/Table/DataModel/Auth/UserDataModelMapping.cs +++ b/Adaptors/MongoDB/src/Table/DataModel/Auth/UserDataModelMapping.cs @@ -73,4 +73,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle, indexModels) .ConfigureAwait(false); } + + public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, + Options.MongoDB options) + => Task.CompletedTask; } diff --git a/Adaptors/MongoDB/src/Table/DataModel/PartitionDataModelMapping.cs b/Adaptors/MongoDB/src/Table/DataModel/PartitionDataModelMapping.cs index e5250fcda..2c22bc412 100644 --- a/Adaptors/MongoDB/src/Table/DataModel/PartitionDataModelMapping.cs +++ b/Adaptors/MongoDB/src/Table/DataModel/PartitionDataModelMapping.cs @@ -90,4 +90,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle, indexModels) .ConfigureAwait(false); } + + public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, + Options.MongoDB options) + => Task.CompletedTask; } diff --git a/Adaptors/MongoDB/src/Table/DataModel/ResultDataModelMapping.cs b/Adaptors/MongoDB/src/Table/DataModel/ResultDataModelMapping.cs index a25c7d95e..1621b252c 100644 --- a/Adaptors/MongoDB/src/Table/DataModel/ResultDataModelMapping.cs +++ b/Adaptors/MongoDB/src/Table/DataModel/ResultDataModelMapping.cs @@ -92,4 +92,10 @@ await collection.Indexes.CreateManyAsync(sessionHandle, indexModels) .ConfigureAwait(false); } + + public async Task ShardCollectionAsync(IClientSessionHandle sessionHandle, + Options.MongoDB options) + => await sessionHandle.shardCollection(options, + CollectionName) + .ConfigureAwait(false); } diff --git a/Adaptors/MongoDB/src/Table/DataModel/SessionDataModelMapping.cs b/Adaptors/MongoDB/src/Table/DataModel/SessionDataModelMapping.cs index 01237d6b5..c95eb72ab 100644 --- a/Adaptors/MongoDB/src/Table/DataModel/SessionDataModelMapping.cs +++ b/Adaptors/MongoDB/src/Table/DataModel/SessionDataModelMapping.cs @@ -137,4 +137,10 @@ await collection.Indexes.CreateManyAsync(sessionHandle, indexModels) .ConfigureAwait(false); } + + public async Task ShardCollectionAsync(IClientSessionHandle sessionHandle, + Options.MongoDB options) + => await sessionHandle.shardCollection(options, + CollectionName) + .ConfigureAwait(false); } diff --git a/Adaptors/MongoDB/src/Table/DataModel/ShardingExt.cs b/Adaptors/MongoDB/src/Table/DataModel/ShardingExt.cs new file mode 100644 index 000000000..cf92b094a --- /dev/null +++ b/Adaptors/MongoDB/src/Table/DataModel/ShardingExt.cs @@ -0,0 +1,52 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2024. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System.Collections.Generic; +using System.Threading.Tasks; + +using MongoDB.Bson; +using MongoDB.Driver; + +namespace ArmoniK.Core.Adapters.MongoDB.Table.DataModel; + +public static class ShardingExt +{ + public static async Task shardCollection(this IClientSessionHandle sessionHandle, + Options.MongoDB options, + string collectionName) + { + var adminDB = sessionHandle.Client.GetDatabase("admin"); + var shardingCommandDict = new Dictionary + { + { + "shardCollection", $"{options.DatabaseName}.{collectionName}" + }, + { + "key", new Dictionary + { + { + "_id", "hashed" + }, + } + }, + }; + + var shardingCommand = new BsonDocumentCommand(new BsonDocument(shardingCommandDict)); + await adminDB.RunCommandAsync(shardingCommand) + .ConfigureAwait(false); + } +} diff --git a/Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs b/Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs index 9ac72d1b3..7419e8572 100644 --- a/Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs +++ b/Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs @@ -210,4 +210,10 @@ await collection.Indexes.CreateManyAsync(sessionHandle, indexModels) .ConfigureAwait(false); } + + public async Task ShardCollectionAsync(IClientSessionHandle sessionHandle, + Options.MongoDB options) + => await sessionHandle.shardCollection(options, + CollectionName) + .ConfigureAwait(false); } diff --git a/ProfilingTools/src/OpenTelemetryExporter/OpenTelemetryDataModelMapping.cs b/ProfilingTools/src/OpenTelemetryExporter/OpenTelemetryDataModelMapping.cs index 01bcb9f25..54eb67355 100644 --- a/ProfilingTools/src/OpenTelemetryExporter/OpenTelemetryDataModelMapping.cs +++ b/ProfilingTools/src/OpenTelemetryExporter/OpenTelemetryDataModelMapping.cs @@ -110,4 +110,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle, indexModels) .ConfigureAwait(false); } + + public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, + Adapters.MongoDB.Options.MongoDB options) + => Task.CompletedTask; }