Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock in the completion process when the send buffer is full. #95

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/YetAnotherHttpHandler/NativeRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class NativeRuntime
public static NativeRuntime Instance { get; } = new NativeRuntime();

private readonly object _lock = new object();
private int _refCount;
internal /* for unit testing */ int _refCount;
private YahaRuntimeSafeHandle? _handle;

private NativeRuntime()
Expand Down
33 changes: 25 additions & 8 deletions src/YetAnotherHttpHandler/RequestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,30 @@ private async Task RunReadRequestLoopAsync(CancellationToken cancellationToken)
}
}

private unsafe void WriteBody(Span<byte> data)
private void WriteBody(Span<byte> data)
{
var retryInterval = 16; //ms
var retryAfter = 0;

while (!_cancellationTokenSource.IsCancellationRequested && !TryWriteBody(data))
{
// TODO:
retryAfter += retryInterval;
Thread.Sleep(Math.Min(1000, retryAfter));
}
}

private unsafe bool TryWriteBody(Span<byte> data)
{
lock (_handleLock)
{
if (!_handle.IsAllocated)
{
// If the handle has been already released, the request is fully completed.
ThrowHelper.ThrowOperationCanceledException();
return true; // the method never return from here.
}

Debug.Assert(_handle.IsAllocated);

if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Sending the request body: Length={data.Length}");
Expand All @@ -146,8 +166,6 @@ private unsafe void WriteBody(Span<byte> data)
var ctx = _ctxHandle.DangerousGet();
var requestContext = _requestContextHandle.DangerousGet();

var retryInterval = 16; //ms
var retryAfter = 0;
while (!_requestBodyCompleted)
{
// If the internal buffer is full, yaha_request_write_body returns false. We need to wait until ready to send bytes again.
Expand All @@ -161,14 +179,11 @@ private unsafe void WriteBody(Span<byte> data)
{
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Failed to write the request body. Because the request has been completed.");
ThrowHelper.ThrowOperationCanceledException();
return;
return true; // the method never return from here.
}

if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestSequence}:State:0x{Handle:X}] Send buffer is full.");

// TODO:
retryAfter += retryInterval;
Thread.Sleep(Math.Min(1000, retryAfter));
return false;
}
#if DEBUG
// Fill memory so that data corruption can be detected on debug build.
Expand All @@ -188,6 +203,8 @@ private unsafe void WriteBody(Span<byte> data)
}
}
}

return true;
}

private unsafe void TryCompleteBody(Exception? exception = default)
Expand Down
2 changes: 1 addition & 1 deletion src/YetAnotherHttpHandler/ResponseContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal ResponseContext(HttpRequestMessage requestMessage, RequestContext reque
_requestContext = requestContext;
_responseTask = new TaskCompletionSource<HttpResponseMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
_cancellationToken = cancellationToken;
_tokenRegistration = cancellationToken.Register((state) =>
_tokenRegistration = cancellationToken.Register(static (state) =>
{
((ResponseContext)state!).Cancel();
}, this);
Expand Down
12 changes: 6 additions & 6 deletions test/YetAnotherHttpHandler.Test/ClientCertificateTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task NotSet()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler()
using var httpHandler = new YetAnotherHttpHandler()
{
//ClientAuthCertificates = File.ReadAllText("./Certificates/client.crt"),
//ClientAuthKey = File.ReadAllText("./Certificates/client.key"),
Expand All @@ -63,7 +63,7 @@ public async Task UseClientCertificate()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler()
using var httpHandler = new YetAnotherHttpHandler()
{
ClientAuthCertificates = File.ReadAllText("./Certificates/client.crt"),
ClientAuthKey = File.ReadAllText("./Certificates/client.key"),
Expand All @@ -85,7 +85,7 @@ public async Task Invalid()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler()
using var httpHandler = new YetAnotherHttpHandler()
{
ClientAuthCertificates = File.ReadAllText("./Certificates/client_unknown.crt"), // CN=unknown.example.com
ClientAuthKey = File.ReadAllText("./Certificates/client_unknown.key"),
Expand All @@ -106,7 +106,7 @@ public async Task Reference_SocketHttpHandler_NotSet()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new SocketsHttpHandler();
using var httpHandler = new SocketsHttpHandler();
httpHandler.SslOptions.RemoteCertificateValidationCallback = (sender, certificate, chain, errors) => true;
var httpClient = new HttpClient(httpHandler);

Expand All @@ -123,7 +123,7 @@ public async Task Reference_SocketHttpHandler_UseClientCertificate()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new SocketsHttpHandler();
using var httpHandler = new SocketsHttpHandler();
httpHandler.SslOptions.RemoteCertificateValidationCallback = (sender, certificate, chain, errors) => true;
httpHandler.SslOptions.ClientCertificates = new X509CertificateCollection()
{
Expand All @@ -145,7 +145,7 @@ public async Task Reference_SocketHttpHandler_Invalid()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new SocketsHttpHandler();
using var httpHandler = new SocketsHttpHandler();
httpHandler.SslOptions.RemoteCertificateValidationCallback = (sender, certificate, chain, errors) => true;
httpHandler.SslOptions.ClientCertificates = new X509CertificateCollection()
{
Expand Down
9 changes: 1 addition & 8 deletions test/YetAnotherHttpHandler.Test/Http1Test.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
using System.IO.Pipelines;
using System.Net;
using System.Runtime.ExceptionServices;
using _YetAnotherHttpHandler.Test.Helpers;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace _YetAnotherHttpHandler.Test;

public class Http1Test : UseTestServerTestBase
public class Http1Test(ITestOutputHelper testOutputHelper) : UseTestServerTestBase(testOutputHelper)
{
public Http1Test(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
}

[Fact]
public async Task FailedToConnect()
{
Expand Down
6 changes: 1 addition & 5 deletions test/YetAnotherHttpHandler.Test/Http2ClearTextTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@

namespace _YetAnotherHttpHandler.Test;

public class Http2ClearTextTest : Http2TestBase
public class Http2ClearTextTest(ITestOutputHelper testOutputHelper) : Http2TestBase(testOutputHelper)
{
public Http2ClearTextTest(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
}

protected override YetAnotherHttpHandler CreateHandler()
{
return new YetAnotherHttpHandler() { Http2Only = true };
Expand Down
12 changes: 4 additions & 8 deletions test/YetAnotherHttpHandler.Test/Http2Test.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@
namespace _YetAnotherHttpHandler.Test;

[OSSkipCondition(OperatingSystems.MacOSX)] // .NET 7 or earlier does not support ALPN on macOS.
public class Http2Test : Http2TestBase
public class Http2Test(ITestOutputHelper testOutputHelper) : Http2TestBase(testOutputHelper)
{
public Http2Test(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
}

protected override YetAnotherHttpHandler CreateHandler()
{
// Use self-signed certificate for testing purpose.
Expand Down Expand Up @@ -47,7 +43,7 @@ public async Task SelfSignedCertificate_NotTrusted()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler() { SkipCertificateVerification = false }; // We need to verify server certificate.
using var httpHandler = new YetAnotherHttpHandler() { SkipCertificateVerification = false }; // We need to verify server certificate.
var httpClient = new HttpClient(httpHandler);

// Act
Expand All @@ -63,7 +59,7 @@ public async Task SelfSignedCertificate_NotTrusted_SkipValidation()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler() { SkipCertificateVerification = true };
using var httpHandler = new YetAnotherHttpHandler() { SkipCertificateVerification = true };
var httpClient = new HttpClient(httpHandler);

// Act
Expand All @@ -80,7 +76,7 @@ public async Task SelfSignedCertificate_Trusted_CustomRootCA()
{
// Arrange
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>();
var httpHandler = new YetAnotherHttpHandler()
using var httpHandler = new YetAnotherHttpHandler()
{
// We need to verify server certificate.
SkipCertificateVerification = false,
Expand Down
6 changes: 1 addition & 5 deletions test/YetAnotherHttpHandler.Test/Http2TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,8 @@

namespace _YetAnotherHttpHandler.Test;

public abstract class Http2TestBase : UseTestServerTestBase
public abstract class Http2TestBase(ITestOutputHelper testOutputHelper) : UseTestServerTestBase(testOutputHelper)
{
protected Http2TestBase(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
}

protected abstract YetAnotherHttpHandler CreateHandler();
protected abstract Task<TestWebAppServer> LaunchServerAsyncCore<T>(Action<WebApplicationBuilder>? configure = null) where T : ITestServerBuilder;

Expand Down
13 changes: 13 additions & 0 deletions test/YetAnotherHttpHandler.Test/TestServerForHttp1AndHttp2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ public static WebApplication BuildApplication(WebApplicationBuilder builder)
}
return Results.Empty;
});
app.MapPost("/post-never-read", async (HttpContext httpContext) =>
{
// Send status code and response headers.
httpContext.Response.Headers["x-header-1"] = "foo";
httpContext.Response.StatusCode = (int)HttpStatusCode.OK;
await httpContext.Response.BodyWriter.FlushAsync();

while (!httpContext.RequestAborted.IsCancellationRequested)
{
await Task.Delay(100);
}
return Results.Empty;
});

// HTTP/2
app.MapGet("/error-reset", (HttpContext httpContext) =>
Expand Down
74 changes: 73 additions & 1 deletion test/YetAnotherHttpHandler.Test/YetAnotherHttpHandlerTest.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
using Cysharp.Net.Http;
using System.IO.Pipelines;
using System.Net;
using System.Net.Http.Headers;
using Xunit.Abstractions;

namespace _YetAnotherHttpHandler.Test;

public class YetAnotherHttpHandlerTest
[CollectionDefinition(nameof(YetAnotherHttpHandlerTest), DisableParallelization = true)]
public class YetAnotherHttpHandlerTestCollection;

[Collection(nameof(YetAnotherHttpHandlerTest))]
public class YetAnotherHttpHandlerTest(ITestOutputHelper testOutputHelper) : UseTestServerTestBase(testOutputHelper)
{
[Fact]
public async Task Disposed()
Expand All @@ -18,4 +26,68 @@ public async Task Disposed()
// Assert
Assert.IsType<ObjectDisposedException>(ex);
}

[Fact]
public async Task DisposeHandler_During_SendBuffer_Is_Full()
{
GC.Collect();
GC.Collect();
GC.Collect();
GC.Collect();

var runtimeHandle = NativeRuntime.Instance.Acquire();

// To prevent references remaining from local variables, make it a local function.
async Task RunAsync()
{
// Arrange
var httpHandler = new YetAnotherHttpHandler() { Http2Only = true };
var httpClient = new HttpClient(httpHandler);
await using var server = await LaunchServerAsync<TestServerForHttp1AndHttp2>(TestWebAppServerListenMode.InsecureHttp2Only);

// Act
var pipe = new Pipe();
var writeTask = Task.Run(async () =>
{
var buffer = new byte[1024 * 1024];
while (true)
{
await pipe.Writer.WriteAsync(buffer);
}
});
var content = new StreamContent(pipe.Reader.AsStream());
content.Headers.ContentType = MediaTypeHeaderValue.Parse("application/octet-stream");
var request = new HttpRequestMessage(HttpMethod.Post, $"{server.BaseUri}/post-never-read")
{
Version = HttpVersion.Version20,
Content = content,
};
var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).WaitAsync(TimeoutToken);
await Task.Delay(TimeSpan.FromSeconds(5)); // Wait for the send buffer to overflow.

// Decrease the reference count of manually held internal handles for direct observation.
NativeRuntime.Instance.Release();

// Dispose all related resources.
request.Dispose();
response.Dispose();
httpHandler.Dispose();
httpClient.Dispose();
}

await RunAsync();

// Run Finalizer.
await Task.Delay(100);
GC.Collect();
GC.Collect();
GC.Collect();
GC.Collect();
await Task.Delay(100);

// Assert
Assert.Equal(0, NativeRuntime.Instance._refCount);
Assert.True(runtimeHandle.IsClosed);
}

}
Loading