Skip to content

Commit

Permalink
refactor: Use Api streamwrapper (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrasseur-aneo authored Jun 27, 2023
2 parents 97e720b + 3c1a904 commit b558fb5
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.9.0-edge.115.5d51a95" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.9.0-edge.128.8e8057a" />
<PackageReference Include="Grpc.Core" Version="2.46.6" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="7.0.0" />
<PackageReference Include="System.Net.Http" Version="4.3.4" />
Expand Down
122 changes: 28 additions & 94 deletions Client/src/Common/Submitter/BaseClientSubmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Api.Client.Submitter;
using ArmoniK.Api.Common.Utils;
using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Results;
Expand Down Expand Up @@ -230,98 +231,31 @@ private IEnumerable<string> ChunkSubmitTasksWithDependencies(IEnumerable<Tuple<s
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

var serviceConfiguration = submitterService.GetServiceConfigurationAsync(new Empty())
.ResponseAsync.Result;

using var asyncClientStreamingCall = submitterService.CreateLargeTasks();

asyncClientStreamingCall.RequestStream.WriteAsync(new CreateLargeTaskRequest
{
InitRequest = new CreateLargeTaskRequest.Types.InitRequest
{
SessionId = SessionId.Id,
TaskOptions = taskOptions ?? TaskOptions,
},
})
.Wait();

//
// Here the payloadsWithDependencies can have multiple enumeration
// It will happen only during a retry to submit tasks
// Loosing a little bit of perf in case of retry is not a big deal : in the usual case, it should not happen.
//
foreach (var (resultId, payload, dependencies) in payloadsWithDependencies)
{
asyncClientStreamingCall.RequestStream.WriteAsync(new CreateLargeTaskRequest
{
InitTask = new InitTaskRequest
{
Header = new TaskRequestHeader
{
ExpectedOutputKeys =
{
resultId,
},
DataDependencies =
{
dependencies ?? new List<string>(),
},
},
},
})
.Wait();

for (var j = 0; j < payload.Length; j += serviceConfiguration.DataChunkMaxSize)
{
var chunkSize = Math.Min(serviceConfiguration.DataChunkMaxSize,
payload.Length - j);

asyncClientStreamingCall.RequestStream.WriteAsync(new CreateLargeTaskRequest
{
TaskPayload = new DataChunk
{
Data = UnsafeByteOperations.UnsafeWrap(payload.AsMemory(j,
chunkSize)),
},
})
.Wait();
}

asyncClientStreamingCall.RequestStream.WriteAsync(new CreateLargeTaskRequest
{
TaskPayload = new DataChunk
{
DataComplete = true,
},
})
.Wait();
}

asyncClientStreamingCall.RequestStream.WriteAsync(new CreateLargeTaskRequest
{
InitTask = new InitTaskRequest
{
LastTask = true,
},
})
.Wait();

asyncClientStreamingCall.RequestStream.CompleteAsync()
.Wait();

var createTaskReply = asyncClientStreamingCall.ResponseAsync.Result;

switch (createTaskReply.ResponseCase)
{
case CreateTaskReply.ResponseOneofCase.None:
throw new Exception("Issue with Server !");
case CreateTaskReply.ResponseOneofCase.CreationStatusList:
return createTaskReply.CreationStatusList.CreationStatuses.Select(status => status.TaskInfo.TaskId);
case CreateTaskReply.ResponseOneofCase.Error:
throw new Exception("Error while creating tasks !");
default:
throw new ArgumentOutOfRangeException();
}
//Multiple enumeration occurs on a retry
var response = submitterService.CreateTasksAsync(SessionId.Id,
taskOptions ?? TaskOptions,
payloadsWithDependencies.Select(pwd =>
{
var taskRequest = new TaskRequest
{
Payload = UnsafeByteOperations.UnsafeWrap(pwd.Item2),
};
taskRequest.DataDependencies
.AddRange(pwd.Item3 ?? Enumerable.Empty<string>());
taskRequest.ExpectedOutputKeys.Add(pwd.Item1);
return taskRequest;
}))
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();
return response.ResponseCase switch
{
CreateTaskReply.ResponseOneofCase.CreationStatusList => response.CreationStatusList.CreationStatuses.Select(status => status.TaskInfo.TaskId)
.ToList(),
CreateTaskReply.ResponseOneofCase.None => throw new Exception("Issue with Server !"),
CreateTaskReply.ResponseOneofCase.Error => throw new Exception("Error while creating tasks !"),
_ => throw new ArgumentOutOfRangeException(),
};
}
catch (Exception e)
{
Expand Down Expand Up @@ -351,8 +285,8 @@ private IEnumerable<string> ChunkSubmitTasksWithDependencies(IEnumerable<Tuple<s
"IOException Failure to submit");
break;
default:
Logger.LogError(e,
"Unknown failure :");
Logger?.LogError(e,
"Unknown failure :");
throw;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Common" Version="3.9.0-edge.115.5d51a95" />
<PackageReference Include="ArmoniK.Api.Common" Version="3.9.0-edge.128.8e8057a" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="6.0.0" />
Expand Down
4 changes: 2 additions & 2 deletions Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.9.0-edge.115.5d51a95" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.9.0-edge.128.8e8057a" />
<PackageReference Include="AWSSDK.S3" Version="3.7.9.65" />
<PackageReference Include="AWSSDK.SecurityToken" Version="3.7.101.36" />
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" PrivateAssets="All" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.17.0" />
<PackageReference Include="Grpc.Tools" Version="2.54.0">
<PackageReference Include="Grpc.Tools" Version="2.56.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down

0 comments on commit b558fb5

Please sign in to comment.