Skip to content

Commit

Permalink
feat: dynamic object storage (#782)
Browse files Browse the repository at this point in the history
# Motivation

- Allow users to implement their own object storage plugins without
modifying this repository.
- Prepare the import existing object from the object storage feature by
dissociating Result metadata identifier from the key used in the object
storage to reference the actual data.

# Description

- Move object storage interfaces from ArmoniK.Core.Common to
ArmoniK.Core.Base package so that they can be implemented outside of
ArmoniK.Core.
- Change the Result data schema in database to add an OpaqueId which
represent the identifier used by the ObjectStorage to refer to the
Result data.
- Dynamically load object storages as done for the queue. It works on
the same principle : we use look for an implementation of
`IDependencyInjectionBuildable` and instanciate it. Then, this interface
is responsible for adding implementation of `IObjectStorage` to its
given `ServicesCollection`.
- The AddQueue function has been generalised to support ObjectStorages
and QueueStorages. It should also be able to support database and
partition plugins loading when it will be available.
- Update ArmoniK.Core infrastructure to properly use the dynamically
loaded object storages.
- Implement Embed object storage that stores the data into the key,
therefore inserting them completely in the database.
- Adds documentation to explain how to migrate an existing ArmoniK
database to the new definition of Result.

# Testing

- Unit tests and integration tests were executed locally and in the
pipeline.
- Database migration was tested with the following procedure:

First checkout and deploy with the previous release from ArmoniK.Core:

```bash
git checkout 0.28.0
just object=redis worker=htcmock tag=0.28.0 deploy
```

Then execute a job with a few tasks so that there is data to migrate in
the database. Do not forget to keep data in the Object Storage after the
execution (disable Purge from the session) to fully test that the
association will be done properly by the adapter.

```bash
docker run --net armonik_network --rm \
    -e HtcMock__NTasks=10 \
    -e HtcMock__TotalCalculationTime=00:00:00.100 \
    -e HtcMock__DataSize=1 \
    -e HtcMock__MemorySize=1 \
    -e HtcMock__PurgeData=false \
    -e HtcMock__Partition=TestPartition0 \
    -e GrpcClient__Endpoint=http://armonik.control.submitter:1080 \
    dockerhubaneo/armonik_core_htcmock_test_client:0.28.0
```

Once the job is finished, go to the version of ArmoniK.Core on this
branch and deploy it.

```bash
git checkout jg/dynamicobject
just object=redis worker=htcmock build-deploy
```

Connect to the database and execute the migration query found in the
documentation added by this PR.

```bash
docker exec -it database mongosh database
```

Download results metadata and try to access its data from the object
storage. The two requests should work without issue. Keep in mind that
you should replace the result_id and the session_id.

```bash
docker run --net armonik_network fullstorydev/grpcurl -plaintext -d '{"result_id" : "80e8c67d-3093-48f4-a635-c1cc10299bce" }'  armonik.control.submitter:1080 armonik.api.grpc.v1.results.Results/GetResult

docker run --net armonik_network fullstorydev/grpcurl -plaintext -d '{"result_id" : "80e8c67d-3093-48f4-a635-c1cc10299bce", "session_id" : "3df359ad-e69e-426a-970f-11e798e9b1ae" }'  armonik.control.submitter:1080 armonik.api.grpc.v1.results.Results/DownloadResultData
```

# Impact

- Performances with the existing object storage plugins should not be
impacted.
- Embed object storage should be more performant since it will store
data in the database but will increase the memory usage of the database.
Be careful for large data as MongoDB cannot store documents larger than
16Mb.
- Existing databases should be migrated according to the suggested
procedure.
- Object Storage configuration will have to be changed according to the
object storage used (see the outputs of our modules).


# Additional Information

- Using a string for OpaqueId instead of binary data during the
migration does not work properly. String cannot be automatically
deserialized into a byte array.

Some links I studied to complete this PR.

-
https://stackoverflow.com/questions/61794277/method-does-not-have-an-implementation-when-loading-assemblies-into-a-new-assemb
- https://www.mongodb.com/docs/current/reference/method/BinData/
-
https://www.mongodb.com/docs/manual/reference/operator/aggregation/convert/
- https://www.mongodb.com/docs/manual/reference/method/HexData/
-
https://www.mongodb.com/docs/manual/reference/operator/aggregation/function/

# Checklist

- [x] My code adheres to the coding and style guidelines of the project.
- [x] I have performed a self-review of my code.
- [x] I have commented my code, particularly in hard-to-understand
areas.
- [x] I have made corresponding changes to the documentation.
- [x] I have thoroughly tested my modifications and added tests when
necessary.
- [x] Tests pass locally and in the CI.
- [x] I have assessed the performance impact of my modifications.
  • Loading branch information
lemaitre-aneo authored Dec 19, 2024
2 parents 368637b + 989dea4 commit 220164c
Show file tree
Hide file tree
Showing 104 changed files with 1,596 additions and 1,187 deletions.
51 changes: 51 additions & 0 deletions .docs/content/0.installation/5.version-migration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# How to migrate ArmoniK.Core dependencies during upgrade ?

## 0.28.x -> 0.29.x

### Database

This version changes the structure of a Result in the database. It introduces a new field called `OpaqueId` which holds the identifier of its associated value in the Object Storage. Previously, the ResultId was used. The following MongoDB request converts the ResultId into the OpaqueId to support the new implementation.

```js
db.Result.updateMany({},
[{
$addFields: {
OpaqueId: {
$function: {
body: function(data) {
const base64Chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';
const bytes = [];
for (let i = 0; i < data.length; i++) {
bytes.push(data.charCodeAt(i));
}

let base64 = '';
let i = 0;
while (i < bytes.length) {
let byte1 = bytes[i++] || 0;
let byte2 = bytes[i++] || 0;
let byte3 = bytes[i++] || 0;

let enc1 = byte1 >> 2;
let enc2 = ((byte1 & 3) << 4) | (byte2 >> 4);
let enc3 = ((byte2 & 15) << 2) | (byte3 >> 6);
let enc4 = byte3 & 63;

if (isNaN(byte2)) {
enc3 = enc4 = 64;
} else if (isNaN(byte3)) {
enc4 = 64;
}

base64 += base64Chars[enc1] + base64Chars[enc2] + base64Chars[enc3] + base64Chars[enc4];
}

return BinData(0, base64);
},
args: ["$_id"],
lang: "js"
}
}
}
}])
```
9 changes: 5 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jobs:
- Adaptors/MongoDB/tests
- Adaptors/Memory/tests
- Adaptors/S3/tests
- Adaptors/Embed/tests
os:
- ubuntu-latest
fail-fast: false
Expand Down Expand Up @@ -179,6 +180,7 @@ jobs:
- Common/tests
- Adaptors/MongoDB/tests
- Adaptors/Memory/tests
- Adaptors/Embed/tests
fail-fast: false
runs-on: windows-latest
steps:
Expand All @@ -189,12 +191,10 @@ jobs:
submodules: true

- name: Dotnet Restore
run: |
dotnet restore
run: dotnet restore

- name: Dotnet Build
run: |
dotnet build
run: dotnet build

- name: Run tests
run: |
Expand Down Expand Up @@ -440,6 +440,7 @@ jobs:
object:
- redis
- minio
- embed
log-level:
- Information
- Verbose
Expand Down
74 changes: 74 additions & 0 deletions Adaptors/Embed/src/ArmoniK.Core.Adapters.Embed.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<RuntimeIdentifiers>win-x64;linux-x64;linux-arm64</RuntimeIdentifiers>
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
<Company>ANEO</Company>
<Copyright>Copyright (C) ANEO, 2021-2022</Copyright>
<PackageLicenseExpression>AGPL-3.0-or-later</PackageLicenseExpression>
<PackageRequireLicenseAcceptance>True</PackageRequireLicenseAcceptance>
<IsPackable>true</IsPackable>
<Nullable>enable</Nullable>
<EnableDynamicLoading>true</EnableDynamicLoading>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<DebugType>Embedded</DebugType>
<IncludeSymbols>true</IncludeSymbols>
<DefineConstants>DEBUG;TRACE</DefineConstants>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)'=='Release'">
<Optimize>true</Optimize>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2024.2.0">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="8.0.0">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.2">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.1">
<Private>false</Private>
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="ArmoniK.Utils" Version="0.5.1">
<ExcludeAssets>runtime</ExcludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\Utils\src\ArmoniK.Core.Utils.csproj">
<Private>false</Private>
<ExcludeAssets>runtime</ExcludeAssets>
</ProjectReference>
<ProjectReference Include="..\..\..\Base\src\ArmoniK.Core.Base.csproj">
<Private>false</Private>
<ExcludeAssets>runtime</ExcludeAssets>
</ProjectReference>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeInspection/Daemon/ConfigureAwaitAnalysisMode/@EntryValue">Library</s:String></wpf:ResourceDictionary>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This file is part of the ArmoniK project
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
Expand All @@ -15,20 +15,27 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;
using ArmoniK.Core.Base;
using ArmoniK.Core.Utils;

using JetBrains.Annotations;

namespace ArmoniK.Core.Adapters.MongoDB.Options;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace ArmoniK.Core.Adapters.Embed;

/// <summary>
/// Class for building Embed instance and Object interfaces through Dependency Injection
/// </summary>
[PublicAPI]
public class QueueStorage
public class ObjectBuilder : IDependencyInjectionBuildable
{
public const string SettingSection = nameof(MongoDB) + ":" + nameof(QueueStorage);

public TimeSpan LockRefreshPeriodicity { get; set; } = TimeSpan.FromMinutes(2);

public TimeSpan PollPeriodicity { get; set; } = TimeSpan.FromSeconds(5);

public TimeSpan LockRefreshExtension { get; set; } = TimeSpan.FromMinutes(5);
/// <inheritdoc />
[PublicAPI]
public void Build(IServiceCollection serviceCollection,
ConfigurationManager configuration,
ILogger logger)
=> serviceCollection.AddSingletonWithHealthCheck<IObjectStorage, ObjectStorage>(nameof(IObjectStorage));
}
86 changes: 86 additions & 0 deletions Adaptors/Embed/src/ObjectStorage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY, without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Core.Base;
using ArmoniK.Core.Base.DataStructures;

using JetBrains.Annotations;

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;

namespace ArmoniK.Core.Adapters.Embed;

[UsedImplicitly]
public class ObjectStorage : IObjectStorage
{
private readonly ILogger<ObjectStorage> logger_;
private bool isInitialized_;

/// <summary>
/// <see cref="IObjectStorage" /> implementation for Redis
/// </summary>
/// <param name="logger">Logger used to print logs</param>
public ObjectStorage(ILogger<ObjectStorage> logger)
=> logger_ = logger;

/// <inheritdoc />
public Task Init(CancellationToken cancellationToken)
{
isInitialized_ = true;
return Task.CompletedTask;
}

/// <inheritdoc />
public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("Object storage not initialized yet."));

/// <inheritdoc />
public async Task<(byte[] id, long size)> AddOrUpdateAsync(ObjectData metaData,
IAsyncEnumerable<ReadOnlyMemory<byte>> valueChunks,
CancellationToken cancellationToken = default)
{
var array = new List<byte>();

await foreach (var val in valueChunks.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
array.AddRange(val.Span);
}

return (array.ToArray(), array.Count);
}

/// <inheritdoc />
public IAsyncEnumerable<byte[]> GetValuesAsync(byte[] id,
CancellationToken cancellationToken = default)
=> AsyncEnumerable.Repeat(id,
1);

/// <inheritdoc />
public Task TryDeleteAsync(IEnumerable<byte[]> ids,
CancellationToken cancellationToken = default)
=> Task.CompletedTask;
}
28 changes: 28 additions & 0 deletions Adaptors/Embed/tests/ArmoniK.Core.Adapters.Embed.Tests.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<RuntimeIdentifiers>win-x64;linux-x64;linux-arm64</RuntimeIdentifiers>
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Utils.Diagnostics" Version="0.5.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="redis-inside" Version="3.3.0" />
</ItemGroup>

<ItemGroup>
<AssemblyAttribute Include="System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverageAttribute" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\Common\tests\ArmoniK.Core.Common.Tests.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeInspection/Daemon/ConfigureAwaitAnalysisMode/@EntryValue">Library</s:String></wpf:ResourceDictionary>
Loading

0 comments on commit 220164c

Please sign in to comment.