Skip to content

Commit

Permalink
refactor: remove submitter from chunk submit with dependencies method
Browse files: browse the repository at this point in the history
  • Loading branch information
aneojgurhem authored Dec 15, 2023
2 parents d4b77d9 + f7c7f8b commit 3271336
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.14.0" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.15.0-edge.23.b178614" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="7.0.4" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="7.0.0" />
</ItemGroup>
Expand Down
225 changes: 160 additions & 65 deletions Client/src/Common/Submitter/BaseClientSubmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
using System.Threading.Tasks;

using ArmoniK.Api.Client;
using ArmoniK.Api.Client.Submitter;
using ArmoniK.Api.Common.Utils;
using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Results;
Expand Down Expand Up @@ -63,6 +62,8 @@ public abstract class BaseClientSubmitter<T>
/// </summary>
private readonly int chunkSubmitSize_;

private readonly int configuration_;

private readonly Properties properties_;

/// <summary>
Expand Down Expand Up @@ -93,6 +94,9 @@ protected BaseClientSubmitter(Properties properties,
{
TaskOptions.PartitionId,
});

configuration_ = ChannelPool.WithChannel(channel => new Results.ResultsClient(channel).GetServiceConfiguration(new Empty())
.DataChunkMaxSize);
}

private ILoggerFactory LoggerFactory { get; }
Expand Down Expand Up @@ -285,83 +289,174 @@ private IEnumerable<string> ChunkSubmitTasksWithDependencies(IEnumerable<Tuple<s
{
using var _ = Logger.LogFunction();

var taskRequests = payloadsWithDependencies.Select(pwd =>
{
var taskRequest = new TaskRequest
{
Payload = UnsafeByteOperations.UnsafeWrap(pwd.Item2),
};
taskRequest.DataDependencies.AddRange(pwd.Item3);
taskRequest.ExpectedOutputKeys.Add(pwd.Item1);
return taskRequest;
});

for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
var tasks = new List<SubmitTasksRequest.Types.TaskCreation>();
var tasksSubmitted = new List<string>();

foreach (var (resultId, payload, dependencies) in payloadsWithDependencies)
{
try
for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
{
using var channel = ChannelPool.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

var response = submitterService.CreateTasksAsync(SessionId.Id,
taskOptions ?? TaskOptions,
// multiple enumeration only occurs in case of failure
// ReSharper disable once PossibleMultipleEnumeration
taskRequests)
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();
return response.ResponseCase switch
{
CreateTaskReply.ResponseOneofCase.CreationStatusList => response.CreationStatusList.CreationStatuses.Select(status => status.TaskInfo.TaskId),
CreateTaskReply.ResponseOneofCase.None => throw new Exception("Issue with Server !"),
CreateTaskReply.ResponseOneofCase.Error => throw new Exception("Error while creating tasks !"),
_ => throw new InvalidOperationException(),
};
}
catch (Exception e)
{
if (nbRetry >= maxRetries - 1)
using var channel = ChannelPool.GetChannel();
var resultsClient = new Results.ResultsClient(channel);

try
{
throw;
}
// todo: migrate to ArmoniK.Api
string payloadId;
if (payload.Length > configuration_)
{
payloadId = resultsClient.CreateResultsMetaData(new CreateResultsMetaDataRequest
{
SessionId = SessionId.Id,
Results =
{
new CreateResultsMetaDataRequest.Types.ResultCreate(),
},
})
.Results.Select(raw => raw.ResultId)
.Single();

resultsClient.UploadResultData(SessionId.Id,
payloadId,
payload);
}
else
{
payloadId = resultsClient.CreateResults(new CreateResultsRequest
{
SessionId = SessionId.Id,
Results =
{
new CreateResultsRequest.Types.ResultCreate
{
Data = UnsafeByteOperations.UnsafeWrap(payload),
},
},
})
.Results.Select(raw => raw.ResultId)
.Single();
}

switch (e)

tasks.Add(new SubmitTasksRequest.Types.TaskCreation
{
PayloadId = payloadId,
DataDependencies =
{
dependencies,
},
ExpectedOutputKeys =
{
resultId,
},
});
// break retry loop because submission is successful
break;
}
catch (Exception e)
{
case AggregateException
{
InnerException: RpcException,
} ex:
Logger.LogWarning(ex.InnerException,
"Failure to submit");
break;
case AggregateException
{
InnerException: IOException,
} ex:
Logger.LogWarning(ex.InnerException,
"IOException : Failure to submit, Retrying");
break;
case IOException ex:
Logger.LogWarning(ex,
"IOException Failure to submit");
break;
default:
Logger.LogError(e,
"Unknown failure :");
if (nbRetry >= maxRetries - 1)
{
throw;
}

var innerException = e is AggregateException
{
InnerExceptions.Count: 1,
} agg
? agg.InnerException
: e;

switch (innerException)
{
case RpcException:
case IOException:
Logger.LogWarning(innerException,
"Failure to submit : Retrying");
break;
default:
Logger.LogError(innerException,
"Unknown failure");
throw;
}

if (nbRetry > 0)
{
Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit task associated to {resultId}",
nbRetry,
maxRetries,
resultId);
}
}
}
}

foreach (var taskChunk in tasks.ToChunks(100))
{
if (taskChunk.Length == 0)
{
continue;
}

if (nbRetry > 0)
for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
{
Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit batch of task",
nbRetry,
maxRetries);
using var channel = ChannelPool.GetChannel();
var tasksClient = new Tasks.TasksClient(channel);

try
{
var submitTasksResponse = tasksClient.SubmitTasks(new SubmitTasksRequest
{
TaskOptions = taskOptions,
SessionId = SessionId.Id,
TaskCreations =
{
taskChunk,
},
});

tasksSubmitted.AddRange(submitTasksResponse.TaskInfos.Select(info => info.TaskId));
// break retry loop because submission is successful
break;
}
catch (Exception e)
{
if (nbRetry >= maxRetries - 1)
{
throw;
}

var innerException = e is AggregateException
{
InnerExceptions.Count: 1,
} agg
? agg.InnerException
: e;

switch (innerException)
{
case RpcException:
case IOException:
Logger.LogWarning(innerException,
"Failure to submit : Retrying");
break;
default:
Logger.LogError(innerException,
"Unknown failure");
throw;
}

if (nbRetry > 0)
{
Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit tasks",
nbRetry,
maxRetries);
}
}
}
}

throw new Exception("Max retry to send has been reached");
return tasksSubmitted;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Common" Version="3.14.0" />
<PackageReference Include="ArmoniK.Api.Common" Version="3.15.0-edge.23.b178614" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="7.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Worker" Version="3.14.0" />
<PackageReference Include="ArmoniK.Api.Worker" Version="3.15.0-edge.23.b178614" />
<PackageReference Include="AWSSDK.S3" Version="3.7.106.1" />
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" PrivateAssets="All" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.18.1" />
Expand Down

0 comments on commit 3271336

Please sign in to comment.