Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp TaskResult of UnifiedAPI #93

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 119 additions & 178 deletions UnifiedApi/Client/Services/Submitter/Service.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
using ArmoniK.DevelopmentKit.Common;
using ArmoniK.DevelopmentKit.Common.Exceptions;

using Google.Protobuf.WellKnownTypes;

using JetBrains.Annotations;

using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -267,13 +265,9 @@ public string SubmitTask(ArmonikPayload payload,
handler)
.Single();

private void ProxyTryGetResults(IEnumerable<string> taskIds,
Action<string, byte[]> responseHandler,
Action<string, TaskStatus, string> errorHandler)
private void ResultTask()
{
var missing = taskIds.ToHashSet();
var holdPrev = missing.Count;
var waitInSeconds = new List<int>
var waitInSeconds = new[]
{
10,
1000,
Expand All @@ -282,194 +276,141 @@ private void ProxyTryGetResults(IEnumerable<string> taskIds,
20000,
30000,
};
var idx = 0;
var emptyFetch = 0;

while (missing.Count != 0)
while (!(CancellationResultTaskSource.Token.IsCancellationRequested && ResultHandlerDictionary.IsEmpty))
{
foreach (var bucket in missing.ToList()
.Batch(500))
try
{
var resultStatusCollection = SessionService.GetResultStatus(bucket);
if (ResultHandlerDictionary.IsEmpty)
{
Thread.Sleep(100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary?

continue;
}

var fetched = 0;

foreach (var resultStatusData in resultStatusCollection.IdsReady)
foreach (var chunk in ResultHandlerDictionary.Keys.Batch(500))
{
try
{
Logger?.LogTrace("Response handler for {taskId}",
resultStatusData.TaskId);
responseHandler(resultStatusData.TaskId,
SessionService.TryGetResultAsync(new ResultRequest
{
ResultId = resultStatusData.ResultId,
Session = SessionId,
},
CancellationToken.None)
.Result);
}
catch (Exception e)
var resultStatusCollection = SessionService.GetResultStatus(chunk);

foreach (var resultStatusData in resultStatusCollection.IdsReady)
{
Logger?.LogWarning(e,
"Response handler for {taskId} threw an error",
resultStatusData.TaskId);
var taskId = resultStatusData.TaskId;
fetched += 1;

try
{
errorHandler(resultStatusData.TaskId,
TaskStatus.Error,
e.Message + e.StackTrace);
var byteResult = SessionService.TryGetResultAsync(new ResultRequest
{
ResultId = resultStatusData.ResultId,
Session = SessionId,
},
CancellationToken.None)
.Result;
Logger?.LogTrace("Response handler for {taskId}",
resultStatusData.TaskId);

var result = ProtoSerializer.DeSerializeMessageObjectArray(byteResult);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe distinguish between errors from deserialization and errors from handler execution?

ResultHandlerDictionary[taskId]
.HandleResponse(result?[0],
taskId);
}
catch (Exception e2)
catch (Exception e)
{
Logger?.LogError(e2,
"An error occured while handling another error: {details}",
e);
Logger?.LogDebug(e,
"Response handler for {taskId} threw an exception, calling error handler",
resultStatusData.TaskId);

var ex = e;

if (e is AggregateException ae)
{
ex = ae.InnerExceptions.Count > 1
? ae
: ae.InnerException;
}

try
{
ResultHandlerDictionary[taskId]
.HandleError(new ServiceInvocationException(ex,
ArmonikStatusCode.Unknown),
taskId);
}
catch (Exception e2)
{
Logger?.LogError(e2,
"An exception was thrown while handling a previous exception in the result handler of {taskId}",
taskId);
}
}
finally
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
}
}

missing.ExceptWith(resultStatusCollection.IdsReady.Select(x => x.TaskId));
var x = Duration.FromTimeSpan(TimeSpan.FromMinutes(5));

foreach (var resultStatusData in resultStatusCollection.IdsResultError)
{
var details = "";

var taskStatus = SessionService.GetTaskStatus(resultStatusData.TaskId);

switch (taskStatus)
foreach (var resultStatusData in resultStatusCollection.IdsResultError)
{
case TaskStatus.Canceling:
case TaskStatus.Canceled:
details = $"Task {resultStatusData.TaskId} was canceled";
break;
default:
var outputInfo = SessionService.GetTaskOutputInfo(resultStatusData.TaskId);
details = outputInfo.TypeCase == Output.TypeOneofCase.Error
? outputInfo.Error.Details
: "Result is in status : " + resultStatusData.Status + ", look for task in error in logs.";
break;
}
var taskId = resultStatusData.TaskId;
var details = "";
var taskStatus = TaskStatus.Unspecified;
fetched += 1;

Logger?.LogDebug("Error handler for {taskId}, {taskStatus}: {details}",
resultStatusData.TaskId,
taskStatus,
details);
try
{
errorHandler(resultStatusData.TaskId,
taskStatus,
details);
}
catch (Exception e)
{
Logger?.LogError(e,
"An error occured while handling a Task error {status}: {details}",
taskStatus,
details);
try
{
taskStatus = SessionService.GetTaskStatus(resultStatusData.TaskId);
if (!StatusCodesLookUp.TryGetValue(taskStatus,
out var statusCode))
{
statusCode = ArmonikStatusCode.Unknown;
}

switch (taskStatus)
{
case TaskStatus.Canceling:
case TaskStatus.Canceled:
details = $"Task {resultStatusData.TaskId} was canceled";
break;
default:
var outputInfo = SessionService.GetTaskOutputInfo(resultStatusData.TaskId);
details = outputInfo.TypeCase == Output.TypeOneofCase.Error
? outputInfo.Error.Details
: "Result is in status : " + resultStatusData.Status + ", look for task in error in logs.";
break;
}

Logger?.LogDebug("Error handler for {taskId}, {taskStatus}: {details}",
resultStatusData.TaskId,
taskStatus,
details);
ResultHandlerDictionary[resultStatusData.TaskId]
.HandleError(new ServiceInvocationException(details,
statusCode),
resultStatusData.TaskId);
}
catch (Exception e)
{
Logger?.LogError(e,
"An error occured while handling a Task error {status}: {details}",
taskStatus,
details);
}
finally
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
}
}

missing.ExceptWith(resultStatusCollection.IdsResultError.Select(x => x.TaskId));

if (holdPrev == missing.Count)
{
idx = idx >= waitInSeconds.Count - 1
? waitInSeconds.Count - 1
: idx + 1;

Logger?.LogDebug("No Results are ready. Wait for {timeWait} seconds before new retry",
waitInSeconds[idx] / 1000);
}
else
{
idx = 0;
}

holdPrev = missing.Count;

Thread.Sleep(waitInSeconds[idx]);
}
}
}

private void ResultTask()
{
while (!(CancellationResultTaskSource.Token.IsCancellationRequested && ResultHandlerDictionary.IsEmpty))
{
try
{
if (!ResultHandlerDictionary.IsEmpty)
{
ProxyTryGetResults(ResultHandlerDictionary.Keys.ToList(),
(taskId,
byteResult) =>
{
try
{
var result = ProtoSerializer.DeSerializeMessageObjectArray(byteResult);
ResultHandlerDictionary[taskId]
.HandleResponse(result?[0],
taskId);
}
catch (Exception e)
{
const ArmonikStatusCode statusCode = ArmonikStatusCode.Unknown;

ServiceInvocationException ex;

var ae = e as AggregateException;

if (ae is not null && ae.InnerExceptions.Count > 1)
{
ex = new ServiceInvocationException(ae,
statusCode);
}
else if (ae is not null)
{
ex = new ServiceInvocationException(ae.InnerException,
statusCode);
}
else
{
ex = new ServiceInvocationException(e,
statusCode);
}

ResultHandlerDictionary[taskId]
.HandleError(ex,
taskId);
}
finally
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
},
(taskId,
taskStatus,
ex) =>
{
try
{
var statusCode = StatusCodesLookUp.Keys.Contains(taskStatus)
? StatusCodesLookUp[taskStatus]
: ArmonikStatusCode.Unknown;

ResultHandlerDictionary[taskId]
.HandleError(new ServiceInvocationException(ex,
statusCode),
taskId);
}
finally
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
});
}
else
{
Thread.Sleep(100);
}
Thread.Sleep(waitInSeconds[emptyFetch]);
emptyFetch = fetched == 0
? Math.Min(emptyFetch + 1,
waitInSeconds.Length)
: 0;
}
catch (Exception e)
{
Expand Down