Skip to content

Commit

Permalink
Merge pull request #95 from Cysharp/feature/FixDeadlockSendBufferIsFull
Browse files Browse the repository at this point in the history
Fix deadlock in the completion process when the send buffer is full.
  • Loading branch information
mayuki authored Sep 18, 2024
2 parents ceb2f24 + c7deb79 commit 7af3114
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 43 deletions.
2 changes: 1 addition & 1 deletion src/YetAnotherHttpHandler/NativeRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class NativeRuntime
public static NativeRuntime Instance { get; } = new NativeRuntime();

private readonly object _lock = new object();
private int _refCount;
internal /* for unit testing */ int _refCount;
private YahaRuntimeSafeHandle? _handle;

private NativeRuntime()
Expand Down
33 changes: 25 additions & 8 deletions src/YetAnotherHttpHandler/RequestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,30 @@ private async Task RunReadRequestLoopAsync(CancellationToken cancellationToken)
}
}

private unsafe void WriteBody(Span<byte> data)
private void WriteBody(Span<byte> data)
{
var retryInterval = 16; //ms
var retryAfter = 0;

while (!_cancellationTokenSource.IsCancellationRequested && !TryWriteBody(data))
{
// TODO:
retryAfter += retryInterval;
Thread.Sleep(Math.Min(1000, retryAfter));
}
}

private unsafe bool TryWriteBody(Span<byte> data)
{
lock (_handleLock)
{
if (!_handle.IsAllocated)
{
// If the handle has been already released, the request is fully completed.
ThrowHelper.ThrowOperationCanceledException();
return true; // the method never return from here.
}

Debug.Assert(_handle.IsAllocated);

if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Sending the request body: Length={data.Length}");
Expand All @@ -146,8 +166,6 @@ private unsafe void WriteBody(Span<byte> data)
var ctx = _ctxHandle.DangerousGet();
var requestContext = _requestContextHandle.DangerousGet();

var retryInterval = 16; //ms
var retryAfter = 0;
while (!_requestBodyCompleted)
{
// If the internal buffer is full, yaha_request_write_body returns false. We need to wait until ready to send bytes again.
Expand All @@ -161,14 +179,11 @@ private unsafe void WriteBody(Span<byte> data)
{
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Failed to write the request body. Because the request has been completed.");
ThrowHelper.ThrowOperationCanceledException();
return;
return true; // the method never return from here.
}

if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Send buffer is full.");

// TODO:
retryAfter += retryInterval;
Thread.Sleep(Math.Min(1000, retryAfter));
return false;
}
#if DEBUG
// Fill memory so that data corruption can be detected on debug build.
Expand All @@ -188,6 +203,8 @@ private unsafe void WriteBody(Span<byte> data)
}
}
}

return true;
}

private unsafe void TryCompleteBody(Exception? exception = default)
Expand Down
2 changes: 1 addition & 1 deletion src/YetAnotherHttpHandler/ResponseContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal ResponseContext(HttpRequestMessage requestMessage, RequestContext reque
_requestContext = requestContext;
_responseTask = new TaskCompletionSource<HttpResponseMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
_cancellationToken = cancellationToken;
_tokenRegistration = cancellationToken.Register((state) =>
_tokenRegistration = cancellationToken.Register(static (state) =>
{
((ResponseContext)state!).Cancel();
}, this);
Expand Down
12 changes: 6 additions & 6 deletions test/YetAnotherHttpHandler.Test/ClientCertificateTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task NotSet()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler()
using var httpHandler = new YetAnotherHttpHandler()
{
//ClientAuthCertificates = File.ReadAllText("./Certificates/client.crt"),
//ClientAuthKey = File.ReadAllText("./Certificates/client.key"),
Expand All @@ -63,7 +63,7 @@ public async Task UseClientCertificate()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler()
using var httpHandler = new YetAnotherHttpHandler()
{
ClientAuthCertificates = File.ReadAllText("./Certificates/client.crt"),
ClientAuthKey = File.ReadAllText("./Certificates/client.key"),
Expand All @@ -85,7 +85,7 @@ public async Task Invalid()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler()
using var httpHandler = new YetAnotherHttpHandler()
{
ClientAuthCertificates = File.ReadAllText("./Certificates/client_unknown.crt"), // CN=unknown.example.com
ClientAuthKey = File.ReadAllText("./Certificates/client_unknown.key"),
Expand All @@ -106,7 +106,7 @@ public async Task Reference_SocketHttpHandler_NotSet()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new SocketsHttpHandler();
using var httpHandler = new SocketsHttpHandler();
httpHandler.SslOptions.RemoteCertificateValidationCallback = (sender, certificate, chain, errors) => true;
var httpClient = new HttpClient(httpHandler);

Expand All @@ -123,7 +123,7 @@ public async Task Reference_SocketHttpHandler_UseClientCertificate()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new SocketsHttpHandler();
using var httpHandler = new SocketsHttpHandler();
httpHandler.SslOptions.RemoteCertificateValidationCallback = (sender, certificate, chain, errors) => true;
httpHandler.SslOptions.ClientCertificates = new X509CertificateCollection()
{
Expand All @@ -145,7 +145,7 @@ public async Task Reference_SocketHttpHandler_Invalid()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new SocketsHttpHandler();
using var httpHandler = new SocketsHttpHandler();
httpHandler.SslOptions.RemoteCertificateValidationCallback = (sender, certificate, chain, errors) => true;
httpHandler.SslOptions.ClientCertificates = new X509CertificateCollection()
{
Expand Down
9 changes: 1 addition & 8 deletions test/YetAnotherHttpHandler.Test/Http1Test.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
using System.IO.Pipelines;
using System.Net;
using System.Runtime.ExceptionServices;
using _YetAnotherHttpHandler.Test.Helpers;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace _YetAnotherHttpHandler.Test;

public class Http1Test : UseTestServerTestBase
public class Http1Test(ITestOutputHelper testOutputHelper) : UseTestServerTestBase(testOutputHelper)
{
public Http1Test(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
}

[Fact]
public async Task FailedToConnect()
{
Expand Down
6 changes: 1 addition & 5 deletions test/YetAnotherHttpHandler.Test/Http2ClearTextTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@

namespace _YetAnotherHttpHandler.Test;

public class Http2ClearTextTest : Http2TestBase
public class Http2ClearTextTest(ITestOutputHelper testOutputHelper) : Http2TestBase(testOutputHelper)
{
public Http2ClearTextTest(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
}

protected override YetAnotherHttpHandler CreateHandler()
{
return new YetAnotherHttpHandler() { Http2Only = true };
Expand Down
12 changes: 4 additions & 8 deletions test/YetAnotherHttpHandler.Test/Http2Test.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@
namespace _YetAnotherHttpHandler.Test;

[OSSkipCondition(OperatingSystems.MacOSX)] // .NET 7 or earlier does not support ALPN on macOS.
public class Http2Test : Http2TestBase
public class Http2Test(ITestOutputHelper testOutputHelper) : Http2TestBase(testOutputHelper)
{
public Http2Test(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
}

protected override YetAnotherHttpHandler CreateHandler()
{
// Use self-signed certificate for testing purpose.
Expand Down Expand Up @@ -47,7 +43,7 @@ public async Task SelfSignedCertificate_NotTrusted()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler() { SkipCertificateVerification = false }; // We need to verify server certificate.
using var httpHandler = new YetAnotherHttpHandler() { SkipCertificateVerification = false }; // We need to verify server certificate.
var httpClient = new HttpClient(httpHandler);

// Act
Expand All @@ -63,7 +59,7 @@ public async Task SelfSignedCertificate_NotTrusted_SkipValidation()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler() { SkipCertificateVerification = true };
using var httpHandler = new YetAnotherHttpHandler() { SkipCertificateVerification = true };
var httpClient = new HttpClient(httpHandler);

// Act
Expand All @@ -80,7 +76,7 @@ public async Task SelfSignedCertificate_Trusted_CustomRootCA()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler()
using var httpHandler = new YetAnotherHttpHandler()
{
// We need to verify server certificate.
SkipCertificateVerification = false,
Expand Down
6 changes: 1 addition & 5 deletions test/YetAnotherHttpHandler.Test/Http2TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,8 @@

namespace _YetAnotherHttpHandler.Test;

public abstract class Http2TestBase : UseTestServerTestBase
public abstract class Http2TestBase(ITestOutputHelper testOutputHelper) : UseTestServerTestBase(testOutputHelper)
{
protected Http2TestBase(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
}

protected abstract YetAnotherHttpHandler CreateHandler();
protected abstract Task<TestWebAppServer> LaunchServerAsyncCore<T>(Action<WebApplicationBuilder>? configure = null) where T : ITestServerBuilder;

Expand Down
13 changes: 13 additions & 0 deletions test/YetAnotherHttpHandler.Test/TestServerForHttp1AndHttp2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ public static WebApplication BuildApplication(WebApplicationBuilder builder)
}
return Results.Empty;
});
app.MapPost("/post-never-read", async (HttpContext httpContext) =>
{
// Send status code and response headers.
httpContext.Response.Headers["x-header-1"] = "foo";
httpContext.Response.StatusCode = (int)HttpStatusCode.OK;
await httpContext.Response.BodyWriter.FlushAsync();

while (!httpContext.RequestAborted.IsCancellationRequested)
{
await Task.Delay(100);
}
return Results.Empty;
});

// HTTP/2
app.MapGet("/error-reset", (HttpContext httpContext) =>
Expand Down
74 changes: 73 additions & 1 deletion test/YetAnotherHttpHandler.Test/YetAnotherHttpHandlerTest.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
using Cysharp.Net.Http;
using System.IO.Pipelines;
using System.Net;
using System.Net.Http.Headers;
using Xunit.Abstractions;

namespace _YetAnotherHttpHandler.Test;

public class YetAnotherHttpHandlerTest
[CollectionDefinition(nameof(YetAnotherHttpHandlerTest), DisableParallelization = true)]
public class YetAnotherHttpHandlerTestCollection;

[Collection(nameof(YetAnotherHttpHandlerTest))]
public class YetAnotherHttpHandlerTest(ITestOutputHelper testOutputHelper) : UseTestServerTestBase(testOutputHelper)
{
[Fact]
public async Task Disposed()
Expand All @@ -18,4 +26,68 @@ public async Task Disposed()
// Assert
Assert.IsType<ObjectDisposedException>(ex);
}

[Fact]
public async Task DisposeHandler_During_SendBuffer_Is_Full()
{
GC.Collect();
GC.Collect();
GC.Collect();
GC.Collect();

var runtimeHandle = NativeRuntime.Instance.Acquire();

// To prevent references remaining from local variables, make it a local function.
async Task RunAsync()
{
// Arrange
var httpHandler = new YetAnotherHttpHandler() { Http2Only = true };
var httpClient = new HttpClient(httpHandler);
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>(TestWebAppServerListenMode.InsecureHttp2Only);

// Act
var pipe = new Pipe();
var writeTask = Task.Run(async () =>
{
var buffer = new byte[1024 * 1024];
while (true)
{
await pipe.Writer.WriteAsync(buffer);
}
});
var content = new StreamContent(pipe.Reader.AsStream());
content.Headers.ContentType = MediaTypeHeaderValue.Parse("application/octet-stream");
var request = new HttpRequestMessage(HttpMethod.Post, $"{server.BaseUri}/post-never-read")
{
Version = HttpVersion.Version20,
Content = content,
};
var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).WaitAsync(TimeoutToken);
await Task.Delay(TimeSpan.FromSeconds(5)); // Wait for the send buffer to overflow.

// Decrease the reference count of manually held internal handles for direct observation.
NativeRuntime.Instance.Release();

// Dispose all related resources.
request.Dispose();
response.Dispose();
httpHandler.Dispose();
httpClient.Dispose();
}

await RunAsync();

// Run Finalizer.
await Task.Delay(100);
GC.Collect();
GC.Collect();
GC.Collect();
GC.Collect();
await Task.Delay(100);

// Assert
Assert.Equal(0, NativeRuntime.Instance._refCount);
Assert.True(runtimeHandle.IsClosed);
}

}

0 comments on commit 7af3114

Please sign in to comment.