From b2d2cd92d922765cfd4b89a5f5b2587d51a4771a Mon Sep 17 00:00:00 2001 From: melflitty Date: Fri, 1 Dec 2023 18:00:25 +0100 Subject: [PATCH 01/12] refactor: remove submitter from chunk submit with dependencies method --- .../Common/Submitter/BaseClientSubmitter.cs | 149 ++++++++++-------- 1 file changed, 85 insertions(+), 64 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 3ba8c20d..741d5312 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -285,79 +285,100 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable - { - var taskRequest = new TaskRequest - { - Payload = UnsafeByteOperations.UnsafeWrap(pwd.Item2), - }; - taskRequest.DataDependencies.AddRange(pwd.Item3); - taskRequest.ExpectedOutputKeys.Add(pwd.Item1); - return taskRequest; - }); - for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) { - try + var channel = ChannelPool.GetChannel(); + var tasksClient = new Tasks.TasksClient(channel); + var resultsClient = new Results.ResultsClient(channel); + var resultsCreated = new List(); + foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) { - using var channel = ChannelPool.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - - var response = submitterService.CreateTasksAsync(SessionId.Id, - taskOptions ?? 
TaskOptions, - // multiple enumeration only occurs in case of failure - // ReSharper disable once PossibleMultipleEnumeration - taskRequests) - .ConfigureAwait(false) - .GetAwaiter() - .GetResult(); - return response.ResponseCase switch - { - CreateTaskReply.ResponseOneofCase.CreationStatusList => response.CreationStatusList.CreationStatuses.Select(status => status.TaskInfo.TaskId), - CreateTaskReply.ResponseOneofCase.None => throw new Exception("Issue with Server !"), - CreateTaskReply.ResponseOneofCase.Error => throw new Exception("Error while creating tasks !"), - _ => throw new InvalidOperationException(), - }; - } - catch (Exception e) - { - if (nbRetry >= maxRetries - 1) + try { - throw; + var payloads = resultsClient.CreateResults(new CreateResultsRequest + { + SessionId = SessionId.Id, + Results = + { + new CreateResultsRequest.Types.ResultCreate + { + Data = UnsafeByteOperations.UnsafeWrap(payload), + }, + }, + }); + var result = resultsClient.CreateResultsMetaData(new CreateResultsMetaDataRequest + { + SessionId = SessionId.Id, + Results = + { + new CreateResultsMetaDataRequest.Types.ResultCreate(), + }, + }) + .Results.Select(raw => raw.ResultId) + .Single(); + tasksClient.SubmitTasksAsync(new SubmitTasksRequest + { + SessionId = SessionId.Id, + TaskCreations = + { + new SubmitTasksRequest.Types.TaskCreation + { + PayloadId = payloads.Results.Select(raw => raw.ResultId) + .Single(), + DataDependencies = + { + dependencies, + }, + ExpectedOutputKeys = + { + result, + }, + }, + }, + }); + resultsCreated.Add(result); + return resultsCreated; } - - switch (e) + catch (Exception e) { - case AggregateException - { - InnerException: RpcException, - } ex: - Logger.LogWarning(ex.InnerException, - "Failure to submit"); - break; - case AggregateException - { - InnerException: IOException, - } ex: - Logger.LogWarning(ex.InnerException, - "IOException : Failure to submit, Retrying"); - break; - case IOException ex: - Logger.LogWarning(ex, - "IOException Failure to 
submit"); - break; - default: - Logger.LogError(e, - "Unknown failure :"); + if (nbRetry >= maxRetries - 1) + { throw; + } + + switch (e) + { + case AggregateException + { + InnerException: RpcException, + } ex: + Logger.LogWarning(ex.InnerException, + "Failure to submit"); + break; + case AggregateException + { + InnerException: IOException, + } ex: + Logger.LogWarning(ex.InnerException, + "IOException : Failure to submit, Retrying"); + break; + case IOException ex: + Logger.LogWarning(ex, + "IOException Failure to submit"); + break; + default: + Logger.LogError(e, + "Unknown failure :"); + throw; + } } - } - if (nbRetry > 0) - { - Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit batch of task", - nbRetry, - maxRetries); + if (nbRetry > 0) + { + Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit batch of task", + nbRetry, + maxRetries); + } } } From 7fe8b4d999a734a09fc5be149d8446e39e6229ec Mon Sep 17 00:00:00 2001 From: melflitty Date: Fri, 1 Dec 2023 18:29:47 +0100 Subject: [PATCH 02/12] fix: submit function should return task ids not result ids --- Client/src/Common/Submitter/BaseClientSubmitter.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 741d5312..ba63ca5f 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -290,7 +290,7 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable(); + var tasksSubmitted = new List(); foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) { try @@ -316,7 +316,7 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable raw.ResultId) .Single(); - tasksClient.SubmitTasksAsync(new SubmitTasksRequest + var submitResponse = tasksClient.SubmitTasks(new SubmitTasksRequest { SessionId = SessionId.Id, TaskCreations = @@ -336,8 +336,8 @@ private IEnumerable 
ChunkSubmitTasksWithDependencies(IEnumerable taskInfo.TaskId)); + return tasksSubmitted; } catch (Exception e) { From 92aa8213115334d2b604075130094132e463e923 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Dec 2023 11:16:16 +0100 Subject: [PATCH 03/12] fix: result is created beforehand --- .../Common/Submitter/BaseClientSubmitter.cs | 47 ++++++++----------- 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index ba63ca5f..2cc2bf66 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -306,37 +306,28 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable raw.ResultId) + .Single(), + DataDependencies = { - new CreateResultsMetaDataRequest.Types.ResultCreate(), + dependencies, }, - }) - .Results.Select(raw => raw.ResultId) - .Single(); - var submitResponse = tasksClient.SubmitTasks(new SubmitTasksRequest - { - SessionId = SessionId.Id, - TaskCreations = - { - new SubmitTasksRequest.Types.TaskCreation - { - PayloadId = payloads.Results.Select(raw => raw.ResultId) - .Single(), - DataDependencies = - { - dependencies, - }, - ExpectedOutputKeys = - { - result, - }, - }, - }, - }); - tasksSubmitted.AddRange(submitResponse.TaskInfos.Select(taskInfo=> taskInfo.TaskId)); + ExpectedOutputKeys = + { + resultId, + }, + }, + }, + }); + tasksSubmitted.AddRange(submitResponse.TaskInfos.Select(taskInfo => taskInfo.TaskId)); return tasksSubmitted; } catch (Exception e) From 9996947a93e4477b6826ff3db7eb1cfe402820d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Dec 2023 11:46:58 +0100 Subject: [PATCH 04/12] fix: return created tasks after creating ALL tasks --- .../Common/Submitter/BaseClientSubmitter.cs | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git 
a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 2cc2bf66..46401dbf 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -285,14 +285,16 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable(); + + foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) { - var channel = ChannelPool.GetChannel(); - var tasksClient = new Tasks.TasksClient(channel); - var resultsClient = new Results.ResultsClient(channel); - var tasksSubmitted = new List(); - foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) + for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) { + using var channel = ChannelPool.GetChannel(); + var tasksClient = new Tasks.TasksClient(channel); + var resultsClient = new Results.ResultsClient(channel); + try { var payloads = resultsClient.CreateResults(new CreateResultsRequest @@ -326,9 +328,9 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable taskInfo.TaskId)); - return tasksSubmitted; } catch (Exception e) { @@ -362,18 +364,19 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable 0) - { - Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit batch of task", - nbRetry, - maxRetries); + if (nbRetry > 0) + { + Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit task associated to {resultId}", + nbRetry, + maxRetries, + resultId); + } } } } - throw new Exception("Max retry to send has been reached"); + return tasksSubmitted; } /// From 57050e68acb6921717a1e0f488b261730e48417c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Dec 2023 12:00:41 +0100 Subject: [PATCH 05/12] fix: add missing break --- Client/src/Common/Submitter/BaseClientSubmitter.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs 
index 46401dbf..a5178fba 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -331,6 +331,8 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable taskInfo.TaskId)); + // break retry loop because submission is successful + break; } catch (Exception e) { From 371757bb03f95848b7bf3195c2beafce47126dca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Dec 2023 13:38:28 +0100 Subject: [PATCH 06/12] refactor: batch task submission --- .../Common/Submitter/BaseClientSubmitter.cs | 103 ++++++++++++++---- 1 file changed, 80 insertions(+), 23 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index a5178fba..402cedb3 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -285,6 +285,7 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable(); var tasksSubmitted = new List(); foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) @@ -292,7 +293,6 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable raw.ResultId) - .Single(), - DataDependencies = - { - dependencies, - }, - ExpectedOutputKeys = - { - resultId, - }, - }, - }, - TaskOptions = taskOptions, - }); - tasksSubmitted.AddRange(submitResponse.TaskInfos.Select(taskInfo => taskInfo.TaskId)); + tasks.Add(new SubmitTasksRequest.Types.TaskCreation + { + PayloadId = payloads.Results.Select(raw => raw.ResultId) + .Single(), + DataDependencies = + { + dependencies, + }, + ExpectedOutputKeys = + { + resultId, + }, + }); // break retry loop because submission is successful break; } @@ -378,6 +369,72 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable info.TaskId)); + // break retry loop because submission is successful + break; + } + catch (Exception e) + { + if 
(nbRetry >= maxRetries - 1) + { + throw; + } + + switch (e) + { + case AggregateException + { + InnerException: RpcException, + } ex: + Logger.LogWarning(ex.InnerException, + "Failure to submit"); + break; + case AggregateException + { + InnerException: IOException, + } ex: + Logger.LogWarning(ex.InnerException, + "IOException : Failure to submit, Retrying"); + break; + case IOException ex: + Logger.LogWarning(ex, + "IOException Failure to submit"); + break; + default: + Logger.LogError(e, + "Unknown failure :"); + throw; + } + + if (nbRetry > 0) + { + Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit tasks", + nbRetry, + maxRetries); + } + } + } + } + return tasksSubmitted; } From 09a2ab157a5105d890d2754836cb68e0adc5553d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Dec 2023 13:57:38 +0100 Subject: [PATCH 07/12] refactor: use stream when payload is large --- .../Common/Submitter/BaseClientSubmitter.cs | 53 ++++++++++++++----- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 402cedb3..e3b7521f 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -288,6 +288,9 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable(); var tasksSubmitted = new List(); + var configuration = ChannelPool.WithChannel(channel => new Results.ResultsClient(channel).GetServiceConfiguration(new Empty()) + .DataChunkMaxSize); + foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) { for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) @@ -297,22 +300,46 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable configuration) + { + payloadId = resultsClient.CreateResultsMetaData(new CreateResultsMetaDataRequest + { + SessionId = SessionId.Id, + Results = + { + new CreateResultsMetaDataRequest.Types.ResultCreate(), 
+ }, + }) + .Results.Select(raw => raw.ResultId) + .Single(); + + resultsClient.UploadResultData(SessionId.Id, + payloadId, + payload); + } + else + { + payloadId = resultsClient.CreateResults(new CreateResultsRequest + { + SessionId = SessionId.Id, + Results = + { + new CreateResultsRequest.Types.ResultCreate + { + Data = UnsafeByteOperations.UnsafeWrap(payload), + }, + }, + }) + .Results.Select(raw => raw.ResultId) + .Single(); + } + tasks.Add(new SubmitTasksRequest.Types.TaskCreation { - PayloadId = payloads.Results.Select(raw => raw.ResultId) - .Single(), + PayloadId = payloadId, DataDependencies = { dependencies, From 349626d6e0ec90d5729a992982c0f4b2584f0291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Dec 2023 19:55:08 +0100 Subject: [PATCH 08/12] fix: does not submit when input is empty --- Client/src/Common/Submitter/BaseClientSubmitter.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index e3b7521f..f19d43f8 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -398,6 +398,11 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable Date: Thu, 14 Dec 2023 14:28:47 +0100 Subject: [PATCH 09/12] chore: Update Api to get new gRPC version --- Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj | 2 +- .../ArmoniK.EndToEndTests.Common.csproj | 2 +- Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj index a9ad52ec..8f45a01e 100644 --- a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj +++ b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj @@ -8,7 +8,7 @@ - + diff --git 
a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj index 584e1c7f..155a355d 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj @@ -12,7 +12,7 @@ - + diff --git a/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj b/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj index f33cb8fe..c6ee70f9 100644 --- a/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj +++ b/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj @@ -8,7 +8,7 @@ - + From caed497cb8032303482138c09c7d7368d0e899ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Fri, 15 Dec 2023 10:59:22 +0100 Subject: [PATCH 10/12] perf: retrieve chunk size only once --- Client/src/Common/Submitter/BaseClientSubmitter.cs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index f19d43f8..b799d424 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -23,7 +23,6 @@ using System.Threading.Tasks; using ArmoniK.Api.Client; -using ArmoniK.Api.Client.Submitter; using ArmoniK.Api.Common.Utils; using ArmoniK.Api.gRPC.V1; using ArmoniK.Api.gRPC.V1.Results; @@ -63,6 +62,8 @@ public abstract class BaseClientSubmitter /// private readonly int chunkSubmitSize_; + private readonly int configuration_; + private readonly Properties properties_; /// @@ -93,6 +94,9 @@ protected BaseClientSubmitter(Properties properties, { TaskOptions.PartitionId, }); + + configuration_ = ChannelPool.WithChannel(channel => new Results.ResultsClient(channel).GetServiceConfiguration(new Empty()) + 
.DataChunkMaxSize); } private ILoggerFactory LoggerFactory { get; } @@ -288,9 +292,6 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable(); var tasksSubmitted = new List(); - var configuration = ChannelPool.WithChannel(channel => new Results.ResultsClient(channel).GetServiceConfiguration(new Empty()) - .DataChunkMaxSize); - foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) { for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) @@ -302,7 +303,7 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable configuration) + if (payload.Length > configuration_) { payloadId = resultsClient.CreateResultsMetaData(new CreateResultsMetaDataRequest { From 6372579cf6f11457d6489fd2fe66313adf2b349c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Fri, 15 Dec 2023 12:00:17 +0100 Subject: [PATCH 11/12] fix: better retry management when submission fails --- .../Common/Submitter/BaseClientSubmitter.cs | 68 ++++++++----------- 1 file changed, 28 insertions(+), 40 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index b799d424..8be061aa 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -360,29 +360,23 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable Date: Fri, 15 Dec 2023 19:59:54 +0100 Subject: [PATCH 12/12] style: apply formatting patch --- Client/src/Common/Submitter/BaseClientSubmitter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 8be061aa..56d10215 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -376,7 +376,7 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable