Skip to content

Commit

Permalink
fix: backport channel fixes (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
ngruelaneo authored Oct 16, 2023
2 parents c460067 + 02d6232 commit 46cc815
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 21 deletions.
30 changes: 15 additions & 15 deletions Client/src/Common/Submitter/BaseClientSubmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,16 @@ private IEnumerable<string> ChunkSubmitTasksWithDependencies(IEnumerable<Tuple<s
{
using var _ = Logger?.LogFunction();

using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

var serviceConfiguration = submitterService.GetServiceConfigurationAsync(new Empty())
.ResponseAsync.Result;

for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
{
try
{
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

var serviceConfiguration = submitterService.GetServiceConfigurationAsync(new Empty())
.ResponseAsync.Result;

using var asyncClientStreamingCall = submitterService.CreateLargeTasks();

asyncClientStreamingCall.RequestStream.WriteAsync(new CreateLargeTaskRequest
Expand Down Expand Up @@ -399,13 +399,13 @@ public void WaitForTasksCompletion(IEnumerable<string> taskIds)
{
using var _ = Logger?.LogFunction();

using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

Retry.WhileException(5,
2000,
retry =>
{
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

if (retry > 1)
{
Logger?.LogWarning("Try {try} for {funcName}",
Expand Down Expand Up @@ -457,13 +457,13 @@ public ResultStatusCollection GetResultStatus(IEnumerable<string> taskIds,
ResultStatus.Notfound))
: Array.Empty<ResultStatusData>();

using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

var idStatus = Retry.WhileException(5,
2000,
retry =>
{
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

Logger?.LogDebug("Try {try} for {funcName}",
retry,
nameof(submitterService.GetResultStatus));
Expand Down Expand Up @@ -584,13 +584,13 @@ public byte[] GetResult(string taskId,
Session = SessionId.Id,
};

using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

Retry.WhileException(5,
2000,
retry =>
{
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

Logger?.LogDebug("Try {try} for {funcName}",
retry,
nameof(submitterService.WaitForAvailability));
Expand Down
84 changes: 78 additions & 6 deletions Client/src/Common/Submitter/ChannelPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
using JetBrains.Annotations;

using Microsoft.Extensions.Logging;
#if NET5_0_OR_GREATER
using Grpc.Net.Client;
#endif

namespace ArmoniK.DevelopmentKit.Client.Common.Submitter;

Expand Down Expand Up @@ -66,9 +69,17 @@ private ChannelBase AcquireChannel()
{
if (pool_.TryTake(out var channel))
{
logger_?.LogDebug("Acquired already existing channel {channel} from pool",
channel);
return channel;
if (ShutdownOnFailure(channel))
{
logger_?.LogDebug("Got an invalid channel {channel} from pool",
channel);
}
else
{
logger_?.LogDebug("Acquired already existing channel {channel} from pool",
channel);
return channel;
}
}

channel = channelFactory_();
Expand All @@ -83,9 +94,70 @@ private ChannelBase AcquireChannel()
/// <param name="channel">Channel to release</param>
private void ReleaseChannel(ChannelBase channel)
{
logger_?.LogDebug("Released channel {channel} to pool",
channel);
pool_.Add(channel);
if (ShutdownOnFailure(channel))
{
logger_?.LogDebug("Shutdown unhealthy channel {channel}",
channel);
}
else
{
logger_?.LogDebug("Released channel {channel} to pool",
channel);
pool_.Add(channel);
}
}

/// <summary>
/// Check the state of a channel and shutdown it in case of failure
/// </summary>
/// <param name="channel">Channel to check the state</param>
/// <returns>True if the channel has been shut down</returns>
private static bool ShutdownOnFailure(ChannelBase channel)
{
try
{
switch (channel)
{
case Channel chan:
switch (chan.State)
{
case ChannelState.TransientFailure:
chan.ShutdownAsync()
.Wait();
return true;
case ChannelState.Shutdown:
return true;
case ChannelState.Idle:
case ChannelState.Connecting:
case ChannelState.Ready:
default:
return false;
}
#if NET5_0_OR_GREATER
case GrpcChannel chan:
switch (chan.State)
{
case ConnectivityState.TransientFailure:
chan.ShutdownAsync()
.Wait();
return true;
case ConnectivityState.Shutdown:
return true;
case ConnectivityState.Idle:
case ConnectivityState.Connecting:
case ConnectivityState.Ready:
default:
return false;
}
#endif
default:
return false;
}
}
catch (InvalidOperationException)
{
return false;
}
}

/// <summary>
Expand Down

0 comments on commit 46cc815

Please sign in to comment.