Skip to content

Commit

Permalink
Make sure corresponding Connection and Channel token properly float i…
Browse files Browse the repository at this point in the history
…nto handlers
  • Loading branch information
danielmarbach committed Dec 8, 2024
1 parent e4e05a0 commit 0413438
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ protected void TakeOver(Channel other)
public Task CloseAsync(ushort replyCode, string replyText, bool abort,
CancellationToken cancellationToken)
{
var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText);
var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText, cancellationToken: cancellationToken);
return CloseAsync(args, abort, cancellationToken);
}

Expand Down Expand Up @@ -725,7 +725,7 @@ await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken)
protected async Task<bool> HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
var method = new ConnectionClose(cmd.MethodSpan);
var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId);
var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId, cancellationToken: cancellationToken);
try
{
await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken)
Expand Down Expand Up @@ -763,7 +763,7 @@ protected async Task<bool> HandleConnectionStartAsync(IncomingCommand cmd, Cance
{
if (m_connectionStartCell is null)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start");
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start", cancellationToken: cancellationToken);
await Session.Connection.CloseAsync(reason, false,
InternalConstants.DefaultConnectionCloseTimeout,
cancellationToken)
Expand Down
15 changes: 10 additions & 5 deletions projects/RabbitMQ.Client/Impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ await ReceiveLoopAsync(mainLoopToken)
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
"Thread aborted (AppDomain unloaded?)",
exception: taex);
exception: taex,
cancellationToken: mainLoopToken);
await HandleMainLoopExceptionAsync(ea)
.ConfigureAwait(false);
}
Expand All @@ -73,7 +74,8 @@ await HandleMainLoopExceptionAsync(ea)
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
0,
"End of stream",
exception: eose);
exception: eose,
cancellationToken: mainLoopToken);
await HandleMainLoopExceptionAsync(ea)
.ConfigureAwait(false);
}
Expand All @@ -91,7 +93,8 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken)
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
fileLoadException.Message,
exception: fileLoadException);
exception: fileLoadException,
cancellationToken: mainLoopToken);
await HandleMainLoopExceptionAsync(ea)
.ConfigureAwait(false);
}
Expand All @@ -106,7 +109,8 @@ await HandleMainLoopExceptionAsync(ea)
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
ocex.Message,
exception: ocex);
exception: ocex,
cancellationToken: mainLoopToken);
await HandleMainLoopExceptionAsync(ea)
.ConfigureAwait(false);
}
Expand All @@ -116,7 +120,8 @@ await HandleMainLoopExceptionAsync(ea)
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
ex.Message,
exception: ex);
exception: ex,
cancellationToken: mainLoopToken);
await HandleMainLoopExceptionAsync(ea)
.ConfigureAwait(false);
}
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)
{
try
{
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen");
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen", cancellationToken: cancellationToken);
await CloseAsync(ea, true,
InternalConstants.DefaultConnectionAbortTimeout,
cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -297,7 +297,7 @@ internal void EnsureIsOpen()
public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort,
CancellationToken cancellationToken = default)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText);
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText, cancellationToken: cancellationToken);
return CloseAsync(reason, abort, timeout, cancellationToken);
}

Expand Down

0 comments on commit 0413438

Please sign in to comment.