From ad43cb8d6c4ea58244ead04af9778e4112128042 Mon Sep 17 00:00:00 2001 From: ddubuc Date: Tue, 23 May 2023 11:45:21 +0200 Subject: [PATCH 1/2] Add retry and increase time between 2 retries --- .../Common/Submitter/BaseClientSubmitter.cs | 64 +++++++--- .../src/Unified/Services/Submitter/Service.cs | 43 +++++-- Common/src/Common/RetryAction.cs | 115 +++++++++++++++++- .../LargeSubmitAsyncClient.cs | 22 +++- 4 files changed, 212 insertions(+), 32 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index ccbe0b81..c4b8daab 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -389,12 +389,15 @@ public void WaitForTasksCompletion(IEnumerable taskIds) var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); Retry.WhileException(5, - 200, + 2000, retry => { - Logger?.LogDebug("Try {try} for {funcName}", - retry, - nameof(submitterService.WaitForCompletion)); + if (retry > 1) + { + Logger?.LogWarning("Try {try} for {funcName}", + retry, + nameof(submitterService.WaitForCompletion)); + } var __ = submitterService.WaitForCompletion(new WaitRequest { @@ -443,7 +446,7 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); var idStatus = Retry.WhileException(5, - 200, + 2000, retry => { Logger?.LogDebug("Try {try} for {funcName}", @@ -508,15 +511,36 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, return resultStatusList; } + /// + /// Gets the result ids for a given list of task ids. + /// + /// The list of task ids. + /// A collection of map task results. public ICollection GetResultIds(IEnumerable taskIds) - => channelPool_.WithChannel(channel => new Tasks.TasksClient(channel).GetResultIds(new GetResultIdsRequest - { - TaskId = - { - taskIds, - }, - }) - .TaskResults); + => Retry.WhileException(5, + 2000, + retry => + { + if (retry > 1) + { + Logger?.LogWarning("Try {try} for {funcName}", + retry, + nameof(GetResultIds)); + } + + return channelPool_.WithChannel(channel => new Tasks.TasksClient(channel).GetResultIds(new GetResultIdsRequest + { + TaskId = + { + taskIds, + }, + }) + .TaskResults); + }, + true, + typeof(IOException), + typeof(RpcException)); + /// /// Try to find the result of One task. If there no result, the function return byte[0] @@ -546,7 +570,7 @@ public byte[] GetResult(string taskId, var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); Retry.WhileException(5, - 200, + 2000, retry => { Logger?.LogDebug("Try {try} for {funcName}", @@ -714,12 +738,16 @@ public byte[] TryGetResult(string taskId, }; var resultReply = Retry.WhileException(5, - 200, + 2000, retry => { - Logger?.LogDebug("Try {try} for {funcName}", - retry, - "SubmitterService.TryGetResultAsync"); + if (retry > 1) + { + Logger?.LogWarning("Try {try} for {funcName}", + retry, + "SubmitterService.TryGetResultAsync"); + } + try { var response = TryGetResultAsync(resultRequest, diff --git a/Client/src/Unified/Services/Submitter/Service.cs b/Client/src/Unified/Services/Submitter/Service.cs index fb766d71..1a646c7a 100644 --- a/Client/src/Unified/Services/Submitter/Service.cs +++ b/Client/src/Unified/Services/Submitter/Service.cs @@ -23,6 +23,7 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -56,7 +57,7 @@ namespace ArmoniK.DevelopmentKit.Client.Unified.Services.Submitter; [MarkDownDoc] public class Service : AbstractClientService, ISubmitterService { - private const int MaxRetries = 5; + private const int MaxRetries = 10; // *** you need some mechanism to map types to fields private static readonly IDictionary StatusCodesLookUp = new List> @@ -209,7 +210,7 @@ public Service(Properties properties, MaxRetries); //Delay before submission - Task.Delay(TimeSpan.FromMilliseconds(100)); + Task.Delay(TimeSpan.FromMilliseconds(1000)); } } @@ -558,10 +559,17 @@ public ServiceResult Execute(string methodName, }; } + /// + /// Retrieves the results for the given taskIds. + /// + /// The taskIds to retrieve results for. + /// The action to take when a response is received. + /// The action to take when an error occurs. + /// The size of the chunk to retrieve results in. private void ProxyTryGetResults(IEnumerable taskIds, Action responseHandler, Action errorHandler, - int chunkResultSize = 500) + int chunkResultSize = 200) { var missing = taskIds.ToHashSet(); var holdPrev = missing.Count; @@ -590,13 +598,28 @@ private void ProxyTryGetResults(IEnumerable taskIds, Logger?.LogTrace("Response handler for {taskId}", resultStatusData.TaskId); responseHandler(resultStatusData.TaskId, - SessionService.TryGetResultAsync(new ResultRequest - { - ResultId = resultStatusData.ResultId, - Session = SessionId, - }, - CancellationToken.None) - .Result); + Retry.WhileException(5, + 2000, + retry => + { + if (retry > 1) + { + Logger?.LogWarning("Try {try} for {funcName}", + retry, + nameof(SessionService.TryGetResultAsync)); + } + + return SessionService.TryGetResultAsync(new ResultRequest + { + ResultId = resultStatusData.ResultId, + Session = SessionId, + }, + CancellationToken.None) + .Result; + }, + true, + typeof(IOException), + typeof(RpcException))); } catch (Exception e) { diff --git a/Common/src/Common/RetryAction.cs b/Common/src/Common/RetryAction.cs index 36b236cd..6de248ae 100644 --- a/Common/src/Common/RetryAction.cs +++ b/Common/src/Common/RetryAction.cs @@ -1,6 +1,7 @@ -using System; +using System; using System.Linq; using System.Threading; +using System.Threading.Tasks; namespace ArmoniK.DevelopmentKit.Common; @@ -142,4 +143,116 @@ public static T WhileException(int retries, // we're out of retries. If it's unexpected, throwing is the right thing to do anyway return operation(retries); } + + /// + /// Retry async the specified operation the specified number of times, until there are no more retries or it succeeded + /// without an exception. + /// + /// The number of times to retry the operation + /// The number of milliseconds to sleep after a failed invocation of the operation + /// the operation to perform + /// if not null, ignore any exceptions of this type and subtypes + /// + /// If true, exceptions deriving from the specified exception type are ignored as + /// well. Defaults to False + /// + /// When one of the retries succeeds, return the value the operation returned. If not, an exception is thrown. + public static async Task WhileExceptionAsync(int retries, + int delayMs, + Action operation, + bool allowDerivedExceptions = false, + params Type[] exceptionType) + { + // Do all but one retries in the loop + for (var retry = 1; retry < retries; retry++) + { + try + { + // Try the operation. If it succeeds, return its result + operation(retry); + return; + } + catch (Exception ex) + { + // Oops - it did NOT succeed! + if (exceptionType != null && allowDerivedExceptions && ex is AggregateException && + exceptionType.Any(e => ex.InnerException != null && ex.InnerException.GetType() == e)) + { + Thread.Sleep(delayMs); + } + else if (exceptionType == null || exceptionType.Any(e => e == ex.GetType()) || (allowDerivedExceptions && exceptionType.Any(e => ex.GetType() + .IsSubclassOf(e)))) + { + // Ignore exceptions when exceptionType is not specified OR + // the exception thrown was of the specified exception type OR + // the exception thrown is derived from the specified exception type and we allow that + Thread.Sleep(delayMs); + } + else + { + // We have an unexpected exception! Re-throw it: + throw; + } + } + } + } + + /// + /// Retry async the specified operation the specified number of times, until there are no more retries or it succeeded + /// without an exception. + /// + /// The return type of the exception + /// The number of times to retry the operation + /// The number of milliseconds to sleep after a failed invocation of the operation + /// the operation to perform + /// if not null, ignore any exceptions of this type and subtypes + /// + /// If true, exceptions deriving from the specified exception type are ignored as + /// well. Defaults to False + /// + /// When one of the retries succeeds, return the value the operation returned. If not, an exception is thrown. + public static async Task WhileExceptionAsync(int retries, + int delayMs, + Func> operation, + bool allowDerivedExceptions = false, + params Type[] exceptionType) + { + // Do all but one retries in the loop + for (var retry = 1; retry < retries; retry++) + { + try + { + // Try the operation. If it succeeds, return its result + return await operation(retry) + .ConfigureAwait(false); + } + catch (Exception ex) + { + if (exceptionType != null && allowDerivedExceptions && ex is AggregateException && + exceptionType.Any(e => ex.InnerException != null && ex.InnerException.GetType() == e)) + { + Thread.Sleep(delayMs); + } + else if (exceptionType == null || exceptionType.Any(e => e == ex.GetType()) || (allowDerivedExceptions && exceptionType.Any(e => ex.GetType() + .IsSubclassOf(e)))) + { + // Ignore exceptions when exceptionType is not specified OR + // the exception thrown was of the specified exception type OR + // the exception thrown is derived from the specified exception type and we allow that + Thread.Sleep(delayMs); + } + else + { + // We have an unexpected exception! Re-throw it: + throw; + } + } + } + + // Try the operation one last time. This may or may not succeed. + // Exceptions pass unchanged. If this is an expected exception we need to know about it because + // we're out of retries. If it's unexpected, throwing is the right thing to do anyway + return await operation(retries) + .ConfigureAwait(false); + } } diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/LargeSubmitAsyncClient.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/LargeSubmitAsyncClient.cs index e061a492..6232cc74 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/LargeSubmitAsyncClient.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/LargeSubmitAsyncClient.cs @@ -214,7 +214,8 @@ private IEnumerable ExecuteSubmitAsync(int nbTasks, .Batch(nbTasks / Props.MaxParallelChannels) .AsParallel(); - var resultTask = new ConcurrentBag(); + var resultTask = new ConcurrentBag>(); + var results = new ConcurrentBag(); resultQueries.ForAll(bucket => { @@ -231,11 +232,26 @@ private IEnumerable ExecuteSubmitAsync(int nbTasks, foreach (var task in tasksInBucket) { - resultTask.Add(task.Result); + resultTask.Add(task); } }); - return resultTask.ToList(); + //Need to fix aync issue for a performance submission and check all exception one by one + resultTask.AsParallel() + .ForAll(task => + { + if (task.IsFaulted) + { + if (task.Exception != null) + { + throw task.Exception; + } + } + + results.Add(task.Result); + }); + + return results.ToList(); } private static void OverrideTaskOptions(TaskOptions taskOptions) From a99328ed4a52c7156054496e122dbabe489ac18a Mon Sep 17 00:00:00 2001 From: ddubuc Date: Tue, 23 May 2023 12:35:35 +0200 Subject: [PATCH 2/2] Add general variable in properties for max number of retries and time between retries --- Client/src/Common/Properties.cs | 10 ++++++++++ .../Common/Submitter/BaseClientSubmitter.cs | 20 +++++++++---------- .../Services/Common/AbstractClientService.cs | 13 ++++++++++-- .../src/Unified/Services/Submitter/Service.cs | 6 +++--- 4 files changed, 34 insertions(+), 15 deletions(-) diff --git a/Client/src/Common/Properties.cs b/Client/src/Common/Properties.cs index eea831da..c3679c16 100644 --- a/Client/src/Common/Properties.cs +++ b/Client/src/Common/Properties.cs @@ -319,4 +319,14 @@ public string ConnectionString /// The TaskOptions to pass to the session or the submission session /// public TaskOptions TaskOptions { get; set; } + + /// + /// Gets or sets the maximum number of retries. Default 5 retries + /// + public static int MaxRetries { get; set; } = 5; + + /// + /// Gets or sets the time interval between retries. Default 2000 ms + /// + public static int TimeIntervalRetriesInMs { get; set; } = 2000; } diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index c4b8daab..fdf7d506 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -388,8 +388,8 @@ public void WaitForTasksCompletion(IEnumerable taskIds) using var channel = channelPool_.GetChannel(); var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - Retry.WhileException(5, - 2000, + Retry.WhileException(Properties.MaxRetries, + Properties.TimeIntervalRetriesInMs, retry => { if (retry > 1) @@ -445,8 +445,8 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, using var channel = channelPool_.GetChannel(); var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - var idStatus = Retry.WhileException(5, - 2000, + var idStatus = Retry.WhileException(Properties.MaxRetries, + Properties.TimeIntervalRetriesInMs, retry => { Logger?.LogDebug("Try {try} for {funcName}", @@ -517,8 +517,8 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, /// The list of task ids. /// A collection of map task results. public ICollection GetResultIds(IEnumerable taskIds) - => Retry.WhileException(5, - 2000, + => Retry.WhileException(Properties.MaxRetries, + Properties.TimeIntervalRetriesInMs, retry => { if (retry > 1) @@ -569,8 +569,8 @@ public byte[] GetResult(string taskId, using var channel = channelPool_.GetChannel(); var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - Retry.WhileException(5, - 2000, + Retry.WhileException(Properties.MaxRetries, + Properties.TimeIntervalRetriesInMs, retry => { Logger?.LogDebug("Try {try} for {funcName}", @@ -737,8 +737,8 @@ public byte[] TryGetResult(string taskId, Session = SessionId.Id, }; - var resultReply = Retry.WhileException(5, - 2000, + var resultReply = Retry.WhileException(Properties.MaxRetries, + Properties.TimeIntervalRetriesInMs, retry => { if (retry > 1) diff --git a/Client/src/Unified/Services/Common/AbstractClientService.cs b/Client/src/Unified/Services/Common/AbstractClientService.cs index 54e25052..fff7f9ef 100644 --- a/Client/src/Unified/Services/Common/AbstractClientService.cs +++ b/Client/src/Unified/Services/Common/AbstractClientService.cs @@ -23,11 +23,12 @@ public abstract class AbstractClientService : IDisposable public AbstractClientService(Properties properties, [CanBeNull] ILoggerFactory loggerFactory = null) { - LoggerFactory = loggerFactory; - + LoggerFactory = loggerFactory; + Properties = properties; ResultHandlerDictionary = new ConcurrentDictionary(); } + /// /// Instant view of currently handled task ids. /// The list is only valid at the time of access. @@ -36,6 +37,14 @@ public AbstractClientService(Properties properties, public IReadOnlyCollection CurrentlyHandledTaskIds => (IReadOnlyCollection)ResultHandlerDictionary.Keys; + /// + /// Gets or sets the Properties object. + /// + /// + /// The Properties object. + /// + protected Properties Properties { get; set; } + /// /// The result dictionary to return result /// diff --git a/Client/src/Unified/Services/Submitter/Service.cs b/Client/src/Unified/Services/Submitter/Service.cs index 1a646c7a..1b61ccee 100644 --- a/Client/src/Unified/Services/Submitter/Service.cs +++ b/Client/src/Unified/Services/Submitter/Service.cs @@ -210,7 +210,7 @@ public Service(Properties properties, MaxRetries); //Delay before submission - Task.Delay(TimeSpan.FromMilliseconds(1000)); + Task.Delay(TimeSpan.FromMilliseconds(Properties.TimeIntervalRetriesInMs)); } } @@ -598,8 +598,8 @@ private void ProxyTryGetResults(IEnumerable taskIds, Logger?.LogTrace("Response handler for {taskId}", resultStatusData.TaskId); responseHandler(resultStatusData.TaskId, - Retry.WhileException(5, - 2000, + Retry.WhileException(Properties.MaxRetries, + Properties.TimeIntervalRetriesInMs, retry => { if (retry > 1)