-
Notifications
You must be signed in to change notification settings - Fork 5k
[QUIC] Add QuicStream.WaitForWriteCompletionAsync #58236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,6 +69,11 @@ private sealed class State | |
// Resettable completions to be used for multiple calls to send. | ||
public readonly ResettableCompletionSource<uint> SendResettableCompletionSource = new ResettableCompletionSource<uint>(); | ||
|
||
public ShutdownWriteState ShutdownWriteState; | ||
|
||
// Set once writes have been shutdown. | ||
public readonly TaskCompletionSource ShutdownWriteCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); | ||
|
||
public ShutdownState ShutdownState; | ||
// The value makes sure that we release the handles only once. | ||
public int ShutdownDone; | ||
|
@@ -577,12 +582,26 @@ internal override void AbortWrite(long errorCode) | |
return; | ||
} | ||
|
||
bool shouldComplete = false; | ||
|
||
lock (_state) | ||
{ | ||
if (_state.SendState < SendState.Aborted) | ||
{ | ||
_state.SendState = SendState.Aborted; | ||
} | ||
|
||
if (_state.ShutdownWriteState == ShutdownWriteState.None) | ||
{ | ||
_state.ShutdownWriteState = ShutdownWriteState.Canceled; | ||
shouldComplete = true; | ||
} | ||
} | ||
|
||
if (shouldComplete) | ||
{ | ||
_state.ShutdownWriteCompletionSource.SetException( | ||
ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException("Write was aborted."))); | ||
} | ||
|
||
StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_SEND, errorCode); | ||
|
@@ -629,6 +648,23 @@ internal override async ValueTask ShutdownCompleted(CancellationToken cancellati | |
await _state.ShutdownCompletionSource.Task.ConfigureAwait(false); | ||
} | ||
|
||
internal override ValueTask WaitForWriteCompletionAsync(CancellationToken cancellationToken = default) | ||
{ | ||
// TODO: What should happen if this is called for a unidirectional stream and there are no writes? | ||
|
||
ThrowIfDisposed(); | ||
|
||
lock (_state) | ||
{ | ||
if (_state.ShutdownWriteState == ShutdownWriteState.ConnectionClosed) | ||
{ | ||
throw GetConnectionAbortedException(_state); | ||
} | ||
} | ||
|
||
return new ValueTask(_state.ShutdownWriteCompletionSource.Task.WaitAsync(cancellationToken)); | ||
} | ||
|
||
internal override void Shutdown() | ||
{ | ||
ThrowIfDisposed(); | ||
|
@@ -861,6 +897,11 @@ private static uint HandleEvent(State state, ref StreamEvent evt) | |
// Peer has stopped receiving data, don't send anymore. | ||
case QUIC_STREAM_EVENT_TYPE.PEER_RECEIVE_ABORTED: | ||
return HandleEventPeerRecvAborted(state, ref evt); | ||
// Occurs when shutdown is completed for the send side. | ||
// This only happens for shutdown on sending, not receiving | ||
// Receive shutdown can only be abortive. | ||
case QUIC_STREAM_EVENT_TYPE.SEND_SHUTDOWN_COMPLETE: | ||
return HandleEventSendShutdownComplete(state, ref evt); | ||
// Shutdown for both sending and receiving is completed. | ||
case QUIC_STREAM_EVENT_TYPE.SHUTDOWN_COMPLETE: | ||
return HandleEventShutdownComplete(state, ref evt); | ||
|
@@ -993,23 +1034,37 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt) | |
|
||
private static uint HandleEventPeerRecvAborted(State state, ref StreamEvent evt) | ||
{ | ||
bool shouldComplete = false; | ||
bool shouldSendComplete = false; | ||
bool shouldShutdownWriteComplete = false; | ||
lock (state) | ||
{ | ||
if (state.SendState == SendState.None || state.SendState == SendState.Pending) | ||
{ | ||
shouldComplete = true; | ||
shouldSendComplete = true; | ||
} | ||
|
||
if (state.ShutdownWriteState == ShutdownWriteState.None) | ||
{ | ||
state.ShutdownWriteState = ShutdownWriteState.Canceled; | ||
shouldShutdownWriteComplete = true; | ||
} | ||
|
||
state.SendState = SendState.Aborted; | ||
state.SendErrorCode = (long)evt.Data.PeerReceiveAborted.ErrorCode; | ||
} | ||
|
||
if (shouldComplete) | ||
if (shouldSendComplete) | ||
{ | ||
state.SendResettableCompletionSource.CompleteException( | ||
ExceptionDispatchInfo.SetCurrentStackTrace(new QuicStreamAbortedException(state.SendErrorCode))); | ||
} | ||
|
||
if (shouldShutdownWriteComplete) | ||
{ | ||
state.ShutdownWriteCompletionSource.SetException( | ||
ExceptionDispatchInfo.SetCurrentStackTrace(new QuicStreamAbortedException(state.SendErrorCode))); | ||
} | ||
|
||
return MsQuicStatusCodes.Success; | ||
} | ||
|
||
|
@@ -1021,6 +1076,38 @@ private static uint HandleEventStartComplete(State state, ref StreamEvent evt) | |
return MsQuicStatusCodes.Success; | ||
} | ||
|
||
private static uint HandleEventSendShutdownComplete(State state, ref StreamEvent evt) | ||
{ | ||
// Graceful will be false in three situations: | ||
JamesNK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// 1. The peer aborted reads and the PEER_RECEIVE_ABORTED event was raised. | ||
// ShutdownWriteCompletionSource is already complete with an error. | ||
// 2. We aborted writes. | ||
// ShutdownWriteCompletionSource is already complete with an error. | ||
// 3. The connection was closed. | ||
// SHUTDOWN_COMPLETE event will be raised immediately after this event. It will handle completing with an error. | ||
// | ||
// Only use this event with sends gracefully completed. | ||
if (evt.Data.SendShutdownComplete.Graceful != 0) | ||
{ | ||
bool shouldComplete = false; | ||
lock (state) | ||
{ | ||
if (state.ShutdownWriteState == ShutdownWriteState.None) | ||
{ | ||
state.ShutdownWriteState = ShutdownWriteState.Finished; | ||
shouldComplete = true; | ||
} | ||
} | ||
|
||
if (shouldComplete) | ||
{ | ||
state.ShutdownWriteCompletionSource.SetResult(); | ||
} | ||
} | ||
|
||
return MsQuicStatusCodes.Success; | ||
} | ||
|
||
private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt) | ||
{ | ||
StreamEventDataShutdownComplete shutdownCompleteEvent = evt.Data.ShutdownComplete; | ||
|
@@ -1031,6 +1118,7 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt | |
} | ||
|
||
bool shouldReadComplete = false; | ||
bool shouldShutdownWriteComplete = false; | ||
bool shouldShutdownComplete = false; | ||
|
||
lock (state) | ||
|
@@ -1040,6 +1128,15 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt | |
|
||
shouldReadComplete = CleanupReadStateAndCheckPending(state, ReadState.ReadsCompleted); | ||
|
||
if (state.ShutdownWriteState == ShutdownWriteState.None) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure this state is expected here? As you've mentioned in comment for HandleEventSendShutdownComplete, by the time we receive SHUTDOWN_COMPLETE event, we should either completed ShutdownWriteState already, or it is a connection close, which is handled separately in HandleEventConnectionClose. I think we may leave the logic here, but guard it with Debug.Assert... what do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even as a fallback, I think it is especially strange to complete it successfully here... IMO successful completion should only happen in HandleEventSendShutdownComplete. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is one situation where you can get to this point without Perhaps |
||
{ | ||
// TODO: We can get to this point if the stream is unidirectional and there are no writes. | ||
// Consider what is the best behavior here with write shutdown and the read side of | ||
// unidirecitonal streams in the future. | ||
state.ShutdownWriteState = ShutdownWriteState.Finished; | ||
shouldShutdownWriteComplete = true; | ||
} | ||
|
||
if (state.ShutdownState == ShutdownState.None) | ||
{ | ||
state.ShutdownState = ShutdownState.Finished; | ||
|
@@ -1052,6 +1149,11 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt | |
state.ReceiveResettableCompletionSource.Complete(0); | ||
} | ||
|
||
if (shouldShutdownWriteComplete) | ||
{ | ||
state.ShutdownWriteCompletionSource.SetResult(); | ||
} | ||
|
||
if (shouldShutdownComplete) | ||
{ | ||
state.ShutdownCompletionSource.SetResult(); | ||
|
@@ -1361,6 +1463,7 @@ private static uint HandleEventConnectionClose(State state) | |
|
||
bool shouldCompleteRead = false; | ||
bool shouldCompleteSend = false; | ||
bool shouldCompleteShutdownWrite = false; | ||
bool shouldCompleteShutdown = false; | ||
|
||
lock (state) | ||
|
@@ -1373,6 +1476,12 @@ private static uint HandleEventConnectionClose(State state) | |
} | ||
state.SendState = SendState.ConnectionClosed; | ||
|
||
if (state.ShutdownWriteState == ShutdownWriteState.None) | ||
{ | ||
shouldCompleteShutdownWrite = true; | ||
} | ||
state.ShutdownWriteState = ShutdownWriteState.ConnectionClosed; | ||
|
||
if (state.ShutdownState == ShutdownState.None) | ||
{ | ||
shouldCompleteShutdown = true; | ||
|
@@ -1392,6 +1501,12 @@ private static uint HandleEventConnectionClose(State state) | |
ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); | ||
} | ||
|
||
if (shouldCompleteShutdownWrite) | ||
{ | ||
state.ShutdownWriteCompletionSource.SetException( | ||
ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); | ||
} | ||
|
||
if (shouldCompleteShutdown) | ||
{ | ||
state.ShutdownCompletionSource.SetException( | ||
|
@@ -1493,6 +1608,14 @@ private enum ReadState | |
Closed | ||
} | ||
|
||
private enum ShutdownWriteState | ||
{ | ||
None = 0, | ||
Canceled, | ||
Finished, | ||
ConnectionClosed | ||
} | ||
|
||
private enum ShutdownState | ||
{ | ||
None = 0, | ||
|
Uh oh!
There was an error while loading. Please reload this page.