Skip to content

Commit

Permalink
Close inactive requests
Browse files Browse the repository at this point in the history
This builds on top of #405 and further builds out #423 by also
close connections with inactive requests.
  • Loading branch information
WyriHaximus committed Nov 10, 2021
1 parent 997d5b6 commit 0ddb503
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 65 deletions.
47 changes: 45 additions & 2 deletions src/Io/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Evenement\EventEmitter;
use Psr\Http\Message\ServerRequestInterface;
use React\EventLoop\LoopInterface;
use React\Http\Message\Response;
use React\Http\Message\ServerRequest;
use React\Socket\ConnectionInterface;
Expand All @@ -24,12 +25,51 @@ class RequestHeaderParser extends EventEmitter
{
private $maxSize = 8192;

public function handle(ConnectionInterface $conn)
/**
* @var LoopInterface
*/
private $loop;

/**
* @var float
*/
private $idleConnectionTimeout;

/**
* @param LoopInterface $loop
* @param float $idleConnectionTimeout
*/
public function __construct(LoopInterface $loop, $idleConnectionTimeout)
{
$this->loop = $loop;
$this->idleConnectionTimeout = $idleConnectionTimeout;
}

/**
* @param bool $firstRequest
*/
public function handle(ConnectionInterface $conn, $firstRequest = true)
{
$loop = $this->loop;
$idleConnectionTimeout = $this->idleConnectionTimeout;
$createTimer = function () use ($conn, $loop, $idleConnectionTimeout, $firstRequest) {
return $loop->addTimer($idleConnectionTimeout, function () use ($conn, $firstRequest) {
if ($firstRequest) {
$conn->write("HTTP/1.0 " . Response::STATUS_REQUEST_TIMEOUT . " Request Timed Out\r\n");
}
$conn->close();
});
};
$timer = $createTimer();
$conn->on('close', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});
$buffer = '';
$maxSize = $this->maxSize;
$that = $this;
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) {
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that, $loop, &$timer, $createTimer) {
$loop->cancelTimer($timer);
$timer = $createTimer();
// append chunk of data to buffer and look for end of request headers
$buffer .= $data;
$endOfHeader = \strpos($buffer, "\r\n\r\n");
Expand All @@ -43,6 +83,7 @@ public function handle(ConnectionInterface $conn)
new \OverflowException("Maximum header size of {$maxSize} exceeded.", Response::STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE),
$conn
));
$loop->cancelTimer($timer);
return;
}

Expand All @@ -67,6 +108,7 @@ public function handle(ConnectionInterface $conn)
$exception,
$conn
));
$loop->cancelTimer($timer);
return;
}

Expand Down Expand Up @@ -105,6 +147,7 @@ public function handle(ConnectionInterface $conn)
if ($contentLength === 0) {
$stream->emit('end');
$stream->close();
$loop->cancelTimer($timer);
}
});
}
Expand Down
24 changes: 5 additions & 19 deletions src/Io/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTi
$this->idleConnectionTimeout = $idleConnectTimeout;

$this->callback = $requestHandler;
$this->parser = new RequestHeaderParser();
$this->parser = new RequestHeaderParser($this->loop, $this->idleConnectionTimeout);

$that = $this;
$this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) {
Expand Down Expand Up @@ -142,23 +142,9 @@ public function listen(ServerInterface $socket)
}

/** @internal */
public function handle(ConnectionInterface $conn)
public function handle(ConnectionInterface $conn, $firstRequest = true)
{
$timer = $this->loop->addTimer($this->idleConnectionTimeout, function () use ($conn) {
$conn->close();
});
$loop = $this->loop;
$conn->once('data', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});
$conn->on('end', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});
$conn->on('close', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});

$this->parser->handle($conn);
$this->parser->handle($conn, $firstRequest);
}

/** @internal */
Expand Down Expand Up @@ -374,7 +360,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt

// either wait for next request over persistent connection or end connection
if ($persist) {
$this->handle($connection);
$this->handle($connection, false);
} else {
$connection->end();
}
Expand All @@ -398,7 +384,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
$that = $this;
$body->on('end', function () use ($connection, $that, $body) {
$connection->removeListener('close', array($body, 'close'));
$that->handle($connection);
$that->handle($connection, false);
});
} else {
$body->pipe($connection);
Expand Down
Loading

0 comments on commit 0ddb503

Please sign in to comment.