Skip to content

Commit

Permalink
complete output pipewriter once communication on connection is cancelled
Browse files Browse the repository at this point in the history
  • Loading branch information
kerryjiang committed Jan 1, 2025
1 parent 1eff181 commit ac1a78c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
6 changes: 3 additions & 3 deletions src/SuperSocket.Connection/PipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ private static Pipe GetOutputPipe(ConnectionOptions connectionOptions)
return connectionOptions.Output ?? new Pipe();
}

protected override Task StartTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe)
protected override Task StartTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe, CancellationToken cancellationToken)
{
var pipeTask = base.StartTask<TPackageInfo>(packagePipe);
var pipeTask = base.StartTask<TPackageInfo>(packagePipe, cancellationToken);
return Task.WhenAll(pipeTask, ProcessSends());
}

Expand Down Expand Up @@ -168,7 +168,7 @@ protected async ValueTask<bool> ProcessOutputRead(PipeReader reader)
catch (Exception e)
{
// Cancel all the work in the connection if encounter an error during sending
Cancel();
await CancelAsync().ConfigureAwait(false);

if (!IsIgnorableException(e))
OnError("Exception happened in SendAsync", e);
Expand Down
27 changes: 18 additions & 9 deletions src/SuperSocket.Connection/PipeConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ protected PipeConnectionBase(PipeReader inputReader, PipeWriter outputWriter, Co
ConnectionToken = _cts.Token;
}

protected virtual Task StartTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe)
protected virtual Task StartTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe, CancellationToken cancellationToken)
{
return StartInputPipeTask(packagePipe, _cts.Token);
return StartInputPipeTask(packagePipe, cancellationToken);
}

protected void UpdateLastActiveTime()
Expand All @@ -80,7 +80,7 @@ public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPip
_packagePipe = packagePipe;
_pipelineFilter = pipelineFilter;

_pipeTask = StartTask(packagePipe);
_pipeTask = StartTask(packagePipe, _cts.Token);

_ = HandleClosing();

Expand Down Expand Up @@ -136,13 +136,17 @@ private async ValueTask HandleClosing()
public override async ValueTask CloseAsync(CloseReason closeReason)
{
CloseReason = closeReason;
Cancel();
await CancelAsync().ConfigureAwait(false);
await HandleClosing().ConfigureAwait(false);
}

protected void Cancel()
protected async Task CancelAsync()
{
if (_cts.IsCancellationRequested)
return;

_cts.Cancel();
await CompleteWriterAsync(OutputWriter, _isDetaching).ConfigureAwait(false);
}

protected virtual bool IsIgnorableException(Exception e)
Expand Down Expand Up @@ -316,7 +320,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
}
}

CompleteReader(reader, _isDetaching);
await CompleteReaderAsync(reader, _isDetaching).ConfigureAwait(false);
WriteEOFPackage();
}

Expand Down Expand Up @@ -412,7 +416,7 @@ private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipe
public override async ValueTask DetachAsync()
{
_isDetaching = true;
Cancel();
await CancelAsync().ConfigureAwait(false);
await HandleClosing().ConfigureAwait(false);
_isDetaching = false;
}
Expand All @@ -425,9 +429,14 @@ protected void OnError(string message, Exception e = null)
Logger?.LogError(message);
}

protected virtual void CompleteReader(PipeReader reader, bool isDetaching)
protected virtual async ValueTask CompleteReaderAsync(PipeReader reader, bool isDetaching)
{
await reader.CompleteAsync().ConfigureAwait(false);
}

protected virtual async ValueTask CompleteWriterAsync(PipeWriter writer, bool isDetaching)
{
reader.Complete();
await writer.CompleteAsync().ConfigureAwait(false);
}
}
}
14 changes: 11 additions & 3 deletions src/SuperSocket.Kestrel/KestrelPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ public KestrelPipeConnection(ConnectionContext context, ConnectionOptions option
}
}

protected override void CompleteReader(PipeReader reader, bool isDetaching)
protected override async ValueTask CompleteReaderAsync(PipeReader reader, bool isDetaching)
{
if (!isDetaching)
{
reader.Complete();
await reader.CompleteAsync().ConfigureAwait(false);
}
}

protected override async ValueTask CompleteWriterAsync(PipeWriter writer, bool isDetaching)
{
if (!isDetaching)
{
await writer.CompleteAsync().ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -101,6 +109,6 @@ protected override bool IsIgnorableException(Exception e)

private void OnConnectionClosed()
{
Cancel();
CancelAsync().Wait();
}
}

0 comments on commit ac1a78c

Please sign in to comment.