Skip to content

Commit

Permalink
fix: use while instead of gRPC foreach
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem committed Sep 25, 2023
1 parent 568fed4 commit 5abcce1
Showing 1 changed file with 35 additions and 41 deletions.
76 changes: 35 additions & 41 deletions packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
using ArmoniK.Api.gRPC.V1.Events;
using ArmoniK.Api.gRPC.V1.Results;

using Grpc.Core.Utils;

using JetBrains.Annotations;

namespace ArmoniK.Api.Client
Expand Down Expand Up @@ -102,48 +100,44 @@ public static async Task WaitForResultsAsync(this Events.EventsClient client,
});


await streamingCall.ResponseStream.ForEachAsync(resp =>
{
cancellationToken.ThrowIfCancellationRequested();
if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate &&
resultsNotFound.ContainsKey(resp.ResultStatusUpdate.ResultId))
{
if (resp.ResultStatusUpdate.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId);
if (!resultsNotFound.Any())
{
return Task.CompletedTask;
}
}

if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted)
{
throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted");
}
}
while (await streamingCall.ResponseStream.MoveNext(cancellationToken))
{
cancellationToken.ThrowIfCancellationRequested();
var resp = streamingCall.ResponseStream.Current;
if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.ContainsKey(resp.ResultStatusUpdate.ResultId))
{
if (resp.ResultStatusUpdate.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId);
if (!resultsNotFound.Any())
{
break;
}
}

if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult &&
resultsNotFound.ContainsKey(resp.NewResult.ResultId))
{
if (resp.NewResult.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.NewResult.ResultId);
if (!resultsNotFound.Any())
{
return Task.CompletedTask;
}
}
if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted)
{
throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted");
}
}

if (resp.NewResult.Status == ResultStatus.Aborted)
{
throw new Exception($"Result {resp.NewResult.ResultId} has been aborted");
}
}
if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.ContainsKey(resp.NewResult.ResultId))
{
if (resp.NewResult.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.NewResult.ResultId);
if (!resultsNotFound.Any())
{
break;
}
}

return Task.CompletedTask;
})
.ConfigureAwait(false);
if (resp.NewResult.Status == ResultStatus.Aborted)
{
throw new Exception($"Result {resp.NewResult.ResultId} has been aborted");
}
}
}
}
}
}

0 comments on commit 5abcce1

Please sign in to comment.