Skip to content

Commit

Permalink
Close inactive connections and requests
Browse files Browse the repository at this point in the history
This new middleware introduces a timeout of closing inactive
connections between requests after a configured amount of seconds.

This builds on top of reactphp#405 and partially on reactphp#422
  • Loading branch information
WyriHaximus committed Apr 6, 2024
1 parent cb72360 commit 6b78aa5
Show file tree
Hide file tree
Showing 9 changed files with 511 additions and 175 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ jobs:
name: PHPUnit (PHP ${{ matrix.php }})
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
php:
- 8.3
Expand Down
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ multiple concurrent HTTP requests without blocking.
* [Uri](#uri)
* [ResponseException](#responseexception)
* [React\Http\Middleware](#reacthttpmiddleware)
* [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware)
* [StreamingRequestMiddleware](#streamingrequestmiddleware)
* [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware)
* [RequestBodyBufferMiddleware](#requestbodybuffermiddleware)
Expand Down Expand Up @@ -2692,6 +2693,25 @@ access its underlying response object.

### React\Http\Middleware

#### InactiveConnectionTimeoutMiddleware

The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the
`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly
open. The default is `60` seconds of inactivity and should only be changed if you know what you are doing.

The following example configures the `HttpServer` to close any inactive connections after one and a half second:

```php
$http = new React\Http\HttpServer(
new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5),
$handler
);
```
> Internally, this class is used as a "value object" to override the default timeout of one minute.
As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
This timeout is only in effect if we expect data from the client, not when we are writing data to
the client.

#### StreamingRequestMiddleware

The `React\Http\Middleware\StreamingRequestMiddleware` can be used to
Expand Down
10 changes: 7 additions & 3 deletions src/HttpServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use React\Http\Io\IniUtil;
use React\Http\Io\MiddlewareRunner;
use React\Http\Io\StreamingServer;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Http\Middleware\RequestBodyBufferMiddleware;
Expand Down Expand Up @@ -219,10 +220,13 @@ public function __construct($requestHandlerOrLoop)
}

$streaming = false;
$idleConnectionTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT;
foreach ((array) $requestHandlers as $handler) {
if ($handler instanceof StreamingRequestMiddleware) {
$streaming = true;
break;
}
if ($handler instanceof InactiveConnectionTimeoutMiddleware) {
$idleConnectionTimeout = $handler->getTimeout();
}
}

Expand Down Expand Up @@ -252,10 +256,10 @@ public function __construct($requestHandlerOrLoop)
* doing anything with the request.
*/
$middleware = \array_filter($middleware, function ($handler) {
return !($handler instanceof StreamingRequestMiddleware);
return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware);
});

$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware));
$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectionTimeout);

$that = $this;
$this->streamingServer->on('error', function ($error) use ($that) {
Expand Down
104 changes: 97 additions & 7 deletions src/Io/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ final class StreamingServer extends EventEmitter
/** @var Clock */
private $clock;

/** @var LoopInterface */
private $loop;

/** @var int */
private $idleConnectionTimeout;

/**
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
*
Expand All @@ -95,19 +101,21 @@ final class StreamingServer extends EventEmitter
* connections in order to then parse incoming data as HTTP.
* See also [listen()](#listen) for more details.
*
* @param LoopInterface $loop
* @param callable $requestHandler
* @param int $idleConnectionTimeout
* @see self::listen()
*/
public function __construct(LoopInterface $loop, $requestHandler)
public function __construct(LoopInterface $loop, $requestHandler, $idleConnectionTimeout)
{
if (!\is_callable($requestHandler)) {
throw new \InvalidArgumentException('Invalid request handler given');
}

$this->loop = $loop;
$this->callback = $requestHandler;
$this->clock = new Clock($loop);
$this->parser = new RequestHeaderParser($this->clock);
$this->idleConnectionTimeout = $idleConnectionTimeout;

$that = $this;
$this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) {
Expand All @@ -134,7 +142,35 @@ public function __construct(LoopInterface $loop, $requestHandler)
*/
public function listen(ServerInterface $socket)
{
$socket->on('connection', array($this->parser, 'handle'));
$socket->on('connection', array($this, 'handleConnection'));
}

/** @internal */
public function handleConnection(ConnectionInterface $connection)
{
$idleConnectionTimeout = $this->idleConnectionTimeout;
$loop = $this->loop;
$idleConnectionTimeoutHandler = function () use ($connection, &$closeEventHandler, &$dataEventHandler) {
$connection->removeListener('close', $closeEventHandler);
$connection->removeListener('data', $dataEventHandler);

$connection->close();
};
$timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
$closeEventHandler = function () use ($connection, &$closeEventHandler, &$dataEventHandler, $loop, &$timer) {
$connection->removeListener('close', $closeEventHandler);
$connection->removeListener('data', $dataEventHandler);

$loop->cancelTimer($timer);
};
$dataEventHandler = function () use ($loop, $idleConnectionTimeout, $idleConnectionTimeoutHandler, &$timer) {
$loop->cancelTimer($timer);
$timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
};
$connection->on('close', $closeEventHandler);
$connection->on('data', $dataEventHandler);

$this->parseRequest($connection);
}

/** @internal */
Expand Down Expand Up @@ -372,7 +408,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt

// either wait for next request over persistent connection or end connection
if ($persist) {
$this->parser->handle($connection);
$this->parseRequest($connection);
} else {
$connection->end();
}
Expand All @@ -393,13 +429,67 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
// write streaming body and then wait for next request over persistent connection
if ($persist) {
$body->pipe($connection, array('end' => false));
$parser = $this->parser;
$body->on('end', function () use ($connection, $parser, $body) {
$that = $this;
$body->on('end', function () use ($connection, $body, &$that) {
$connection->removeListener('close', array($body, 'close'));
$parser->handle($connection);
$that->parseRequest($connection);
});
} else {
$body->pipe($connection);
}
}

/**
* @internal
*/
public function parseRequest(ConnectionInterface $connection)
{
$idleConnectionTimeout = $this->idleConnectionTimeout;
$loop = $this->loop;
$parser = $this->parser;
$idleConnectionTimeoutHandler = function () use ($connection, $parser, &$removeTimerHandler) {
$parser->removeListener('headers', $removeTimerHandler);
$parser->removeListener('error', $removeTimerHandler);

$parser->emit('error', array(
new \RuntimeException('Request timed out', Response::STATUS_REQUEST_TIMEOUT),
$connection
));
};
$timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
$removeTimerHandler = function ($requestOrError, $conn) use ($loop, &$timer, $parser, $connection, &$removeTimerHandler, $idleConnectionTimeout, $idleConnectionTimeoutHandler) {
if ($conn !== $connection) {
return;
}

$loop->cancelTimer($timer);
$parser->removeListener('headers', $removeTimerHandler);
$parser->removeListener('error', $removeTimerHandler);

if (!($requestOrError instanceof ServerRequestInterface)) {
return;
}

$requestBody = $requestOrError->getBody();
if (!($requestBody instanceof HttpBodyStream)) {
return;
}

$timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
$requestBody->on('data', function () use (&$timer, $loop, $idleConnectionTimeout, $idleConnectionTimeoutHandler) {
$loop->cancelTimer($timer);
$timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
});
$requestBody->on('end', function () use (&$timer, $loop) {
$loop->cancelTimer($timer);
});
$requestBody->on('close', function () use (&$timer, $loop) {
$loop->cancelTimer($timer);
});
};
$this->parser->on('headers', $removeTimerHandler);
$this->parser->on('error', $removeTimerHandler);

$this->parser->handle($connection);
}
}
62 changes: 62 additions & 0 deletions src/Middleware/InactiveConnectionTimeoutMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace React\Http\Middleware;

use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Io\HttpBodyStream;
use React\Http\Io\PauseBufferStream;
use React\Promise;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Stream\ReadableStreamInterface;

/**
* Closes any inactive connection after the specified amount of seconds since last activity.
*
* This allows you to set an alternative timeout to the default one minute (60 seconds). For example
* thirteen and a half seconds:
*
* ```php
* $http = new React\Http\HttpServer(
* new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(13.5),
* $handler
* );
*
* > Internally, this class is used as a "value object" to override the default timeout of one minute.
* As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
*/
final class InactiveConnectionTimeoutMiddleware
{
/**
* @internal
*/
const DEFAULT_TIMEOUT = 60;

/**
* @var float
*/
private $timeout;

/**
* @param float $timeout
*/
public function __construct($timeout = self::DEFAULT_TIMEOUT)
{
$this->timeout = $timeout;
}

public function __invoke(ServerRequestInterface $request, $next)
{
return $next($request);
}

/**
* @return float
* @internal
*/
public function getTimeout()
{
return $this->timeout;
}
}
9 changes: 5 additions & 4 deletions tests/FunctionalBrowserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use React\Http\HttpServer;
use React\Http\Message\Response;
use React\Http\Message\ResponseException;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Promise\Promise;
use React\Promise\Stream;
Expand All @@ -32,7 +33,7 @@ public function setUpBrowserAndServer()
{
$this->browser = new Browser();

$http = new HttpServer(new StreamingRequestMiddleware(), function (ServerRequestInterface $request) {
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0.1), new StreamingRequestMiddleware(), function (ServerRequestInterface $request) {
$path = $request->getUri()->getPath();

$headers = array();
Expand Down Expand Up @@ -687,7 +688,7 @@ public function testPostStreamKnownLength()
*/
public function testPostStreamWillStartSendingRequestEvenWhenBodyDoesNotEmitData()
{
$http = new HttpServer(new StreamingRequestMiddleware(), function (ServerRequestInterface $request) {
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0.1), new StreamingRequestMiddleware(), function (ServerRequestInterface $request) {
return new Response(200);
});
$socket = new SocketServer('127.0.0.1:0');
Expand All @@ -714,7 +715,7 @@ public function testPostStreamClosed()

public function testSendsHttp11ByDefault()
{
$http = new HttpServer(function (ServerRequestInterface $request) {
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0.1), function (ServerRequestInterface $request) {
return new Response(
200,
array(),
Expand All @@ -734,7 +735,7 @@ public function testSendsHttp11ByDefault()

public function testSendsExplicitHttp10Request()
{
$http = new HttpServer(function (ServerRequestInterface $request) {
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0.1), function (ServerRequestInterface $request) {
return new Response(
200,
array(),
Expand Down
Loading

0 comments on commit 6b78aa5

Please sign in to comment.