Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: backport channel fixes #221

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions Client/src/Common/Submitter/BaseClientSubmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,16 @@ private IEnumerable<string> ChunkSubmitTasksWithDependencies(IEnumerable<Tuple<s
{
using var _ = Logger?.LogFunction();

using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

var serviceConfiguration = submitterService.GetServiceConfigurationAsync(new Empty())
.ResponseAsync.Result;

for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
{
try
{
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

var serviceConfiguration = submitterService.GetServiceConfigurationAsync(new Empty())
.ResponseAsync.Result;

using var asyncClientStreamingCall = submitterService.CreateLargeTasks();

asyncClientStreamingCall.RequestStream.WriteAsync(new CreateLargeTaskRequest
Expand Down Expand Up @@ -399,13 +399,13 @@ public void WaitForTasksCompletion(IEnumerable<string> taskIds)
{
using var _ = Logger?.LogFunction();

using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

Retry.WhileException(5,
2000,
retry =>
{
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

if (retry > 1)
{
Logger?.LogWarning("Try {try} for {funcName}",
Expand Down Expand Up @@ -457,13 +457,13 @@ public ResultStatusCollection GetResultStatus(IEnumerable<string> taskIds,
ResultStatus.Notfound))
: Array.Empty<ResultStatusData>();

using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

var idStatus = Retry.WhileException(5,
2000,
retry =>
{
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

Logger?.LogDebug("Try {try} for {funcName}",
retry,
nameof(submitterService.GetResultStatus));
Expand Down Expand Up @@ -584,13 +584,13 @@ public byte[] GetResult(string taskId,
Session = SessionId.Id,
};

using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

Retry.WhileException(5,
2000,
retry =>
{
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

Logger?.LogDebug("Try {try} for {funcName}",
retry,
nameof(submitterService.WaitForAvailability));
Expand Down
84 changes: 78 additions & 6 deletions Client/src/Common/Submitter/ChannelPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
using JetBrains.Annotations;

using Microsoft.Extensions.Logging;
#if NET5_0_OR_GREATER
using Grpc.Net.Client;
#endif

namespace ArmoniK.DevelopmentKit.Client.Common.Submitter;

Expand Down Expand Up @@ -66,9 +69,17 @@ private ChannelBase AcquireChannel()
{
if (pool_.TryTake(out var channel))
{
logger_?.LogDebug("Acquired already existing channel {channel} from pool",
channel);
return channel;
if (ShutdownOnFailure(channel))
{
logger_?.LogDebug("Got an invalid channel {channel} from pool",
channel);
}
else
{
logger_?.LogDebug("Acquired already existing channel {channel} from pool",
channel);
return channel;
}
}

channel = channelFactory_();
Expand All @@ -83,9 +94,70 @@ private ChannelBase AcquireChannel()
/// <param name="channel">Channel to release</param>
private void ReleaseChannel(ChannelBase channel)
{
logger_?.LogDebug("Released channel {channel} to pool",
channel);
pool_.Add(channel);
if (ShutdownOnFailure(channel))
{
logger_?.LogDebug("Shutdown unhealthy channel {channel}",
channel);
}
else
{
logger_?.LogDebug("Released channel {channel} to pool",
channel);
pool_.Add(channel);
}
}

/// <summary>
/// Check the state of a channel and shutdown it in case of failure
/// </summary>
/// <param name="channel">Channel to check the state</param>
/// <returns>True if the channel has been shut down</returns>
private static bool ShutdownOnFailure(ChannelBase channel)
{
try
{
switch (channel)
{
case Channel chan:
switch (chan.State)
{
case ChannelState.TransientFailure:
chan.ShutdownAsync()
.Wait();
return true;
case ChannelState.Shutdown:
return true;
case ChannelState.Idle:
case ChannelState.Connecting:
case ChannelState.Ready:
default:
return false;
}
#if NET5_0_OR_GREATER
case GrpcChannel chan:
switch (chan.State)
{
case ConnectivityState.TransientFailure:
chan.ShutdownAsync()
.Wait();
return true;
case ConnectivityState.Shutdown:
return true;
case ConnectivityState.Idle:
case ConnectivityState.Connecting:
case ConnectivityState.Ready:
default:
return false;
}
#endif
default:
return false;
}
}
catch (InvalidOperationException)
{
return false;
}
}

/// <summary>
Expand Down
Loading