diff --git a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj index a9ad52ec..8f45a01e 100644 --- a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj +++ b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj @@ -8,7 +8,7 @@ - + diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 3ba8c20d..56d10215 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -23,7 +23,6 @@ using System.Threading.Tasks; using ArmoniK.Api.Client; -using ArmoniK.Api.Client.Submitter; using ArmoniK.Api.Common.Utils; using ArmoniK.Api.gRPC.V1; using ArmoniK.Api.gRPC.V1.Results; @@ -63,6 +62,8 @@ public abstract class BaseClientSubmitter /// private readonly int chunkSubmitSize_; + private readonly int configuration_; + private readonly Properties properties_; /// @@ -93,6 +94,9 @@ protected BaseClientSubmitter(Properties properties, { TaskOptions.PartitionId, }); + + configuration_ = ChannelPool.WithChannel(channel => new Results.ResultsClient(channel).GetServiceConfiguration(new Empty()) + .DataChunkMaxSize); } private ILoggerFactory LoggerFactory { get; } @@ -285,83 +289,174 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable - { - var taskRequest = new TaskRequest - { - Payload = UnsafeByteOperations.UnsafeWrap(pwd.Item2), - }; - taskRequest.DataDependencies.AddRange(pwd.Item3); - taskRequest.ExpectedOutputKeys.Add(pwd.Item1); - return taskRequest; - }); - - for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) + var tasks = new List(); + var tasksSubmitted = new List(); + + foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) { - try + for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) { - using var channel = ChannelPool.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - - var response = submitterService.CreateTasksAsync(SessionId.Id, - taskOptions ?? TaskOptions, - // multiple enumeration only occurs in case of failure - // ReSharper disable once PossibleMultipleEnumeration - taskRequests) - .ConfigureAwait(false) - .GetAwaiter() - .GetResult(); - return response.ResponseCase switch - { - CreateTaskReply.ResponseOneofCase.CreationStatusList => response.CreationStatusList.CreationStatuses.Select(status => status.TaskInfo.TaskId), - CreateTaskReply.ResponseOneofCase.None => throw new Exception("Issue with Server !"), - CreateTaskReply.ResponseOneofCase.Error => throw new Exception("Error while creating tasks !"), - _ => throw new InvalidOperationException(), - }; - } - catch (Exception e) - { - if (nbRetry >= maxRetries - 1) + using var channel = ChannelPool.GetChannel(); + var resultsClient = new Results.ResultsClient(channel); + + try { - throw; - } + // todo: migrate to ArmoniK.Api + string payloadId; + if (payload.Length > configuration_) + { + payloadId = resultsClient.CreateResultsMetaData(new CreateResultsMetaDataRequest + { + SessionId = SessionId.Id, + Results = + { + new CreateResultsMetaDataRequest.Types.ResultCreate(), + }, + }) + .Results.Select(raw => raw.ResultId) + .Single(); + + resultsClient.UploadResultData(SessionId.Id, + payloadId, + payload); + } + else + { + payloadId = resultsClient.CreateResults(new CreateResultsRequest + { + SessionId = SessionId.Id, + Results = + { + new CreateResultsRequest.Types.ResultCreate + { + Data = UnsafeByteOperations.UnsafeWrap(payload), + }, + }, + }) + .Results.Select(raw => raw.ResultId) + .Single(); + } - switch (e) + + tasks.Add(new SubmitTasksRequest.Types.TaskCreation + { + PayloadId = payloadId, + DataDependencies = + { + dependencies, + }, + ExpectedOutputKeys = + { + resultId, + }, + }); + // break retry loop because submission is successful + break; + } + catch (Exception e) { - case AggregateException - { - InnerException: RpcException, - } ex: - Logger.LogWarning(ex.InnerException, - "Failure to submit"); - break; - case AggregateException - { - InnerException: IOException, - } ex: - Logger.LogWarning(ex.InnerException, - "IOException : Failure to submit, Retrying"); - break; - case IOException ex: - Logger.LogWarning(ex, - "IOException Failure to submit"); - break; - default: - Logger.LogError(e, - "Unknown failure :"); + if (nbRetry >= maxRetries - 1) + { throw; + } + + var innerException = e is AggregateException + { + InnerExceptions.Count: 1, + } agg + ? agg.InnerException + : e; + + switch (innerException) + { + case RpcException: + case IOException: + Logger.LogWarning(innerException, + "Failure to submit : Retrying"); + break; + default: + Logger.LogError(innerException, + "Unknown failure"); + throw; + } + + if (nbRetry > 0) + { + Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit task associated to {resultId}", + nbRetry, + maxRetries, + resultId); + } } } + } + + foreach (var taskChunk in tasks.ToChunks(100)) + { + if (taskChunk.Length == 0) + { + continue; + } - if (nbRetry > 0) + for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) { - Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit batch of task", - nbRetry, - maxRetries); + using var channel = ChannelPool.GetChannel(); + var tasksClient = new Tasks.TasksClient(channel); + + try + { + var submitTasksResponse = tasksClient.SubmitTasks(new SubmitTasksRequest + { + TaskOptions = taskOptions, + SessionId = SessionId.Id, + TaskCreations = + { + taskChunk, + }, + }); + + tasksSubmitted.AddRange(submitTasksResponse.TaskInfos.Select(info => info.TaskId)); + // break retry loop because submission is successful + break; + } + catch (Exception e) + { + if (nbRetry >= maxRetries - 1) + { + throw; + } + + var innerException = e is AggregateException + { + InnerExceptions.Count: 1, + } agg + ? agg.InnerException + : e; + + switch (innerException) + { + case RpcException: + case IOException: + Logger.LogWarning(innerException, + "Failure to submit : Retrying"); + break; + default: + Logger.LogError(innerException, + "Unknown failure"); + throw; + } + + if (nbRetry > 0) + { + Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit tasks", + nbRetry, + maxRetries); + } + } } } - throw new Exception("Max retry to send has been reached"); + return tasksSubmitted; } /// diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj index 584e1c7f..155a355d 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/ArmoniK.EndToEndTests.Common.csproj @@ -12,7 +12,7 @@ - + diff --git a/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj b/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj index f33cb8fe..c6ee70f9 100644 --- a/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj +++ b/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj @@ -8,7 +8,7 @@ - +