Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add backpressure control and allow asynchronous flushing #116

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

ruccho
Copy link
Contributor

@ruccho ruccho commented Jan 27, 2025

Problem

I've wanted to control the amount of backpressure when reading a response using a stream. Specifically, the value of pauseWriterThreshold in PipeOptions.Default of the Pipe that Cysharp.Net.Http.ResponseContext has is 64KiB. I would like to change this value.

https://github.com/dotnet/runtime/blob/b9691b06b96541e8e94cd79f17d6447b8d03ec20/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs#L47-L48

However, I have noticed that the progress of the other requests may be blocked when backpressure is occurring. Upon investigation, I found that this is caused by waiting for PipeWriter.FlushAsync() synchronously, which blocks FlushAsync() while backpressure is occurring, blocking the worker thread on the native side.

var t = _pipe.Writer.FlushAsync();
if (!t.IsCompleted)
{
t.AsTask().GetAwaiter().GetResult();
}

Here is the example.
If I call multiple requests concurrently with larger response body than pauseWriterThreshold (64KiB) and intentionally cause backpressure by not reading the stream, the subsequent request stops progressing and times out in the middle. This can be problematic when there are large numbers of concurrent requests.

NumRequests: 11
Request #0
Request #1
System.Threading.Tasks.TaskCanceledException: The operation was canceled.
using Cysharp.Net.Http;
using Xunit.Abstractions;

namespace _YetAnotherHttpHandler.Test;

public class BackPressureTest
{
    private readonly ITestOutputHelper _testOutputHelper;

    public BackPressureTest(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Theory]
    [InlineData(4186)] // pass
    [InlineData(65535)] // pass
    [InlineData(65536)] // fail
    [InlineData(131972)] // fail
    public async Task FlushTest(int size)
    {
        using var httpHandler = new YetAnotherHttpHandler();
        using var client = new HttpClient(httpHandler);

        var numRequests = Environment.ProcessorCount + 1; // more than the number of tokio worker threads

        _testOutputHelper.WriteLine($"NumRequests: {numRequests}");

        var pendingStreams = new List<Stream>();

        for (var i = 0; i < numRequests; i++)
        {
            _testOutputHelper.WriteLine($"Request #{i}");

            using var timeout = new CancellationTokenSource();
            timeout.CancelAfter(TimeSpan.FromSeconds(10));
            var stream = await client.GetStreamAsync($"https://httpbin.org/bytes/{size}", timeout.Token);

            pendingStreams.Add(stream);
        }

        foreach (var stream in pendingStreams) await stream.DisposeAsync();
    }
}

Solution

I have found that if I make a change that allows PipeWriter.FlushAsync() to be awaited asynchronously, it passes the same test.

This PR adds a ResponsePipeOptions property to YetAnotherHttpHandler and allows it to wait for PipeWriter.FlushAsync() asynchronously.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant