From fe73cb988c310ab8bb1c231c592920fc4cb0437a Mon Sep 17 00:00:00 2001 From: Florian Lemaitre Date: Fri, 23 Jun 2023 13:10:17 +0200 Subject: [PATCH 1/2] Acquire channel within retry loops --- .../Common/Submitter/BaseClientSubmitter.cs | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 2215f59e..4efa30f2 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -225,16 +225,16 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable taskIds) { using var _ = Logger?.LogFunction(); - using var channel = channelPool_.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - Retry.WhileException(5, 2000, retry => { + using var channel = channelPool_.GetChannel(); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + if (retry > 1) { Logger?.LogWarning("Try {try} for {funcName}", @@ -457,13 +457,13 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, ResultStatus.Notfound)) : Array.Empty(); - using var channel = channelPool_.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - var idStatus = Retry.WhileException(5, 2000, retry => { + using var channel = channelPool_.GetChannel(); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + Logger?.LogDebug("Try {try} for {funcName}", retry, nameof(submitterService.GetResultStatus)); @@ -584,13 +584,13 @@ public byte[] GetResult(string taskId, Session = SessionId.Id, }; - using var channel = channelPool_.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - Retry.WhileException(5, 2000, retry => { + using var channel = channelPool_.GetChannel(); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + Logger?.LogDebug("Try {try} for {funcName}", retry, nameof(submitterService.WaitForAvailability)); From 6521f17c5e8079d362809c8554305ef0ce479359 Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Sun, 25 Jun 2023 20:10:02 +0200 Subject: [PATCH 2/2] Add a channel health check for the pool --- Client/src/Common/Submitter/ChannelPool.cs | 84 ++++++++++++++++++++-- 1 file changed, 78 insertions(+), 6 deletions(-) diff --git a/Client/src/Common/Submitter/ChannelPool.cs b/Client/src/Common/Submitter/ChannelPool.cs index 04055030..652ebf86 100644 --- a/Client/src/Common/Submitter/ChannelPool.cs +++ b/Client/src/Common/Submitter/ChannelPool.cs @@ -30,6 +30,9 @@ using JetBrains.Annotations; using Microsoft.Extensions.Logging; +#if NET5_0_OR_GREATER +using Grpc.Net.Client; +#endif namespace ArmoniK.DevelopmentKit.Client.Common.Submitter; @@ -66,9 +69,17 @@ private ChannelBase AcquireChannel() { if (pool_.TryTake(out var channel)) { - logger_?.LogDebug("Acquired already existing channel {channel} from pool", - channel); - return channel; + if (ShutdownOnFailure(channel)) + { + logger_?.LogDebug("Got an invalid channel {channel} from pool", + channel); + } + else + { + logger_?.LogDebug("Acquired already existing channel {channel} from pool", + channel); + return channel; + } } channel = channelFactory_(); @@ -83,9 +94,70 @@ private ChannelBase AcquireChannel() /// Channel to release private void ReleaseChannel(ChannelBase channel) { - logger_?.LogDebug("Released channel {channel} to pool", - channel); - pool_.Add(channel); + if (ShutdownOnFailure(channel)) + { + logger_?.LogDebug("Shutdown unhealthy channel {channel}", + channel); + } + else + { + logger_?.LogDebug("Released channel {channel} to pool", + channel); + pool_.Add(channel); + } + } + + /// + /// Check the state of a channel and shutdown it in case of failure + /// + /// Channel to check the state + /// True if the channel has been shut down + private static bool ShutdownOnFailure(ChannelBase channel) + { + try + { + switch (channel) + { + case Channel chan: + switch (chan.State) + { + case ChannelState.TransientFailure: + chan.ShutdownAsync() + .Wait(); + return true; + case ChannelState.Shutdown: + return true; + case ChannelState.Idle: + case ChannelState.Connecting: + case ChannelState.Ready: + default: + return false; + } +#if NET5_0_OR_GREATER + case GrpcChannel chan: + switch (chan.State) + { + case ConnectivityState.TransientFailure: + chan.ShutdownAsync() + .Wait(); + return true; + case ConnectivityState.Shutdown: + return true; + case ConnectivityState.Idle: + case ConnectivityState.Connecting: + case ConnectivityState.Ready: + default: + return false; + } +#endif + default: + return false; + } + } + catch (InvalidOperationException) + { + return false; + } } ///