diff --git a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs index 2ce9ec003..021346442 100644 --- a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs +++ b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs @@ -32,8 +32,6 @@ using ArmoniK.Api.gRPC.V1.Events; using ArmoniK.Api.gRPC.V1.Results; -using Grpc.Core.Utils; - using JetBrains.Annotations; namespace ArmoniK.Api.Client @@ -102,48 +100,44 @@ public static async Task WaitForResultsAsync(this Events.EventsClient client, }); - await streamingCall.ResponseStream.ForEachAsync(resp => - { - cancellationToken.ThrowIfCancellationRequested(); - if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && - resultsNotFound.ContainsKey(resp.ResultStatusUpdate.ResultId)) - { - if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) - { - resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); - if (!resultsNotFound.Any()) - { - return Task.CompletedTask; - } - } - - if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) - { - throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); - } - } + while (await streamingCall.ResponseStream.MoveNext(cancellationToken)) + { + cancellationToken.ThrowIfCancellationRequested(); + var resp = streamingCall.ResponseStream.Current; + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.ContainsKey(resp.ResultStatusUpdate.ResultId)) + { + if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } - if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && - resultsNotFound.ContainsKey(resp.NewResult.ResultId)) - { - if (resp.NewResult.Status == ResultStatus.Completed) - { - resultsNotFound.Remove(resp.NewResult.ResultId); - if (!resultsNotFound.Any()) - { - return Task.CompletedTask; - } - } + if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) + { + throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); + } + } - if (resp.NewResult.Status == ResultStatus.Aborted) - { - throw new Exception($"Result {resp.NewResult.ResultId} has been aborted"); - } - } + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.ContainsKey(resp.NewResult.ResultId)) + { + if (resp.NewResult.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.NewResult.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } - return Task.CompletedTask; - }) - .ConfigureAwait(false); + if (resp.NewResult.Status == ResultStatus.Aborted) + { + throw new Exception($"Result {resp.NewResult.ResultId} has been aborted"); + } + } + } } } }