Skip to content

Commit

Permalink
feat: add static init for instances of Partition and Authentication
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem committed Oct 8, 2024
1 parent 2cb6681 commit 247eac9
Show file tree
Hide file tree
Showing 49 changed files with 1,287 additions and 79 deletions.
49 changes: 29 additions & 20 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -421,19 +421,28 @@ jobs:
strategy:
fail-fast: false
matrix:
queue:
- activemq
- rabbitmq
- rabbitmq091
- pubsub
- sqs
object:
- redis
- minio
log-level:
- Information
- Verbose
name: HtcMock ${{ matrix.queue }} ${{ matrix.object }} ${{ matrix.log-level }}
target:
- { queue: activemq, object: redis, log-level: Information, cinit: true }
- { queue: rabbitmq, object: redis, log-level: Information, cinit: true }
- { queue: rabbitmq091, object: redis, log-level: Information, cinit: true }
- { queue: pubsub, object: redis, log-level: Information, cinit: true }
- { queue: sqs, object: redis, log-level: Information, cinit: true }

- { queue: activemq, object: redis, log-level: Verbose, cinit: true }
- { queue: rabbitmq, object: redis, log-level: Verbose, cinit: true }
- { queue: rabbitmq091, object: redis, log-level: Verbose, cinit: true }
- { queue: pubsub, object: redis, log-level: Verbose, cinit: true }
- { queue: sqs, object: redis, log-level: Verbose, cinit: true }

- { queue: activemq, object: minio, log-level: Information, cinit: true }
- { queue: rabbitmq, object: minio, log-level: Information, cinit: true }
- { queue: rabbitmq091, object: minio, log-level: Information, cinit: true }
- { queue: pubsub, object: minio, log-level: Information, cinit: true }
- { queue: sqs, object: minio, log-level: Information, cinit: true }

- { queue: activemq, object: redis, log-level: Information, cinit: false }

name: HtcMock ${{ matrix.target.queue }} ${{ matrix.target.object }} ${{ matrix.target.log-level }} ${{ matrix.target.cinit }}
steps:
- name: Checkout
uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4
Expand All @@ -455,7 +464,7 @@ jobs:
- name: Deploy Core
run: |
MONITOR_PREFIX="monitor/deploy/" tools/retry.sh -w 30 -- tools/monitor.sh \
just log_level=${{ matrix.log-level }} tag=${VERSION} queue=${{ matrix.queue }} object=${{ matrix.object }} worker=htcmock deploy
just log_level=${{ matrix.target.log-level }} tag=${VERSION} queue=${{ matrix.target.queue }} object=${{ matrix.target.object }} cinit=${{ matrix.target.cinit }} worker=htcmock deploy
sleep 10
- name: Print And Time Metrics
Expand Down Expand Up @@ -524,7 +533,7 @@ jobs:
- name: Run HtcMock test 1000 tasks 1 level
timeout-minutes: 3
if: ${{ matrix.log-level != 'Verbose' }}
if: ${{ matrix.target.log-level != 'Verbose' }}
run: |
MONITOR_PREFIX="monitor/htcmock-1000-1/" tools/monitor.sh \
docker run --net armonik_network --rm \
Expand All @@ -539,7 +548,7 @@ jobs:
- name: Run HtcMock test 1000 tasks 4 levels
timeout-minutes: 3
if: ${{ matrix.log-level != 'Verbose' }}
if: ${{ matrix.target.log-level != 'Verbose' }}
run: |
MONITOR_PREFIX="monitor/htcmock-1000-4/" tools/monitor.sh \
docker run --net armonik_network --rm \
Expand All @@ -563,8 +572,8 @@ jobs:
run: |
export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}
docker cp fluentd:/armonik-logs.json - | gzip -c | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}.json.gz
tar -czf - monitor/ | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}-monitor.tar.gz
docker cp fluentd:/armonik-logs.json - | gzip -c | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.cinit }}.json.gz
tar -czf - monitor/ | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.cinit }}-monitor.tar.gz
- name: Collect docker container logs
uses: jwalton/gh-docker-logs@2741064ab9d7af54b0b1ffb6076cf64c16f0220e # v2
Expand All @@ -576,14 +585,14 @@ jobs:
run: |
export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}
tar -cvf - ./container-logs | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}-container-logs.tar.gz
tar -cvf - ./container-logs | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.cinit }}-container-logs.tar.gz
- name: Export and upload database
if: always()
run: |
export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}
bash tools/export_mongodb.sh
tar -cvf - *.json | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.queue }}-${{ matrix.object }}-${{ matrix.log-level }}-database.tar.gz
tar -cvf - *.json | aws s3 cp - s3://${{ secrets.AWS_LOG_BUCKET_NAME }}/core-pipeline/${{ github.run_number }}/${{ github.run_attempt }}/htcmock-${{ matrix.target.queue }}-${{ matrix.target.object }}-${{ matrix.target.log-level }}-${{ matrix.target.cinit }}-database.tar.gz
testWindowsDocker:
needs:
Expand Down
36 changes: 36 additions & 0 deletions Adaptors/MongoDB/src/Common/IMongoDataModelMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

using System.Threading.Tasks;

using ArmoniK.Core.Common.Injection.Options.Database;

using MongoDB.Driver;

namespace ArmoniK.Core.Adapters.MongoDB.Common;
Expand All @@ -25,10 +27,44 @@ public interface IMongoDataModelMapping<T>
{
string CollectionName { get; }

/// <summary>
/// Setup indexes for the collection
/// Can be called multiple times
/// </summary>
/// <param name="sessionHandle">MongoDB Client session</param>
/// <param name="collection">MongoDDB Collection in which to insert data</param>
/// <param name="options">Options for MongoDB</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
IMongoCollection<T> collection,
Options.MongoDB options);

/// <summary>
/// Setup sharding for the collection
/// Can be called multiple times
/// </summary>
/// <param name="sessionHandle">MongoDB Client session</param>
/// <param name="options">Options for MongoDB</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options);

/// <summary>
/// Insert data into the collection after its creation.
/// Can be called multiple times
/// </summary>
/// <param name="sessionHandle">MongoDB Client session</param>
/// <param name="collection">MongoDDB Collection in which to insert data</param>
/// <param name="initDatabase">Data to insert</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
Task InitializeCollectionAsync(IClientSessionHandle sessionHandle,
IMongoCollection<T> collection,
InitDatabase initDatabase)
=> Task.CompletedTask;
}
103 changes: 73 additions & 30 deletions Adaptors/MongoDB/src/Common/MongoCollectionProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Core.Base;
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Core.Common;
using ArmoniK.Core.Common.Injection.Options.Database;

using JetBrains.Annotations;

Expand All @@ -40,6 +42,7 @@ public class MongoCollectionProvider<TData, TModelMapping> : IInitializable, IAs
private IMongoCollection<TData>? mongoCollection_;

public MongoCollectionProvider(Options.MongoDB options,
InitDatabase initDatabase,
SessionProvider sessionProvider,
IMongoDatabase mongoDatabase,
ILogger<IMongoCollection<TData>> logger,
Expand All @@ -52,6 +55,7 @@ public MongoCollectionProvider(Options.MongoDB options,
}

Initialization = InitializeAsync(options,
initDatabase,
sessionProvider,
mongoDatabase,
logger,
Expand Down Expand Up @@ -88,6 +92,7 @@ public async Task Init(CancellationToken cancellationToken)
}

private static async Task<IMongoCollection<TData>> InitializeAsync(Options.MongoDB options,
InitDatabase initDatabase,
SessionProvider sessionProvider,
IMongoDatabase mongoDatabase,
ILogger<IMongoCollection<TData>> logger,
Expand All @@ -96,47 +101,55 @@ private static async Task<IMongoCollection<TData>> InitializeAsync(Options.Mongo
var model = new TModelMapping();
Exception? lastException = null;

for (var collectionRetry = 1; collectionRetry < options.MaxRetries; collectionRetry++)
if (initDatabase.Init)
{
lastException = null;
try
{
await mongoDatabase.CreateCollectionAsync(model.CollectionName,
null,
cancellationToken)
.ConfigureAwait(false);
break;
}
catch (MongoCommandException ex) when (ex.CodeName == "NamespaceExists")
for (var collectionRetry = 1; collectionRetry < options.MaxRetries; collectionRetry++)
{
logger.LogDebug(ex,
"Use already existing instance of Collection {CollectionName}",
model.CollectionName);
break;
lastException = null;
try
{
await mongoDatabase.CreateCollectionAsync(model.CollectionName,
null,
cancellationToken)
.ConfigureAwait(false);
break;
}
catch (MongoCommandException ex) when (ex.CodeName == "NamespaceExists")
{
logger.LogDebug(ex,
"Use already existing instance of Collection {CollectionName}",
model.CollectionName);
break;
}
catch (Exception ex)
{
lastException = ex;
logger.LogDebug(ex,
"Retrying to create Collection {CollectionName}",
model.CollectionName);
await Task.Delay(1000 * collectionRetry,
cancellationToken)
.ConfigureAwait(false);
}
}
catch (Exception ex)

if (lastException is not null)
{
lastException = ex;
logger.LogDebug(ex,
"Retrying to create Collection {CollectionName}",
model.CollectionName);
await Task.Delay(1000 * collectionRetry,
cancellationToken)
.ConfigureAwait(false);
throw new TimeoutException($"Create {model.CollectionName}: Max retries reached",
lastException);
}
}

if (lastException is not null)
{
throw new TimeoutException($"Create {model.CollectionName}: Max retries reached",
lastException);
}

var output = mongoDatabase.GetCollection<TData>(model.CollectionName);
await sessionProvider.Init(cancellationToken)
.ConfigureAwait(false);
var session = sessionProvider.Get();

if (!initDatabase.Init)
{
return output;
}

for (var indexRetry = 1; indexRetry < options.MaxRetries; indexRetry++)
{
lastException = null;
Expand Down Expand Up @@ -192,9 +205,39 @@ await Task.Delay(1000 * indexRetry,
}
}

for (var indexRetry = 1; indexRetry < options.MaxRetries; indexRetry++)
{
lastException = null;
try
{
await model.InitializeCollectionAsync(session,
output,
initDatabase)
.ConfigureAwait(false);
break;
}
catch (MongoBulkWriteException<TData> e) when (e.WriteErrors.All(error => error.Category == ServerErrorCategory.DuplicateKey))
{
logger.LogDebug(e,
"Values were already present within the collection {CollectionName}",
model.CollectionName);
break;
}
catch (Exception ex)
{
lastException = ex;
logger.LogDebug(ex,
"Retrying to initialize {CollectionName} collection",
model.CollectionName);
await Task.Delay(1000 * indexRetry,
cancellationToken)
.ConfigureAwait(false);
}
}

if (lastException is not null)
{
throw new TimeoutException($"Init Index or shard for {model.CollectionName}: Max retries reached",
throw new TimeoutException($"Init for {model.CollectionName}: Max retries reached",
lastException);
}

Expand Down
16 changes: 16 additions & 0 deletions Adaptors/MongoDB/src/Table/DataModel/Auth/AuthDataModelMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System.Linq;
using System.Threading.Tasks;

using ArmoniK.Core.Adapters.MongoDB.Common;
using ArmoniK.Core.Common.Auth.Authentication;
using ArmoniK.Core.Common.Injection.Options.Database;

using MongoDB.Bson;
using MongoDB.Bson.Serialization;
Expand Down Expand Up @@ -78,7 +80,21 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
.ConfigureAwait(false);
}

/// <inheritdoc />
public Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
=> Task.CompletedTask;

/// <inheritdoc />
public async Task InitializeCollectionAsync(IClientSessionHandle sessionHandle,
IMongoCollection<AuthData> collection,
InitDatabase initDatabase)
{
if (initDatabase.Auths.Any())
{
await collection.InsertManyAsync(sessionHandle,
initDatabase.Auths)
.ConfigureAwait(false);
}
}
}
16 changes: 16 additions & 0 deletions Adaptors/MongoDB/src/Table/DataModel/Auth/RoleDataModelMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Linq;
using System.Threading.Tasks;

using ArmoniK.Core.Adapters.MongoDB.Common;
using ArmoniK.Core.Common.Auth.Authentication;
using ArmoniK.Core.Common.Injection.Options.Database;

using MongoDB.Bson.Serialization;
using MongoDB.Driver;
Expand Down Expand Up @@ -74,7 +76,21 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
.ConfigureAwait(false);
}

/// <inheritdoc />
public Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
=> Task.CompletedTask;

/// <inheritdoc />
public async Task InitializeCollectionAsync(IClientSessionHandle sessionHandle,
IMongoCollection<RoleData> collection,
InitDatabase initDatabase)
{
if (initDatabase.Roles.Any())
{
await collection.InsertManyAsync(sessionHandle,
initDatabase.Roles)
.ConfigureAwait(false);
}
}
}
Loading

0 comments on commit 247eac9

Please sign in to comment.