From 7547955a7a17a779cf53b5e56126a3ba1294658e Mon Sep 17 00:00:00 2001 From: Florian LEMAITRE Date: Fri, 30 Sep 2022 09:22:59 +0200 Subject: [PATCH] Revamp TaskResult of UnifiedAPI --- .../Client/Services/Submitter/Service.cs | 297 +++++++----------- 1 file changed, 119 insertions(+), 178 deletions(-) diff --git a/UnifiedApi/Client/Services/Submitter/Service.cs b/UnifiedApi/Client/Services/Submitter/Service.cs index 609adf45..66c33c37 100644 --- a/UnifiedApi/Client/Services/Submitter/Service.cs +++ b/UnifiedApi/Client/Services/Submitter/Service.cs @@ -35,8 +35,6 @@ using ArmoniK.DevelopmentKit.Common; using ArmoniK.DevelopmentKit.Common.Exceptions; -using Google.Protobuf.WellKnownTypes; - using JetBrains.Annotations; using Microsoft.Extensions.Logging; @@ -267,13 +265,9 @@ public string SubmitTask(ArmonikPayload payload, handler) .Single(); - private void ProxyTryGetResults(IEnumerable taskIds, - Action responseHandler, - Action errorHandler) + private void ResultTask() { - var missing = taskIds.ToHashSet(); - var holdPrev = missing.Count; - var waitInSeconds = new List + var waitInSeconds = new[] { 10, 1000, @@ -282,194 +276,141 @@ private void ProxyTryGetResults(IEnumerable taskIds, 20000, 30000, }; - var idx = 0; + var emptyFetch = 0; - while (missing.Count != 0) + while (!(CancellationResultTaskSource.Token.IsCancellationRequested && ResultHandlerDictionary.IsEmpty)) { - foreach (var bucket in missing.ToList() - .Batch(500)) + try { - var resultStatusCollection = SessionService.GetResultStatus(bucket); + if (ResultHandlerDictionary.IsEmpty) + { + Thread.Sleep(100); + continue; + } + + var fetched = 0; - foreach (var resultStatusData in resultStatusCollection.IdsReady) + foreach (var chunk in ResultHandlerDictionary.Keys.Batch(500)) { - try - { - Logger?.LogTrace("Response handler for {taskId}", - resultStatusData.TaskId); - responseHandler(resultStatusData.TaskId, - SessionService.TryGetResultAsync(new ResultRequest - { - ResultId = resultStatusData.ResultId, - Session = SessionId, - }, - CancellationToken.None) - .Result); - } - catch (Exception e) + var resultStatusCollection = SessionService.GetResultStatus(chunk); + + foreach (var resultStatusData in resultStatusCollection.IdsReady) { - Logger?.LogWarning(e, - "Response handler for {taskId} threw an error", - resultStatusData.TaskId); + var taskId = resultStatusData.TaskId; + fetched += 1; + try { - errorHandler(resultStatusData.TaskId, - TaskStatus.Error, - e.Message + e.StackTrace); + var byteResult = SessionService.TryGetResultAsync(new ResultRequest + { + ResultId = resultStatusData.ResultId, + Session = SessionId, + }, + CancellationToken.None) + .Result; + Logger?.LogTrace("Response handler for {taskId}", + resultStatusData.TaskId); + + var result = ProtoSerializer.DeSerializeMessageObjectArray(byteResult); + ResultHandlerDictionary[taskId] + .HandleResponse(result?[0], + taskId); } - catch (Exception e2) + catch (Exception e) { - Logger?.LogError(e2, - "An error occured while handling another error: {details}", - e); + Logger?.LogDebug(e, + "Response handler for {taskId} threw an exception, calling error handler", + resultStatusData.TaskId); + + var ex = e; + + if (e is AggregateException ae) + { + ex = ae.InnerExceptions.Count > 1 + ? ae + : ae.InnerException; + } + + try + { + ResultHandlerDictionary[taskId] + .HandleError(new ServiceInvocationException(ex, + ArmonikStatusCode.Unknown), + taskId); + } + catch (Exception e2) + { + Logger?.LogError(e2, + "An exception was thrown while handling a previous exception in the result handler of {taskId}", + taskId); + } + } + finally + { + ResultHandlerDictionary.TryRemove(taskId, + out _); } } - } - - missing.ExceptWith(resultStatusCollection.IdsReady.Select(x => x.TaskId)); - var x = Duration.FromTimeSpan(TimeSpan.FromMinutes(5)); - - foreach (var resultStatusData in resultStatusCollection.IdsResultError) - { - var details = ""; - var taskStatus = SessionService.GetTaskStatus(resultStatusData.TaskId); - - switch (taskStatus) + foreach (var resultStatusData in resultStatusCollection.IdsResultError) { - case TaskStatus.Canceling: - case TaskStatus.Canceled: - details = $"Task {resultStatusData.TaskId} was canceled"; - break; - default: - var outputInfo = SessionService.GetTaskOutputInfo(resultStatusData.TaskId); - details = outputInfo.TypeCase == Output.TypeOneofCase.Error - ? outputInfo.Error.Details - : "Result is in status : " + resultStatusData.Status + ", look for task in error in logs."; - break; - } + var taskId = resultStatusData.TaskId; + var details = ""; + var taskStatus = TaskStatus.Unspecified; + fetched += 1; - Logger?.LogDebug("Error handler for {taskId}, {taskStatus}: {details}", - resultStatusData.TaskId, - taskStatus, - details); - try - { - errorHandler(resultStatusData.TaskId, - taskStatus, - details); - } - catch (Exception e) - { - Logger?.LogError(e, - "An error occured while handling a Task error {status}: {details}", - taskStatus, - details); + try + { + taskStatus = SessionService.GetTaskStatus(resultStatusData.TaskId); + if (!StatusCodesLookUp.TryGetValue(taskStatus, + out var statusCode)) + { + statusCode = ArmonikStatusCode.Unknown; + } + + switch (taskStatus) + { + case TaskStatus.Canceling: + case TaskStatus.Canceled: + details = $"Task {resultStatusData.TaskId} was canceled"; + break; + default: + var outputInfo = SessionService.GetTaskOutputInfo(resultStatusData.TaskId); + details = outputInfo.TypeCase == Output.TypeOneofCase.Error + ? outputInfo.Error.Details + : "Result is in status : " + resultStatusData.Status + ", look for task in error in logs."; + break; + } + + Logger?.LogDebug("Error handler for {taskId}, {taskStatus}: {details}", + resultStatusData.TaskId, + taskStatus, + details); + ResultHandlerDictionary[resultStatusData.TaskId] + .HandleError(new ServiceInvocationException(details, + statusCode), + resultStatusData.TaskId); + } + catch (Exception e) + { + Logger?.LogError(e, + "An error occured while handling a Task error {status}: {details}", + taskStatus, + details); + } + finally + { + ResultHandlerDictionary.TryRemove(taskId, + out _); + } } } - missing.ExceptWith(resultStatusCollection.IdsResultError.Select(x => x.TaskId)); - - if (holdPrev == missing.Count) - { - idx = idx >= waitInSeconds.Count - 1 - ? waitInSeconds.Count - 1 - : idx + 1; - - Logger?.LogDebug("No Results are ready. Wait for {timeWait} seconds before new retry", - waitInSeconds[idx] / 1000); - } - else - { - idx = 0; - } - - holdPrev = missing.Count; - - Thread.Sleep(waitInSeconds[idx]); - } - } - } - - private void ResultTask() - { - while (!(CancellationResultTaskSource.Token.IsCancellationRequested && ResultHandlerDictionary.IsEmpty)) - { - try - { - if (!ResultHandlerDictionary.IsEmpty) - { - ProxyTryGetResults(ResultHandlerDictionary.Keys.ToList(), - (taskId, - byteResult) => - { - try - { - var result = ProtoSerializer.DeSerializeMessageObjectArray(byteResult); - ResultHandlerDictionary[taskId] - .HandleResponse(result?[0], - taskId); - } - catch (Exception e) - { - const ArmonikStatusCode statusCode = ArmonikStatusCode.Unknown; - - ServiceInvocationException ex; - - var ae = e as AggregateException; - - if (ae is not null && ae.InnerExceptions.Count > 1) - { - ex = new ServiceInvocationException(ae, - statusCode); - } - else if (ae is not null) - { - ex = new ServiceInvocationException(ae.InnerException, - statusCode); - } - else - { - ex = new ServiceInvocationException(e, - statusCode); - } - - ResultHandlerDictionary[taskId] - .HandleError(ex, - taskId); - } - finally - { - ResultHandlerDictionary.TryRemove(taskId, - out _); - } - }, - (taskId, - taskStatus, - ex) => - { - try - { - var statusCode = StatusCodesLookUp.Keys.Contains(taskStatus) - ? StatusCodesLookUp[taskStatus] - : ArmonikStatusCode.Unknown; - - ResultHandlerDictionary[taskId] - .HandleError(new ServiceInvocationException(ex, - statusCode), - taskId); - } - finally - { - ResultHandlerDictionary.TryRemove(taskId, - out _); - } - }); - } - else - { - Thread.Sleep(100); - } + Thread.Sleep(waitInSeconds[emptyFetch]); + emptyFetch = fetched == 0 + ? Math.Min(emptyFetch + 1, + waitInSeconds.Length) + : 0; } catch (Exception e) {