-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Revamp TaskResult of UnifiedAPI #93
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,8 +36,6 @@ | |
using ArmoniK.DevelopmentKit.Common.Exceptions; | ||
using ArmoniK.DevelopmentKit.Common.Extensions; | ||
|
||
using Google.Protobuf.WellKnownTypes; | ||
|
||
using JetBrains.Annotations; | ||
|
||
using Microsoft.Extensions.Logging; | ||
|
@@ -268,14 +266,9 @@ public string SubmitTask(ArmonikPayload payload, | |
handler) | ||
.Single(); | ||
|
||
private void ProxyTryGetResults(IEnumerable<string> taskIds, | ||
Action<string, byte[]> responseHandler, | ||
Action<string, TaskStatus, string> errorHandler, | ||
int chunkResultSize = 500) | ||
private void ResultTask() | ||
{ | ||
var missing = taskIds.ToHashSet(); | ||
var holdPrev = missing.Count; | ||
var waitInSeconds = new List<int> | ||
var waitInSeconds = new[] | ||
{ | ||
10, | ||
1000, | ||
|
@@ -284,194 +277,141 @@ private void ProxyTryGetResults(IEnumerable<string> taskIds, | |
20000, | ||
30000, | ||
}; | ||
var idx = 0; | ||
var emptyFetch = 0; | ||
|
||
while (missing.Count != 0) | ||
while (!(CancellationResultTaskSource.Token.IsCancellationRequested && ResultHandlerDictionary.IsEmpty)) | ||
{ | ||
foreach (var bucket in missing.ToList() | ||
.ToChunk(chunkResultSize)) | ||
try | ||
{ | ||
var resultStatusCollection = SessionService.GetResultStatus(bucket); | ||
if (ResultHandlerDictionary.IsEmpty) | ||
{ | ||
Thread.Sleep(100); | ||
continue; | ||
} | ||
|
||
var fetched = 0; | ||
|
||
foreach (var resultStatusData in resultStatusCollection.IdsReady) | ||
foreach (var chunk in ResultHandlerDictionary.Keys.ToChunk(500)) | ||
{ | ||
try | ||
{ | ||
Logger?.LogTrace("Response handler for {taskId}", | ||
resultStatusData.TaskId); | ||
responseHandler(resultStatusData.TaskId, | ||
SessionService.TryGetResultAsync(new ResultRequest | ||
{ | ||
ResultId = resultStatusData.ResultId, | ||
Session = SessionId, | ||
}, | ||
CancellationToken.None) | ||
.Result); | ||
} | ||
catch (Exception e) | ||
var resultStatusCollection = SessionService.GetResultStatus(chunk); | ||
|
||
foreach (var resultStatusData in resultStatusCollection.IdsReady) | ||
{ | ||
Logger?.LogWarning(e, | ||
"Response handler for {taskId} threw an error", | ||
resultStatusData.TaskId); | ||
var taskId = resultStatusData.TaskId; | ||
fetched += 1; | ||
|
||
try | ||
{ | ||
errorHandler(resultStatusData.TaskId, | ||
TaskStatus.Error, | ||
e.Message + e.StackTrace); | ||
var byteResult = SessionService.TryGetResultAsync(new ResultRequest | ||
{ | ||
ResultId = resultStatusData.ResultId, | ||
Session = SessionId, | ||
}, | ||
CancellationToken.None) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No usage of CancellationResultTaskSource.Token instead of CancellationToker.None ? |
||
.Result; | ||
Logger?.LogTrace("Response handler for {taskId}", | ||
resultStatusData.TaskId); | ||
|
||
var result = ProtoSerializer.DeSerializeMessageObjectArray(byteResult); | ||
ResultHandlerDictionary[taskId] | ||
.HandleResponse(result?[0], | ||
taskId); | ||
} | ||
catch (Exception e2) | ||
catch (Exception e) | ||
{ | ||
Logger?.LogError(e2, | ||
"An error occured while handling another error: {details}", | ||
e); | ||
Logger?.LogDebug(e, | ||
"Response handler for {taskId} threw an exception, calling error handler", | ||
resultStatusData.TaskId); | ||
|
||
var ex = e; | ||
|
||
if (e is AggregateException ae) | ||
{ | ||
ex = ae.InnerExceptions.Count > 1 | ||
? ae | ||
: ae.InnerException; | ||
} | ||
|
||
try | ||
{ | ||
ResultHandlerDictionary[taskId] | ||
.HandleError(new ServiceInvocationException(ex, | ||
ArmonikStatusCode.Unknown), | ||
taskId); | ||
} | ||
catch (Exception e2) | ||
{ | ||
Logger?.LogError(e2, | ||
"An exception was thrown while handling a previous exception in the result handler of {taskId}", | ||
taskId); | ||
Comment on lines
+344
to
+346
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Re-throw exception here. User process should stop here since they don't manage properly the exception |
||
} | ||
} | ||
finally | ||
{ | ||
ResultHandlerDictionary.TryRemove(taskId, | ||
out _); | ||
} | ||
} | ||
} | ||
|
||
missing.ExceptWith(resultStatusCollection.IdsReady.Select(x => x.TaskId)); | ||
var x = Duration.FromTimeSpan(TimeSpan.FromMinutes(5)); | ||
|
||
foreach (var resultStatusData in resultStatusCollection.IdsResultError) | ||
{ | ||
var details = ""; | ||
|
||
var taskStatus = SessionService.GetTaskStatus(resultStatusData.TaskId); | ||
|
||
switch (taskStatus) | ||
foreach (var resultStatusData in resultStatusCollection.IdsResultError) | ||
{ | ||
case TaskStatus.Canceling: | ||
case TaskStatus.Canceled: | ||
details = $"Task {resultStatusData.TaskId} was canceled"; | ||
break; | ||
default: | ||
var outputInfo = SessionService.GetTaskOutputInfo(resultStatusData.TaskId); | ||
details = outputInfo.TypeCase == Output.TypeOneofCase.Error | ||
? outputInfo.Error.Details | ||
: "Result is in status : " + resultStatusData.Status + ", look for task in error in logs."; | ||
break; | ||
} | ||
var taskId = resultStatusData.TaskId; | ||
var details = ""; | ||
var taskStatus = TaskStatus.Unspecified; | ||
fetched += 1; | ||
|
||
Logger?.LogDebug("Error handler for {taskId}, {taskStatus}: {details}", | ||
resultStatusData.TaskId, | ||
taskStatus, | ||
details); | ||
try | ||
{ | ||
errorHandler(resultStatusData.TaskId, | ||
taskStatus, | ||
details); | ||
} | ||
catch (Exception e) | ||
{ | ||
Logger?.LogError(e, | ||
"An error occured while handling a Task error {status}: {details}", | ||
taskStatus, | ||
details); | ||
try | ||
{ | ||
taskStatus = SessionService.GetTaskStatus(resultStatusData.TaskId); | ||
if (!StatusCodesLookUp.TryGetValue(taskStatus, | ||
out var statusCode)) | ||
{ | ||
statusCode = ArmonikStatusCode.Unknown; | ||
} | ||
Comment on lines
+365
to
+370
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add specific exception for GetTaskStatus Grpc Request and call HandleError with Connection error exception if an exception occurs |
||
|
||
switch (taskStatus) | ||
{ | ||
case TaskStatus.Canceling: | ||
case TaskStatus.Canceled: | ||
details = $"Task {resultStatusData.TaskId} was canceled"; | ||
break; | ||
default: | ||
var outputInfo = SessionService.GetTaskOutputInfo(resultStatusData.TaskId); | ||
details = outputInfo.TypeCase == Output.TypeOneofCase.Error | ||
? outputInfo.Error.Details | ||
: "Result is in status : " + resultStatusData.Status + ", look for task in error in logs."; | ||
break; | ||
} | ||
|
||
Logger?.LogDebug("Error handler for {taskId}, {taskStatus}: {details}", | ||
resultStatusData.TaskId, | ||
taskStatus, | ||
details); | ||
ResultHandlerDictionary[resultStatusData.TaskId] | ||
.HandleError(new ServiceInvocationException(details, | ||
statusCode), | ||
resultStatusData.TaskId); | ||
Comment on lines
+390
to
+393
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add specific Try catch to manage Handle error ? |
||
} | ||
catch (Exception e) | ||
{ | ||
Logger?.LogError(e, | ||
"An error occured while handling a Task error {status}: {details}", | ||
taskStatus, | ||
details); | ||
} | ||
finally | ||
{ | ||
ResultHandlerDictionary.TryRemove(taskId, | ||
out _); | ||
} | ||
} | ||
} | ||
|
||
missing.ExceptWith(resultStatusCollection.IdsResultError.Select(x => x.TaskId)); | ||
|
||
if (holdPrev == missing.Count) | ||
{ | ||
idx = idx >= waitInSeconds.Count - 1 | ||
? waitInSeconds.Count - 1 | ||
: idx + 1; | ||
|
||
Logger?.LogDebug("No Results are ready. Wait for {timeWait} seconds before new retry", | ||
waitInSeconds[idx] / 1000); | ||
} | ||
else | ||
{ | ||
idx = 0; | ||
} | ||
|
||
holdPrev = missing.Count; | ||
|
||
Thread.Sleep(waitInSeconds[idx]); | ||
} | ||
} | ||
} | ||
|
||
private void ResultTask() | ||
{ | ||
while (!(CancellationResultTaskSource.Token.IsCancellationRequested && ResultHandlerDictionary.IsEmpty)) | ||
{ | ||
try | ||
{ | ||
if (!ResultHandlerDictionary.IsEmpty) | ||
{ | ||
ProxyTryGetResults(ResultHandlerDictionary.Keys.ToList(), | ||
(taskId, | ||
byteResult) => | ||
{ | ||
try | ||
{ | ||
var result = ProtoSerializer.DeSerializeMessageObjectArray(byteResult); | ||
ResultHandlerDictionary[taskId] | ||
.HandleResponse(result?[0], | ||
taskId); | ||
} | ||
catch (Exception e) | ||
{ | ||
const ArmonikStatusCode statusCode = ArmonikStatusCode.Unknown; | ||
|
||
ServiceInvocationException ex; | ||
|
||
var ae = e as AggregateException; | ||
|
||
if (ae is not null && ae.InnerExceptions.Count > 1) | ||
{ | ||
ex = new ServiceInvocationException(ae, | ||
statusCode); | ||
} | ||
else if (ae is not null) | ||
{ | ||
ex = new ServiceInvocationException(ae.InnerException, | ||
statusCode); | ||
} | ||
else | ||
{ | ||
ex = new ServiceInvocationException(e, | ||
statusCode); | ||
} | ||
|
||
ResultHandlerDictionary[taskId] | ||
.HandleError(ex, | ||
taskId); | ||
} | ||
finally | ||
{ | ||
ResultHandlerDictionary.TryRemove(taskId, | ||
out _); | ||
} | ||
}, | ||
(taskId, | ||
taskStatus, | ||
ex) => | ||
{ | ||
try | ||
{ | ||
var statusCode = StatusCodesLookUp.Keys.Contains(taskStatus) | ||
? StatusCodesLookUp[taskStatus] | ||
: ArmonikStatusCode.Unknown; | ||
|
||
ResultHandlerDictionary[taskId] | ||
.HandleError(new ServiceInvocationException(ex, | ||
statusCode), | ||
taskId); | ||
} | ||
finally | ||
{ | ||
ResultHandlerDictionary.TryRemove(taskId, | ||
out _); | ||
} | ||
}); | ||
} | ||
else | ||
{ | ||
Thread.Sleep(100); | ||
} | ||
Thread.Sleep(waitInSeconds[emptyFetch]); | ||
emptyFetch = fetched == 0 | ||
? Math.Min(emptyFetch + 1, | ||
waitInSeconds.Length) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it waitInSeconds.Length - 1 instead ? |
||
: 0; | ||
} | ||
catch (Exception e) | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use WaitInSeconds Array line 271 rather than a hard coded value. 10 ms is slightly enough