Skip to content

Commit

Permalink
Fix deadlock in the completion process when the send buffer is full.
Browse files Browse the repository at this point in the history
  • Loading branch information
mayuki committed Sep 18, 2024
1 parent ceb2f24 commit 9d4165a
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/YetAnotherHttpHandler/NativeRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class NativeRuntime
public static NativeRuntime Instance { get; } = new NativeRuntime();

private readonly object _lock = new object();
private int _refCount;
internal /* for unit testing */ int _refCount;
private YahaRuntimeSafeHandle? _handle;

private NativeRuntime()
Expand Down
33 changes: 25 additions & 8 deletions src/YetAnotherHttpHandler/RequestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,30 @@ private async Task RunReadRequestLoopAsync(CancellationToken cancellationToken)
}
}

private unsafe void WriteBody(Span<byte> data)
private void WriteBody(Span<byte> data)
{
var retryInterval = 16; //ms
var retryAfter = 0;

while (!_cancellationTokenSource.IsCancellationRequested && !TryWriteBody(data))
{
// TODO:
retryAfter += retryInterval;
Thread.Sleep(Math.Min(1000, retryAfter));
}
}

private unsafe bool TryWriteBody(Span<byte> data)
{
lock (_handleLock)
{
if (!_handle.IsAllocated)
{
// If the handle has been already released, the request is fully completed.
ThrowHelper.ThrowOperationCanceledException();
return true; // the method never return from here.
}

Debug.Assert(_handle.IsAllocated);

if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Sending the request body: Length={data.Length}");
Expand All @@ -146,8 +166,6 @@ private unsafe void WriteBody(Span<byte> data)
var ctx = _ctxHandle.DangerousGet();
var requestContext = _requestContextHandle.DangerousGet();

var retryInterval = 16; //ms
var retryAfter = 0;
while (!_requestBodyCompleted)
{
// If the internal buffer is full, yaha_request_write_body returns false. We need to wait until ready to send bytes again.
Expand All @@ -161,14 +179,11 @@ private unsafe void WriteBody(Span<byte> data)
{
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Failed to write the request body. Because the request has been completed.");
ThrowHelper.ThrowOperationCanceledException();
return;
return true; // the method never return from here.
}

if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Send buffer is full.");

// TODO:
retryAfter += retryInterval;
Thread.Sleep(Math.Min(1000, retryAfter));
return false;
}
#if DEBUG
// Fill memory so that data corruption can be detected on debug build.
Expand All @@ -188,6 +203,8 @@ private unsafe void WriteBody(Span<byte> data)
}
}
}

return true;
}

private unsafe void TryCompleteBody(Exception? exception = default)
Expand Down
2 changes: 1 addition & 1 deletion src/YetAnotherHttpHandler/ResponseContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal ResponseContext(HttpRequestMessage requestMessage, RequestContext reque
_requestContext = requestContext;
_responseTask = new TaskCompletionSource<HttpResponseMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
_cancellationToken = cancellationToken;
_tokenRegistration = cancellationToken.Register((state) =>
_tokenRegistration = cancellationToken.Register(static (state) =>
{
((ResponseContext)state!).Cancel();
}, this);
Expand Down
59 changes: 59 additions & 0 deletions test/YetAnotherHttpHandler.Test/Http2TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,65 @@ public async Task Cancel_Post_BeforeRequest()
#endif
}


[ConditionalFact]
public async Task DisposeHandler_During_SendBuffer_Is_Full()
{
var runtimeHandle = NativeRuntime.Instance.Acquire();

// To prevent references remaining from local variables, make it a local function.
async Task RunAsync()
{
// Arrange
var httpHandler = CreateHandler();
var httpClient = new HttpClient(httpHandler);
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();

// Act
var pipe = new Pipe();
var writeTask = Task.Run(async () =>
{
var buffer = new byte[1024 * 1024];
while (true)
{
await pipe.Writer.WriteAsync(buffer);
}
});
var content = new StreamContent(pipe.Reader.AsStream());
content.Headers.ContentType = MediaTypeHeaderValue.Parse("application/octet-stream");
var request = new HttpRequestMessage(HttpMethod.Post, $"{server.BaseUri}/post-never-read")
{
Version = HttpVersion.Version20,
Content = content,
};
var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).WaitAsync(TimeoutToken);
await Task.Delay(TimeSpan.FromSeconds(5)); // Wait for the send buffer to overflow.

// Decrease the reference count of manually held internal handles for direct observation.
NativeRuntime.Instance.Release();

// Dispose all related resources.
request.Dispose();
response.Dispose();
httpHandler.Dispose();
httpClient.Dispose();
}

await RunAsync();

// Run Finalizer.
await Task.Delay(100);
GC.Collect();
GC.Collect();
GC.Collect();
GC.Collect();
await Task.Delay(100);

// Assert
Assert.Equal(0, NativeRuntime.Instance._refCount);
Assert.True(runtimeHandle.IsClosed);
}

[ConditionalFact]
public async Task Grpc_Unary()
{
Expand Down
13 changes: 13 additions & 0 deletions test/YetAnotherHttpHandler.Test/TestServerForHttp1AndHttp2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ public static WebApplication BuildApplication(WebApplicationBuilder builder)
}
return Results.Empty;
});
app.MapPost("/post-never-read", async (HttpContext httpContext) =>
{
// Send status code and response headers.
httpContext.Response.Headers["x-header-1"] = "foo";
httpContext.Response.StatusCode = (int)HttpStatusCode.OK;
await httpContext.Response.BodyWriter.FlushAsync();

while (!httpContext.RequestAborted.IsCancellationRequested)
{
await Task.Delay(100);
}
return Results.Empty;
});

// HTTP/2
app.MapGet("/error-reset", (HttpContext httpContext) =>
Expand Down

0 comments on commit 9d4165a

Please sign in to comment.