From 568fed48453da12cfb6fe7c090614ee988952d9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 25 Sep 2023 08:54:25 +0200 Subject: [PATCH 1/3] feat: Events API client extension method to wait for results completion --- .../ArmoniK.Api.Client/EventsClientExt.cs | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs diff --git a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs new file mode 100644 index 000000000..2ce9ec003 --- /dev/null +++ b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs @@ -0,0 +1,149 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// W. Kirschenmann +// J. Gurhem +// D. Dubuc +// L. Ziane Khodja +// F. Lemaitre +// S. Djebbar +// J. Fonseca +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using ArmoniK.Api.gRPC.V1; +using ArmoniK.Api.gRPC.V1.Events; +using ArmoniK.Api.gRPC.V1.Results; + +using Grpc.Core.Utils; + +using JetBrains.Annotations; + +namespace ArmoniK.Api.Client +{ + /// + /// extensions methods + /// + [PublicAPI] + public static class EventsClientExt + { + private static FiltersAnd ResultsFilter(string resultId) + => new() + { + And = + { + new FilterField + { + Field = new ResultField + { + ResultRawField = new ResultRawField + { + Field = ResultRawEnumField.ResultId, + }, + }, + FilterString = new FilterString + { + Operator = FilterStringOperator.Equal, + Value = resultId, + }, + }, + }, + }; + + /// + /// Wait until the given results are completed + /// + /// gRPC result client + /// The session ID in which the results are located + /// A collection of results to wait for + /// Token used to cancel the execution of the method + /// if a result is aborted + [PublicAPI] + public static async Task WaitForResultsAsync(this Events.EventsClient client, + string sessionId, + ICollection resultIds, + CancellationToken cancellationToken) + { + var resultsNotFound = resultIds.ToDictionary(id => id, + _ => true); + + using var streamingCall = client.GetEvents(new EventSubscriptionRequest + { + SessionId = sessionId, + ReturnedEvents = + { + EventsEnum.ResultStatusUpdate, + EventsEnum.NewResult, + }, + ResultsFilters = new Filters + { + Or = + { + resultIds.Select(ResultsFilter), + }, + }, + }); + + + await streamingCall.ResponseStream.ForEachAsync(resp => + { + cancellationToken.ThrowIfCancellationRequested(); + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && + resultsNotFound.ContainsKey(resp.ResultStatusUpdate.ResultId)) + { + if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); + if (!resultsNotFound.Any()) + { + return Task.CompletedTask; + } + } + + if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) + { + throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); + } + } + + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && + resultsNotFound.ContainsKey(resp.NewResult.ResultId)) + { + if (resp.NewResult.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.NewResult.ResultId); + if (!resultsNotFound.Any()) + { + return Task.CompletedTask; + } + } + + if (resp.NewResult.Status == ResultStatus.Aborted) + { + throw new Exception($"Result {resp.NewResult.ResultId} has been aborted"); + } + } + + return Task.CompletedTask; + }) + .ConfigureAwait(false); + } + } +} From 5abcce11a173f3ee816b203f8ac64ecd28f02f57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 25 Sep 2023 11:28:54 +0200 Subject: [PATCH 2/3] fix: use while instead of gRPC foreach --- .../ArmoniK.Api.Client/EventsClientExt.cs | 76 +++++++++---------- 1 file changed, 35 insertions(+), 41 deletions(-) diff --git a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs index 2ce9ec003..021346442 100644 --- a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs +++ b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs @@ -32,8 +32,6 @@ using ArmoniK.Api.gRPC.V1.Events; using ArmoniK.Api.gRPC.V1.Results; -using Grpc.Core.Utils; - using JetBrains.Annotations; namespace ArmoniK.Api.Client @@ -102,48 +100,44 @@ public static async Task WaitForResultsAsync(this Events.EventsClient client, }); - await streamingCall.ResponseStream.ForEachAsync(resp => - { - cancellationToken.ThrowIfCancellationRequested(); - if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && - resultsNotFound.ContainsKey(resp.ResultStatusUpdate.ResultId)) - { - if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) - { - resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); - if (!resultsNotFound.Any()) - { - return Task.CompletedTask; - } - } - - if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) - { - throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); - } - } + while (await streamingCall.ResponseStream.MoveNext(cancellationToken)) + { + cancellationToken.ThrowIfCancellationRequested(); + var resp = streamingCall.ResponseStream.Current; + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.ContainsKey(resp.ResultStatusUpdate.ResultId)) + { + if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } - if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && - resultsNotFound.ContainsKey(resp.NewResult.ResultId)) - { - if (resp.NewResult.Status == ResultStatus.Completed) - { - resultsNotFound.Remove(resp.NewResult.ResultId); - if (!resultsNotFound.Any()) - { - return Task.CompletedTask; - } - } + if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) + { + throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); + } + } - if (resp.NewResult.Status == ResultStatus.Aborted) - { - throw new Exception($"Result {resp.NewResult.ResultId} has been aborted"); - } - } + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.ContainsKey(resp.NewResult.ResultId)) + { + if (resp.NewResult.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.NewResult.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } - return Task.CompletedTask; - }) - .ConfigureAwait(false); + if (resp.NewResult.Status == ResultStatus.Aborted) + { + throw new Exception($"Result {resp.NewResult.ResultId} has been aborted"); + } + } + } } } } From 0b9e044a9f360dde37de0cec76165b53caf466d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 25 Sep 2023 12:01:32 +0200 Subject: [PATCH 3/3] refactor: use HashSet instead of Dictionary --- packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs index 021346442..e296a92da 100644 --- a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs +++ b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs @@ -79,8 +79,7 @@ public static async Task WaitForResultsAsync(this Events.EventsClient client, ICollection resultIds, CancellationToken cancellationToken) { - var resultsNotFound = resultIds.ToDictionary(id => id, - _ => true); + var resultsNotFound = new HashSet(resultIds); using var streamingCall = client.GetEvents(new EventSubscriptionRequest { @@ -99,12 +98,11 @@ public static async Task WaitForResultsAsync(this Events.EventsClient client, }, }); - while (await streamingCall.ResponseStream.MoveNext(cancellationToken)) { cancellationToken.ThrowIfCancellationRequested(); var resp = streamingCall.ResponseStream.Current; - if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.ContainsKey(resp.ResultStatusUpdate.ResultId)) + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId)) { if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) { @@ -121,7 +119,7 @@ public static async Task WaitForResultsAsync(this Events.EventsClient client, } } - if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.ContainsKey(resp.NewResult.ResultId)) + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.Contains(resp.NewResult.ResultId)) { if (resp.NewResult.Status == ResultStatus.Completed) {