From 372377b7805216dddd0ae595b3c8a6501d7266b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= <88328270+aneojgurhem@users.noreply.github.com> Date: Fri, 18 Aug 2023 12:18:19 +0200 Subject: [PATCH 01/11] fix: Updated to Api 3.11 (#204) --- ...rmoniK.DevelopmentKit.Client.Common.csproj | 2 +- .../AggregationPriorityTest.cs | 52 ++++++++++++++----- .../ArmoniK.EndToEndTests.Common.csproj | 2 +- ...rmoniK.DevelopmentKit.Worker.Common.csproj | 4 +- 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj index 90ad5d37..942955b9 100644 --- a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj +++ b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj @@ -8,7 +8,7 @@ - + diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs index 2e8d8c52..40ec37c0 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs @@ -21,8 +21,8 @@ using System.Threading; using System.Threading.Tasks; -using Armonik.Api.Grpc.V1.SortDirection; - +using ArmoniK.Api.gRPC.V1; +using ArmoniK.Api.gRPC.V1.SortDirection; using ArmoniK.Api.gRPC.V1.Tasks; using ArmoniK.DevelopmentKit.Client.Common.Status; using ArmoniK.DevelopmentKit.Client.Unified.Services.Submitter; @@ -112,9 +112,9 @@ public void Check_That_serialazation_is_ok() /// The filter. /// The sort. /// An IAsyncEnumerable of TaskRaw. 
- private async IAsyncEnumerable RetrieveAllTasksStats(ChannelBase channel, - ListTasksRequest.Types.Filter filter, - ListTasksRequest.Types.Sort sort) + private async IAsyncEnumerable RetrieveAllTasksStats(ChannelBase channel, + Filters filter, + ListTasksRequest.Types.Sort sort) { var read = 0; var page = 0; @@ -123,7 +123,7 @@ private async IAsyncEnumerable RetrieveAllTasksStats(ChannelBase while ((res = await taskClient.ListTasksAsync(new ListTasksRequest { - Filter = filter, + Filters = filter, Sort = sort, PageSize = 50, Page = page, @@ -149,24 +149,50 @@ private async IAsyncEnumerable RetrieveAllTasksStats(ChannelBase /// Work in progress. GetDistribution is a method that gets the repartition between scalar and agg tasks. /// /// A Task of IEnumerable of TaskRaw. - private async Task> GetDistribution(int nRows) + private async Task> GetDistribution(int nRows) { var service = unifiedTestHelper_.Service as Service; service.GetChannel(); - var taskRawData = new List(); + var taskRawData = new List(); await foreach (var taskRaw in RetrieveAllTasksStats(service.GetChannel(), - new ListTasksRequest.Types.Filter + new Filters { - SessionId = service.SessionId, + Or = + { + new FiltersAnd + { + And = + { + new FilterField + { + Field = new TaskField + { + TaskSummaryField = new TaskSummaryField + { + Field = TaskSummaryEnumField.SessionId, + }, + }, + FilterString = new FilterString + { + Operator = FilterStringOperator.Equal, + Value = service.SessionId, + }, + }, + }, + }, + }, }, new ListTasksRequest.Types.Sort { Direction = SortDirection.Asc, Field = new TaskField { - TaskSummaryField = TaskSummaryField.TaskId, + TaskSummaryField = new TaskSummaryField + { + Field = TaskSummaryEnumField.TaskId, + }, }, }) .ConfigureAwait(false)) @@ -302,8 +328,8 @@ private IEnumerable> WaitForResults(string se /// The sessionId for which the intermediate result info is to be retrieved. /// The taskDataIds for which the intermediate result info is to be retrieved. 
/// An IEnumerable of tuples containing the sessionId, taskRaw, and taskResult. - private IEnumerable<(string, TaskRaw, TaskResult)> GetIntermediateResultInfo(string sessionId, - IEnumerable taskDataIds) + private IEnumerable<(string, TaskDetailed, TaskResult)> GetIntermediateResultInfo(string sessionId, + IEnumerable taskDataIds) { var result = WaitForResults(sessionId, taskDataIds.Select(t => t.Id)); diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj index e74a92ca..523eac6a 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj @@ -13,7 +13,7 @@ - + diff --git a/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj b/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj index 7f0dba16..139e1e86 100644 --- a/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj +++ b/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj @@ -1,4 +1,4 @@ - + net6.0 @@ -8,7 +8,7 @@ - + From ad76effeef742e2677f37ad4823b34e9602d54b5 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Mon, 16 Oct 2023 10:23:05 +0200 Subject: [PATCH 02/11] Fix retry delay --- Client/src/Common/Properties.cs | 70 ++++++++++++++++--- .../src/Unified/Services/Submitter/Service.cs | 8 ++- 2 files changed, 68 insertions(+), 10 deletions(-) diff --git a/Client/src/Common/Properties.cs b/Client/src/Common/Properties.cs index 1aec5ba0..132452f9 100644 --- a/Client/src/Common/Properties.cs +++ b/Client/src/Common/Properties.cs @@ -54,6 +54,10 @@ public class Properties private const string SectionClientCertP12 = "ClientP12"; private const string SectionTargetNameOverride = "EndpointNameOverride"; + private const string SectionRetryInitialBackoff = "RetryInitialBackoff"; + private const 
string SectionRetryBackoffMultiplier = "RetryBackoffMultiplier"; + private const string SectionRetryMaxBackoff = "RetryMaxBackoff"; + /// /// The default configuration to submit task in a Session /// @@ -116,17 +120,23 @@ public Properties(TaskOptions options, /// The client key file in a pem format /// The client certificate in a P12/Pkcs12/PFX format /// Disable the ssl strong validation of ssl certificate (default : enable => true) + /// Initial retry backoff delay + /// Retry backoff multiplier + /// Max retry backoff /// public Properties(IConfiguration configuration, TaskOptions options, - string connectionAddress = null, - int connectionPort = 0, - string protocol = null, - string clientCertFilePem = null, - string clientKeyFilePem = null, - string clientP12 = null, - string caCertPem = null, - bool? sslValidation = null) + string connectionAddress = null, + int connectionPort = 0, + string protocol = null, + string clientCertFilePem = null, + string clientKeyFilePem = null, + string clientP12 = null, + string caCertPem = null, + bool? sslValidation = null, + TimeSpan retryInitialBackoff = new(), + double retryBackoffMultiplier = 0, + TimeSpan retryMaxBackoff = new()) { TaskOptions = options; Configuration = configuration; @@ -160,6 +170,35 @@ public Properties(IConfiguration configuration, ClientKeyFilePem = clientKeyFilePem ?? sectionGrpc?[SectionClientKey]; ClientP12File = clientP12 ?? 
sectionGrpc?[SectionClientCertP12]; + if (retryInitialBackoff != TimeSpan.Zero) + { + RetryInitialBackoff = retryInitialBackoff; + } + else if (!string.IsNullOrWhiteSpace(sectionGrpc?[SectionRetryInitialBackoff])) + { + RetryInitialBackoff = TimeSpan.Parse(sectionGrpc[SectionRetryInitialBackoff]); + } + + if (retryBackoffMultiplier != 0) + { + RetryBackoffMultiplier = retryBackoffMultiplier; + } + else if (!string.IsNullOrWhiteSpace(sectionGrpc?[SectionRetryBackoffMultiplier])) + { + RetryBackoffMultiplier = double.Parse(sectionGrpc[SectionRetryBackoffMultiplier]); + } + + + if (retryMaxBackoff != TimeSpan.Zero) + { + RetryMaxBackoff = retryMaxBackoff; + } + else if (!string.IsNullOrWhiteSpace(sectionGrpc?[SectionRetryMaxBackoff])) + { + RetryMaxBackoff = TimeSpan.Parse(sectionGrpc[SectionRetryMaxBackoff]); + } + + if (connectionPort != 0) { ConnectionPort = connectionPort; @@ -285,4 +324,19 @@ public string ConnectionString /// The target name of the endpoint when ssl validation is disabled. Automatic if not set. 
/// public string TargetNameOverride { get; set; } = ""; + + /// + /// Initial backoff from retries + /// + public TimeSpan RetryInitialBackoff { get; set; } = TimeSpan.FromSeconds(1); + + /// + /// Backoff multiplier for retries + /// + public double RetryBackoffMultiplier { get; set; } = 2; + + /// + /// Max backoff for retries + /// + public TimeSpan RetryMaxBackoff { get; set; } = TimeSpan.FromSeconds(30); } diff --git a/Client/src/Unified/Services/Submitter/Service.cs b/Client/src/Unified/Services/Submitter/Service.cs index 35a76f57..c9668a3f 100644 --- a/Client/src/Unified/Services/Submitter/Service.cs +++ b/Client/src/Unified/Services/Submitter/Service.cs @@ -152,6 +152,7 @@ public Service(Properties properties, { var maxRetries = groupBlockRequest.First() .MaxRetries; + var currentBackoff = properties.RetryInitialBackoff; for (var retry = 0; retry < maxRetries; retry++) { //Generate resultId @@ -214,11 +215,14 @@ public Service(Properties properties, Logger?.LogWarning(e, "Fail to submit, {retry}/{maxRetries} retrying", - retry, + retry + 1, maxRetries); //Delay before submission - Task.Delay(TimeSpan.FromMilliseconds(1000)); + Task.Delay(currentBackoff) + .Wait(); + currentBackoff = TimeSpan.FromSeconds(Math.Min(currentBackoff.TotalSeconds * properties.RetryBackoffMultiplier, + properties.RetryMaxBackoff.TotalSeconds)); } } From 7f4d6c1c6257ac8d29fcd6babe821245bcc83529 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Mon, 16 Oct 2023 11:52:03 +0200 Subject: [PATCH 03/11] Fixed build core version --- .github/workflows/build.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index be693781..61115bec 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -213,6 +213,7 @@ jobs: tls: ${{ matrix.tls }} mtls: ${{ matrix.mtls }} ext-csharp-version: ${{ needs.versionning.outputs.version }} + core-version: 0.13.2 - name: Setup hosts file run : echo -e "$(kubectl get svc ingress -n 
armonik -o jsonpath={.status.loadBalancer.ingress[0].ip})\tarmonik.local" | sudo tee -a /etc/hosts From 5697121e5330b1b66caddbdec6b6a57ab5aad456 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Tue, 20 Jun 2023 11:00:43 +0200 Subject: [PATCH 04/11] Fixes and reduces NUnit tests --- .../Tests/AggregationPriority/AggregationPriorityTest.cs | 1 + .../ArmoniK.EndToEndTests.Client/Tests/UnitTestHelperBase.cs | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs index eedad9fc..7ccf3a6f 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs @@ -303,6 +303,7 @@ private IEnumerable> WaitForResults(string se /// /// The size of the square matrix. 
[TestCase(20)] + [Ignore("Too big")] public void Check_That_Result_has_expected_value(int squareMatrixSize) { unifiedTestHelper_.Log.LogInformation($"Compute square matrix with n = {squareMatrixSize}"); diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/UnitTestHelperBase.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/UnitTestHelperBase.cs index 027a8b5d..ea19a6c9 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/UnitTestHelperBase.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/UnitTestHelperBase.cs @@ -85,9 +85,8 @@ public void InitProperties(EngineType engineType, applicationNamespace, applicationService); - Props = new Properties(TaskOptions, - Configuration.GetSection("Grpc")["EndPoint"], - 5001); + Props = new Properties(Configuration, + TaskOptions); } public static object[] ParamsHelper(params object[] elements) From e58e3c7942f6e84504e6192a419db09fdb5e7a06 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Mon, 16 Oct 2023 14:17:34 +0200 Subject: [PATCH 05/11] Only get for properties --- Client/src/Common/Properties.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Client/src/Common/Properties.cs b/Client/src/Common/Properties.cs index 132452f9..eaee8055 100644 --- a/Client/src/Common/Properties.cs +++ b/Client/src/Common/Properties.cs @@ -328,15 +328,15 @@ public string ConnectionString /// /// Initial backoff from retries /// - public TimeSpan RetryInitialBackoff { get; set; } = TimeSpan.FromSeconds(1); + public TimeSpan RetryInitialBackoff { get; } = TimeSpan.FromSeconds(1); /// /// Backoff multiplier for retries /// - public double RetryBackoffMultiplier { get; set; } = 2; + public double RetryBackoffMultiplier { get; } = 2; /// /// Max backoff for retries /// - public TimeSpan RetryMaxBackoff { get; set; } = TimeSpan.FromSeconds(30); + public TimeSpan RetryMaxBackoff { get; } = TimeSpan.FromSeconds(30); } From 
60d1a29dfa75c6d07c048e03385e75b6441bf732 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Mon, 16 Oct 2023 14:47:05 +0200 Subject: [PATCH 06/11] Removed ci destroy deployment --- .github/workflows/build.yml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 61115bec..ad25bd12 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -261,13 +261,6 @@ jobs: name: "IntegrationTests tls:${{ matrix.tls }} mtls:${{ matrix.mtls }} val:${{ matrix.sslvalidation }} ca:${{ matrix.useca }}" path: ./Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/TestResults/test-results.trx reporter: dotnet-trx - - - name: Destroy deployment - if: success() || failure() - uses: aneoconsulting/ArmoniK.Action.Deploy/destroy@main - with: - working-directory: ${{ github.workspace }}/infra - type: localhost canMerge: needs: From 42677e590d9f764105786a6f95c6787bf18b1b24 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Mon, 16 Oct 2023 14:13:22 +0200 Subject: [PATCH 07/11] Added log to retry while exception --- .../Common/Submitter/BaseClientSubmitter.cs | 6 ++ .../src/Unified/Services/Submitter/Service.cs | 1 + Common/src/Common/RetryAction.cs | 69 ++++++++++++++++++- 3 files changed, 75 insertions(+), 1 deletion(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 273e15b1..2215f59e 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -430,6 +430,7 @@ public void WaitForTasksCompletion(IEnumerable taskIds) }); }, true, + Logger, typeof(IOException), typeof(RpcException)); } @@ -477,6 +478,7 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, return resultStatusReply.IdStatuses; }, true, + Logger, typeof(IOException), typeof(RpcException)); @@ -552,6 +554,7 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, 
.TaskResults); }, true, + Logger, typeof(IOException), typeof(RpcException)); @@ -611,6 +614,7 @@ public byte[] GetResult(string taskId, } }, true, + Logger, typeof(IOException), typeof(RpcException)); @@ -620,6 +624,7 @@ public byte[] GetResult(string taskId, cancellationToken) .Result, true, + Logger, typeof(IOException), typeof(RpcException)); @@ -814,6 +819,7 @@ public byte[] TryGetResult(string taskId, } }, true, + Logger, typeof(IOException), typeof(RpcException)); diff --git a/Client/src/Unified/Services/Submitter/Service.cs b/Client/src/Unified/Services/Submitter/Service.cs index c9668a3f..aec1fae4 100644 --- a/Client/src/Unified/Services/Submitter/Service.cs +++ b/Client/src/Unified/Services/Submitter/Service.cs @@ -684,6 +684,7 @@ private void ProxyTryGetResults(IEnumerable taskIds, .Result; }, true, + Logger, typeof(IOException), typeof(RpcException))); } diff --git a/Common/src/Common/RetryAction.cs b/Common/src/Common/RetryAction.cs index 36b236cd..74bd7392 100644 --- a/Common/src/Common/RetryAction.cs +++ b/Common/src/Common/RetryAction.cs @@ -1,7 +1,11 @@ -using System; +using System; using System.Linq; using System.Threading; +using JetBrains.Annotations; + +using Microsoft.Extensions.Logging; + namespace ArmoniK.DevelopmentKit.Common; /// @@ -51,6 +55,33 @@ public static void WhileException(int retries, Action operation, bool allowDerivedExceptions = false, params Type[] exceptionType) + => WhileException(retries, + delayMs, + operation, + allowDerivedExceptions, + null, + exceptionType); + + /// + /// Retry the specified operation the specified number of times, until there are no more retries or it succeeded + /// without an exception. 
+ /// + /// The number of times to retry the operation + /// The number of milliseconds to sleep after a failed invocation of the operation + /// the operation to perform + /// if not null, ignore any exceptions of this type and subtypes + /// + /// If true, exceptions deriving from the specified exception type are ignored as + /// well. Defaults to False + /// + /// Logger to log retried exception + /// When one of the retries succeeds, return the value the operation returned. If not, an exception is thrown. + public static void WhileException(int retries, + int delayMs, + Action operation, + bool allowDerivedExceptions = false, + [CanBeNull] ILogger logger = null, + params Type[] exceptionType) { // Do all but one retries in the loop for (var retry = 1; retry < retries; retry++) @@ -67,6 +98,8 @@ public static void WhileException(int retries, if (exceptionType != null && allowDerivedExceptions && ex is AggregateException && exceptionType.Any(e => ex.InnerException != null && ex.InnerException.GetType() == e)) { + logger?.LogDebug("Got exception while executing function to retry : {ex}", + ex); Thread.Sleep(delayMs); } else if (exceptionType == null || exceptionType.Any(e => e == ex.GetType()) || (allowDerivedExceptions && exceptionType.Any(e => ex.GetType() @@ -75,6 +108,8 @@ public static void WhileException(int retries, // Ignore exceptions when exceptionType is not specified OR // the exception thrown was of the specified exception type OR // the exception thrown is derived from the specified exception type and we allow that + logger?.LogDebug("Got exception while executing function to retry : {ex}", + ex); Thread.Sleep(delayMs); } else @@ -105,6 +140,34 @@ public static T WhileException(int retries, Func operation, bool allowDerivedExceptions = false, params Type[] exceptionType) + => WhileException(retries, + delayMs, + operation, + allowDerivedExceptions, + null, + exceptionType); + + /// + /// Retry the specified operation the specified number of times, 
until there are no more retries or it succeeded + /// without an exception. + /// + /// The return type of the operation + /// The number of times to retry the operation + /// The number of milliseconds to sleep after a failed invocation of the operation + /// the operation to perform + /// if not null, ignore any exceptions of this type and subtypes + /// + /// If true, exceptions deriving from the specified exception type are ignored as + /// well. Defaults to False + /// + /// Logger to log retried exception + /// When one of the retries succeeds, return the value the operation returned. If not, an exception is thrown. + public static T WhileException(int retries, + int delayMs, + Func operation, + bool allowDerivedExceptions = false, + [CanBeNull] ILogger logger = null, + params Type[] exceptionType) { // Do all but one retries in the loop for (var retry = 1; retry < retries; retry++) @@ -119,6 +182,8 @@ public static T WhileException(int retries, if (exceptionType != null && allowDerivedExceptions && ex is AggregateException && exceptionType.Any(e => ex.InnerException != null && ex.InnerException.GetType() == e)) { + logger?.LogDebug("Got exception while executing function to retry : {ex}", + ex); Thread.Sleep(delayMs); } else if (exceptionType == null || exceptionType.Any(e => e == ex.GetType()) || (allowDerivedExceptions && exceptionType.Any(e => ex.GetType() @@ -127,6 +192,8 @@ public static T WhileException(int retries, // Ignore exceptions when exceptionType is not specified OR // the exception thrown was of the specified exception type OR // the exception thrown is derived from the specified exception type and we allow that + logger?.LogDebug("Got exception while executing function to retry : {ex}", + ex); Thread.Sleep(delayMs); } else From fe73cb988c310ab8bb1c231c592920fc4cb0437a Mon Sep 17 00:00:00 2001 From: Florian Lemaitre Date: Fri, 23 Jun 2023 13:10:17 +0200 Subject: [PATCH 08/11] Acquire channel within retry loops --- 
.../Common/Submitter/BaseClientSubmitter.cs | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 2215f59e..4efa30f2 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -225,16 +225,16 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable taskIds) { using var _ = Logger?.LogFunction(); - using var channel = channelPool_.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - Retry.WhileException(5, 2000, retry => { + using var channel = channelPool_.GetChannel(); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + if (retry > 1) { Logger?.LogWarning("Try {try} for {funcName}", @@ -457,13 +457,13 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, ResultStatus.Notfound)) : Array.Empty(); - using var channel = channelPool_.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - var idStatus = Retry.WhileException(5, 2000, retry => { + using var channel = channelPool_.GetChannel(); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + Logger?.LogDebug("Try {try} for {funcName}", retry, nameof(submitterService.GetResultStatus)); @@ -584,13 +584,13 @@ public byte[] GetResult(string taskId, Session = SessionId.Id, }; - using var channel = channelPool_.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - Retry.WhileException(5, 2000, retry => { + using var channel = channelPool_.GetChannel(); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + Logger?.LogDebug("Try {try} for {funcName}", retry, nameof(submitterService.WaitForAvailability)); From 
6521f17c5e8079d362809c8554305ef0ce479359 Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Sun, 25 Jun 2023 20:10:02 +0200 Subject: [PATCH 09/11] Add a channel health check for the pool --- Client/src/Common/Submitter/ChannelPool.cs | 84 ++++++++++++++++++++-- 1 file changed, 78 insertions(+), 6 deletions(-) diff --git a/Client/src/Common/Submitter/ChannelPool.cs b/Client/src/Common/Submitter/ChannelPool.cs index 04055030..652ebf86 100644 --- a/Client/src/Common/Submitter/ChannelPool.cs +++ b/Client/src/Common/Submitter/ChannelPool.cs @@ -30,6 +30,9 @@ using JetBrains.Annotations; using Microsoft.Extensions.Logging; +#if NET5_0_OR_GREATER +using Grpc.Net.Client; +#endif namespace ArmoniK.DevelopmentKit.Client.Common.Submitter; @@ -66,9 +69,17 @@ private ChannelBase AcquireChannel() { if (pool_.TryTake(out var channel)) { - logger_?.LogDebug("Acquired already existing channel {channel} from pool", - channel); - return channel; + if (ShutdownOnFailure(channel)) + { + logger_?.LogDebug("Got an invalid channel {channel} from pool", + channel); + } + else + { + logger_?.LogDebug("Acquired already existing channel {channel} from pool", + channel); + return channel; + } } channel = channelFactory_(); @@ -83,9 +94,70 @@ private ChannelBase AcquireChannel() /// Channel to release private void ReleaseChannel(ChannelBase channel) { - logger_?.LogDebug("Released channel {channel} to pool", - channel); - pool_.Add(channel); + if (ShutdownOnFailure(channel)) + { + logger_?.LogDebug("Shutdown unhealthy channel {channel}", + channel); + } + else + { + logger_?.LogDebug("Released channel {channel} to pool", + channel); + pool_.Add(channel); + } + } + + /// + /// Check the state of a channel and shutdown it in case of failure + /// + /// Channel to check the state + /// True if the channel has been shut down + private static bool ShutdownOnFailure(ChannelBase channel) + { + try + { + switch (channel) + { + case Channel chan: + switch (chan.State) + { + case 
ChannelState.TransientFailure: + chan.ShutdownAsync() + .Wait(); + return true; + case ChannelState.Shutdown: + return true; + case ChannelState.Idle: + case ChannelState.Connecting: + case ChannelState.Ready: + default: + return false; + } +#if NET5_0_OR_GREATER + case GrpcChannel chan: + switch (chan.State) + { + case ConnectivityState.TransientFailure: + chan.ShutdownAsync() + .Wait(); + return true; + case ConnectivityState.Shutdown: + return true; + case ConnectivityState.Idle: + case ConnectivityState.Connecting: + case ConnectivityState.Ready: + default: + return false; + } +#endif + default: + return false; + } + } + catch (InvalidOperationException) + { + return false; + } } /// From 80b07db38f3c5778dc51821f6d4be08522d35a20 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Mon, 16 Oct 2023 16:53:46 +0200 Subject: [PATCH 10/11] Added ci deploy test to .x branches --- .github/workflows/build.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ec38d6a8..36a1fff9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -4,9 +4,12 @@ on: push: branches: - main + - "[0-9]+.[0-9]+.x" pull_request: branches: - main + - "[0-9]+.[0-9]+.x" + jobs: versionning: From f8683b7606295ae42a40803fe9a7efe6871772a6 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Tue, 17 Oct 2023 15:17:19 +0200 Subject: [PATCH 11/11] Fix post merge issues --- ...rmoniK.DevelopmentKit.Client.Common.csproj | 1 + Client/src/Common/Properties.cs | 16 ++++++------ .../ArmoniK.DevelopmentKit.Common.csproj | 1 + Common/src/Common/RetryAction.cs | 26 +++++++++---------- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj index ca1477d2..a779a4bc 100644 --- a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj +++ 
b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj @@ -9,6 +9,7 @@ + diff --git a/Client/src/Common/Properties.cs b/Client/src/Common/Properties.cs index 5c8604e9..727eb2a5 100644 --- a/Client/src/Common/Properties.cs +++ b/Client/src/Common/Properties.cs @@ -127,14 +127,14 @@ public Properties(TaskOptions options, /// public Properties(IConfiguration configuration, TaskOptions options, - string? connectionAddress = null, - int connectionPort = 0, - string? protocol = null, - string? clientCertFilePem = null, - string? clientKeyFilePem = null, - string? clientP12 = null, - string? caCertPem = null, - bool? sslValidation = null, + string? connectionAddress = null, + int connectionPort = 0, + string? protocol = null, + string? clientCertFilePem = null, + string? clientKeyFilePem = null, + string? clientP12 = null, + string? caCertPem = null, + bool? sslValidation = null, TimeSpan retryInitialBackoff = new(), double retryBackoffMultiplier = 0, TimeSpan retryMaxBackoff = new()) diff --git a/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj b/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj index e731ce79..f344e851 100644 --- a/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj +++ b/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj @@ -9,6 +9,7 @@ + diff --git a/Common/src/Common/RetryAction.cs b/Common/src/Common/RetryAction.cs index a2479cdb..26cc0bb6 100644 --- a/Common/src/Common/RetryAction.cs +++ b/Common/src/Common/RetryAction.cs @@ -18,8 +18,6 @@ using System.Linq; using System.Threading; -using JetBrains.Annotations; - using Microsoft.Extensions.Logging; namespace ArmoniK.DevelopmentKit.Common; @@ -92,12 +90,12 @@ public static void WhileException(int retries, /// /// Logger to log retried exception /// When one of the retries succeeds, return the value the operation returned. If not, an exception is thrown. 
- public static void WhileException(int retries, - int delayMs, - Action operation, - bool allowDerivedExceptions = false, - [CanBeNull] ILogger logger = null, - params Type[] exceptionType) + public static void WhileException(int retries, + int delayMs, + Action operation, + bool allowDerivedExceptions = false, + ILogger? logger = null, + params Type[] exceptionType) { // Do all but one retries in the loop for (var retry = 1; retry < retries; retry++) @@ -178,12 +176,12 @@ public static T WhileException(int retries, /// /// Logger to log retried exception /// When one of the retries succeeds, return the value the operation returned. If not, an exception is thrown. - public static T WhileException(int retries, - int delayMs, - Func operation, - bool allowDerivedExceptions = false, - [CanBeNull] ILogger logger = null, - params Type[] exceptionType) + public static T WhileException(int retries, + int delayMs, + Func operation, + bool allowDerivedExceptions = false, + ILogger? logger = null, + params Type[] exceptionType) { // Do all but one retries in the loop for (var retry = 1; retry < retries; retry++)