diff --git a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj index d226cd70..fc800f31 100644 --- a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj +++ b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj @@ -8,7 +8,7 @@ - + diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 6fed42cf..6e12dd04 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -22,6 +22,7 @@ using System.Threading; using System.Threading.Tasks; +using ArmoniK.Api.Client.Submitter; using ArmoniK.Api.Common.Utils; using ArmoniK.Api.gRPC.V1; using ArmoniK.Api.gRPC.V1.Results; @@ -230,98 +231,31 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable(), - }, - }, - }, - }) - .Wait(); - - for (var j = 0; j < payload.Length; j += serviceConfiguration.DataChunkMaxSize) - { - var chunkSize = Math.Min(serviceConfiguration.DataChunkMaxSize, - payload.Length - j); - - asyncClientStreamingCall.RequestStream.WriteAsync(new CreateLargeTaskRequest - { - TaskPayload = new DataChunk - { - Data = UnsafeByteOperations.UnsafeWrap(payload.AsMemory(j, - chunkSize)), - }, - }) - .Wait(); - } - - asyncClientStreamingCall.RequestStream.WriteAsync(new CreateLargeTaskRequest - { - TaskPayload = new DataChunk - { - DataComplete = true, - }, - }) - .Wait(); - } - - asyncClientStreamingCall.RequestStream.WriteAsync(new CreateLargeTaskRequest - { - InitTask = new InitTaskRequest - { - LastTask = true, - }, - }) - .Wait(); - - asyncClientStreamingCall.RequestStream.CompleteAsync() - .Wait(); - - var createTaskReply = asyncClientStreamingCall.ResponseAsync.Result; - - switch (createTaskReply.ResponseCase) - { - case CreateTaskReply.ResponseOneofCase.None: - throw new Exception("Issue with Server !"); - case CreateTaskReply.ResponseOneofCase.CreationStatusList: - return createTaskReply.CreationStatusList.CreationStatuses.Select(status => status.TaskInfo.TaskId); - case CreateTaskReply.ResponseOneofCase.Error: - throw new Exception("Error while creating tasks !"); - default: - throw new ArgumentOutOfRangeException(); - } + //Multiple enumeration occurs on a retry + var response = submitterService.CreateTasksAsync(SessionId.Id, + taskOptions ?? TaskOptions, + payloadsWithDependencies.Select(pwd => + { + var taskRequest = new TaskRequest + { + Payload = UnsafeByteOperations.UnsafeWrap(pwd.Item2), + }; + taskRequest.DataDependencies + .AddRange(pwd.Item3 ?? Enumerable.Empty()); + taskRequest.ExpectedOutputKeys.Add(pwd.Item1); + return taskRequest; + })) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + return response.ResponseCase switch + { + CreateTaskReply.ResponseOneofCase.CreationStatusList => response.CreationStatusList.CreationStatuses.Select(status => status.TaskInfo.TaskId) + .ToList(), + CreateTaskReply.ResponseOneofCase.None => throw new Exception("Issue with Server !"), + CreateTaskReply.ResponseOneofCase.Error => throw new Exception("Error while creating tasks !"), + _ => throw new ArgumentOutOfRangeException(), + }; } catch (Exception e) { @@ -351,8 +285,8 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable - + diff --git a/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj b/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj index b7f86435..d1d15123 100644 --- a/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj +++ b/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj @@ -8,12 +8,12 @@ - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive