Skip to content

Commit

Permalink
feat: Events API client extension method to wait for results completi…
Browse files Browse the repository at this point in the history
…on (#417)
  • Loading branch information
aneojgurhem authored Sep 25, 2023
2 parents c8c4c5c + 0b9e044 commit 0ae5684
Showing 1 changed file with 141 additions and 0 deletions.
141 changes: 141 additions & 0 deletions packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2023. All rights reserved.
// W. Kirschenmann <[email protected]>
// J. Gurhem <[email protected]>
// D. Dubuc <[email protected]>
// L. Ziane Khodja <[email protected]>
// F. Lemaitre <[email protected]>
// S. Djebbar <[email protected]>
// J. Fonseca <[email protected]>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY, without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Events;
using ArmoniK.Api.gRPC.V1.Results;

using JetBrains.Annotations;

namespace ArmoniK.Api.Client
{
/// <summary>
/// <see cref="Events.EventsClient" /> extensions methods
/// </summary>
[PublicAPI]
public static class EventsClientExt
{
private static FiltersAnd ResultsFilter(string resultId)
=> new()
{
And =
{
new FilterField
{
Field = new ResultField
{
ResultRawField = new ResultRawField
{
Field = ResultRawEnumField.ResultId,
},
},
FilterString = new FilterString
{
Operator = FilterStringOperator.Equal,
Value = resultId,
},
},
},
};

/// <summary>
/// Wait until the given results are completed
/// </summary>
/// <param name="client">gRPC result client</param>
/// <param name="sessionId">The session ID in which the results are located</param>
/// <param name="resultIds">A collection of results to wait for</param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <exception cref="Exception">if a result is aborted</exception>
[PublicAPI]
public static async Task WaitForResultsAsync(this Events.EventsClient client,
string sessionId,
ICollection<string> resultIds,
CancellationToken cancellationToken)
{
var resultsNotFound = new HashSet<string>(resultIds);

using var streamingCall = client.GetEvents(new EventSubscriptionRequest
{
SessionId = sessionId,
ReturnedEvents =
{
EventsEnum.ResultStatusUpdate,
EventsEnum.NewResult,
},
ResultsFilters = new Filters
{
Or =
{
resultIds.Select(ResultsFilter),
},
},
});

while (await streamingCall.ResponseStream.MoveNext(cancellationToken))
{
cancellationToken.ThrowIfCancellationRequested();
var resp = streamingCall.ResponseStream.Current;
if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId))
{
if (resp.ResultStatusUpdate.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId);
if (!resultsNotFound.Any())
{
break;
}
}

if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted)
{
throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted");
}
}

if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.Contains(resp.NewResult.ResultId))
{
if (resp.NewResult.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.NewResult.ResultId);
if (!resultsNotFound.Any())
{
break;
}
}

if (resp.NewResult.Status == ResultStatus.Aborted)
{
throw new Exception($"Result {resp.NewResult.ResultId} has been aborted");
}
}
}
}
}
}

0 comments on commit 0ae5684

Please sign in to comment.