Skip to content

Commit

Permalink
Try get only first few results
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Oct 19, 2022
1 parent 3e2a53a commit 629a508
Showing 1 changed file with 92 additions and 96 deletions.
188 changes: 92 additions & 96 deletions UnifiedApi/Client/Services/Submitter/Service.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,120 +290,116 @@ private void ResultTask()

var fetched = 0;

foreach (var chunk in ResultHandlerDictionary.Keys.ToList()
.ToChunk(500))
var resultStatusCollection = SessionService.GetResultStatus(ResultHandlerDictionary.Keys.Take(500));

foreach (var resultStatusData in resultStatusCollection.IdsReady)
{
var resultStatusCollection = SessionService.GetResultStatus(chunk);
var taskId = resultStatusData.TaskId;
fetched += 1;

foreach (var resultStatusData in resultStatusCollection.IdsReady)
try
{
var taskId = resultStatusData.TaskId;
fetched += 1;
var byteResult = SessionService.TryGetResultAsync(new ResultRequest
{
ResultId = resultStatusData.ResultId,
Session = SessionId,
},
CancellationToken.None)
.Result;
Logger?.LogTrace("Response handler for {taskId}",
resultStatusData.TaskId);

var result = ProtoSerializer.DeSerializeMessageObjectArray(byteResult);
ResultHandlerDictionary[taskId]
.HandleResponse(result?[0],
taskId);
}
catch (Exception e)
{
Logger?.LogDebug(e,
"Response handler for {taskId} threw an exception, calling error handler",
resultStatusData.TaskId);

var ex = e;

if (e is AggregateException ae)
{
ex = ae.InnerExceptions.Count > 1
? ae
: ae.InnerException;
}

try
{
var byteResult = SessionService.TryGetResultAsync(new ResultRequest
{
ResultId = resultStatusData.ResultId,
Session = SessionId,
},
CancellationToken.None)
.Result;
Logger?.LogTrace("Response handler for {taskId}",
resultStatusData.TaskId);

var result = ProtoSerializer.DeSerializeMessageObjectArray(byteResult);
ResultHandlerDictionary[taskId]
.HandleResponse(result?[0],
taskId);
.HandleError(new ServiceInvocationException(ex,
ArmonikStatusCode.Unknown),
taskId);
}
catch (Exception e)
catch (Exception e2)
{
Logger?.LogDebug(e,
"Response handler for {taskId} threw an exception, calling error handler",
resultStatusData.TaskId);

var ex = e;

if (e is AggregateException ae)
{
ex = ae.InnerExceptions.Count > 1
? ae
: ae.InnerException;
}

try
{
ResultHandlerDictionary[taskId]
.HandleError(new ServiceInvocationException(ex,
ArmonikStatusCode.Unknown),
Logger?.LogError(e2,
"An exception was thrown while handling a previous exception in the result handler of {taskId}",
taskId);
}
catch (Exception e2)
{
Logger?.LogError(e2,
"An exception was thrown while handling a previous exception in the result handler of {taskId}",
taskId);
}
}
finally
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
}

foreach (var resultStatusData in resultStatusCollection.IdsResultError)
finally
{
var taskId = resultStatusData.TaskId;
var details = "";
var taskStatus = TaskStatus.Unspecified;
fetched += 1;
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
}

try
{
taskStatus = SessionService.GetTaskStatus(resultStatusData.TaskId);
if (!StatusCodesLookUp.TryGetValue(taskStatus,
out var statusCode))
{
statusCode = ArmonikStatusCode.Unknown;
}

switch (taskStatus)
{
case TaskStatus.Canceling:
case TaskStatus.Canceled:
details = $"Task {resultStatusData.TaskId} was canceled";
break;
default:
var outputInfo = SessionService.GetTaskOutputInfo(resultStatusData.TaskId);
details = outputInfo.TypeCase == Output.TypeOneofCase.Error
? outputInfo.Error.Details
: "Result is in status : " + resultStatusData.Status + ", look for task in error in logs.";
break;
}

Logger?.LogDebug("Error handler for {taskId}, {taskStatus}: {details}",
resultStatusData.TaskId,
taskStatus,
details);
ResultHandlerDictionary[resultStatusData.TaskId]
.HandleError(new ServiceInvocationException(details,
statusCode),
resultStatusData.TaskId);
}
catch (Exception e)
foreach (var resultStatusData in resultStatusCollection.IdsResultError)
{
var taskId = resultStatusData.TaskId;
var details = "";
var taskStatus = TaskStatus.Unspecified;
fetched += 1;

try
{
taskStatus = SessionService.GetTaskStatus(resultStatusData.TaskId);
if (!StatusCodesLookUp.TryGetValue(taskStatus,
out var statusCode))
{
Logger?.LogError(e,
"An error occured while handling a Task error {status}: {details}",
taskStatus,
details);
statusCode = ArmonikStatusCode.Unknown;
}
finally

switch (taskStatus)
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
case TaskStatus.Canceling:
case TaskStatus.Canceled:
details = $"Task {resultStatusData.TaskId} was canceled";
break;
default:
var outputInfo = SessionService.GetTaskOutputInfo(resultStatusData.TaskId);
details = outputInfo.TypeCase == Output.TypeOneofCase.Error
? outputInfo.Error.Details
: "Result is in status : " + resultStatusData.Status + ", look for task in error in logs.";
break;
}

Logger?.LogDebug("Error handler for {taskId}, {taskStatus}: {details}",
resultStatusData.TaskId,
taskStatus,
details);
ResultHandlerDictionary[resultStatusData.TaskId]
.HandleError(new ServiceInvocationException(details,
statusCode),
resultStatusData.TaskId);
}
catch (Exception e)
{
Logger?.LogError(e,
"An error occured while handling a Task error {status}: {details}",
taskStatus,
details);
}
finally
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
}

Expand Down

0 comments on commit 629a508

Please sign in to comment.