Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove submitter from chunk submit with dependencies method #239

Merged
merged 12 commits into from
Dec 15, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.14.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.15.0-edge.23.b178614" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="7.0.4" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="7.0.0" />
</ItemGroup>
Expand Down
225 changes: 160 additions & 65 deletions Client/src/Common/Submitter/BaseClientSubmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
using System.Threading.Tasks;

using ArmoniK.Api.Client;
using ArmoniK.Api.Client.Submitter;
using ArmoniK.Api.Common.Utils;
using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Results;
Expand Down Expand Up @@ -63,6 +62,8 @@ public abstract class BaseClientSubmitter<T>
/// </summary>
private readonly int chunkSubmitSize_;

private readonly int configuration_;

private readonly Properties properties_;

/// <summary>
Expand Down Expand Up @@ -93,6 +94,9 @@ protected BaseClientSubmitter(Properties properties,
{
TaskOptions.PartitionId,
});

configuration_ = ChannelPool.WithChannel(channel => new Results.ResultsClient(channel).GetServiceConfiguration(new Empty())
.DataChunkMaxSize);
}

private ILoggerFactory LoggerFactory { get; }
Expand Down Expand Up @@ -285,83 +289,174 @@ private IEnumerable<string> ChunkSubmitTasksWithDependencies(IEnumerable<Tuple<s
{
using var _ = Logger.LogFunction();

var taskRequests = payloadsWithDependencies.Select(pwd =>
{
var taskRequest = new TaskRequest
{
Payload = UnsafeByteOperations.UnsafeWrap(pwd.Item2),
};
taskRequest.DataDependencies.AddRange(pwd.Item3);
taskRequest.ExpectedOutputKeys.Add(pwd.Item1);
return taskRequest;
});

for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
var tasks = new List<SubmitTasksRequest.Types.TaskCreation>();
var tasksSubmitted = new List<string>();

foreach (var (resultId, payload, dependencies) in payloadsWithDependencies)
{
try
for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
{
using var channel = ChannelPool.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

var response = submitterService.CreateTasksAsync(SessionId.Id,
taskOptions ?? TaskOptions,
// multiple enumeration only occurs in case of failure
// ReSharper disable once PossibleMultipleEnumeration
taskRequests)
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();
return response.ResponseCase switch
{
CreateTaskReply.ResponseOneofCase.CreationStatusList => response.CreationStatusList.CreationStatuses.Select(status => status.TaskInfo.TaskId),
CreateTaskReply.ResponseOneofCase.None => throw new Exception("Issue with Server !"),
CreateTaskReply.ResponseOneofCase.Error => throw new Exception("Error while creating tasks !"),
_ => throw new InvalidOperationException(),
};
}
catch (Exception e)
{
if (nbRetry >= maxRetries - 1)
using var channel = ChannelPool.GetChannel();
var resultsClient = new Results.ResultsClient(channel);

try
{
throw;
}
// todo: migrate to ArmoniK.Api
string payloadId;
if (payload.Length > configuration_)
{
payloadId = resultsClient.CreateResultsMetaData(new CreateResultsMetaDataRequest
{
SessionId = SessionId.Id,
Results =
{
new CreateResultsMetaDataRequest.Types.ResultCreate(),
},
})
.Results.Select(raw => raw.ResultId)
.Single();

resultsClient.UploadResultData(SessionId.Id,
payloadId,
payload);
}
else
{
payloadId = resultsClient.CreateResults(new CreateResultsRequest
{
SessionId = SessionId.Id,
Results =
{
new CreateResultsRequest.Types.ResultCreate
{
Data = UnsafeByteOperations.UnsafeWrap(payload),
},
},
})
.Results.Select(raw => raw.ResultId)
.Single();
}

switch (e)

tasks.Add(new SubmitTasksRequest.Types.TaskCreation
{
PayloadId = payloadId,
DataDependencies =
{
dependencies,
},
ExpectedOutputKeys =
{
resultId,
},
});
// break retry loop because submission is successful
break;
}
catch (Exception e)
{
case AggregateException
{
InnerException: RpcException,
} ex:
Logger.LogWarning(ex.InnerException,
"Failure to submit");
break;
case AggregateException
{
InnerException: IOException,
} ex:
Logger.LogWarning(ex.InnerException,
"IOException : Failure to submit, Retrying");
break;
case IOException ex:
Logger.LogWarning(ex,
"IOException Failure to submit");
break;
default:
Logger.LogError(e,
"Unknown failure :");
if (nbRetry >= maxRetries - 1)
{
throw;
}

var innerException = e is AggregateException
{
InnerExceptions.Count: 1,
} agg
? agg.InnerException
: e;

switch (innerException)
{
case RpcException:
case IOException:
Logger.LogWarning(innerException,
"Failure to submit : Retrying");
break;
default:
Logger.LogError(innerException,
"Unknown failure");
throw;
}

if (nbRetry > 0)
{
Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit task associated to {resultId}",
nbRetry,
maxRetries,
resultId);
}
}
}
}

foreach (var taskChunk in tasks.ToChunks(100))
{
if (taskChunk.Length == 0)
{
continue;
}

if (nbRetry > 0)
for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
{
Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit batch of task",
nbRetry,
maxRetries);
using var channel = ChannelPool.GetChannel();
var tasksClient = new Tasks.TasksClient(channel);

try
{
var submitTasksResponse = tasksClient.SubmitTasks(new SubmitTasksRequest
{
TaskOptions = taskOptions,
SessionId = SessionId.Id,
TaskCreations =
{
taskChunk,
},
});

tasksSubmitted.AddRange(submitTasksResponse.TaskInfos.Select(info => info.TaskId));
// break retry loop because submission is successful
break;
}
catch (Exception e)
{
if (nbRetry >= maxRetries - 1)
{
throw;
}

var innerException = e is AggregateException
{
InnerExceptions.Count: 1,
} agg
? agg.InnerException
: e;

switch (innerException)
{
case RpcException:
case IOException:
Logger.LogWarning(innerException,
"Failure to submit : Retrying");
break;
default:
Logger.LogError(innerException,
"Unknown failure");
throw;
}

if (nbRetry > 0)
{
Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit tasks",
nbRetry,
maxRetries);
}
}
}
}

throw new Exception("Max retry to send has been reached");
return tasksSubmitted;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Common" Version="3.14.0" />
<PackageReference Include="ArmoniK.Api.Common" Version="3.15.0-edge.23.b178614" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="7.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.14.0" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.15.0-edge.23.b178614" />
<PackageReference Include="AWSSDK.S3" Version="3.7.106.1" />
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" PrivateAssets="All" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.18.1" />
Expand Down
Loading