From 7b3f5b6308d9d1a0c448556d7574434be1d5daa5 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Wed, 1 Sep 2021 19:34:56 +0200 Subject: [PATCH] Close inactive requests This builds on top of #405 and further builds out #423 by also close connections with inactive requests. --- src/Io/RequestHeaderParser.php | 39 +++++++++- src/Io/StreamingServer.php | 16 +---- tests/Io/RequestHeaderParserTest.php | 102 ++++++++++++++++++--------- tests/Io/StreamingServerTest.php | 20 +++--- 4 files changed, 117 insertions(+), 60 deletions(-) diff --git a/src/Io/RequestHeaderParser.php b/src/Io/RequestHeaderParser.php index 743c006c..ce56db5c 100644 --- a/src/Io/RequestHeaderParser.php +++ b/src/Io/RequestHeaderParser.php @@ -4,6 +4,7 @@ use Evenement\EventEmitter; use Psr\Http\Message\ServerRequestInterface; +use React\EventLoop\LoopInterface; use React\Http\Message\Response; use React\Http\Message\ServerRequest; use React\Socket\ConnectionInterface; @@ -24,12 +25,45 @@ class RequestHeaderParser extends EventEmitter { private $maxSize = 8192; + /** + * @var LoopInterface + */ + private $loop; + + /** + * @var float + */ + private $idleConnectionTimeout; + + /** + * @param LoopInterface $loop + * @param float $idleConnectionTimeout + */ + public function __construct(LoopInterface $loop, $idleConnectionTimeout) + { + $this->loop = $loop; + $this->idleConnectionTimeout = $idleConnectionTimeout; + } + public function handle(ConnectionInterface $conn) { + $loop = $this->loop; + $idleConnectionTimeout = $this->idleConnectionTimeout; + $createTimer = function () use ($conn, $loop, $idleConnectionTimeout) { + return $loop->addTimer($idleConnectionTimeout, function () use ($conn) { + $conn->close(); + }); + }; + $timer = $createTimer(); + $conn->on('close', function () use ($loop, $timer) { + $loop->cancelTimer($timer); + }); $buffer = ''; $maxSize = $this->maxSize; $that = $this; - $conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) { + $conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that, $loop, &$timer, $createTimer) { + $loop->cancelTimer($timer); + $timer = $createTimer(); // append chunk of data to buffer and look for end of request headers $buffer .= $data; $endOfHeader = \strpos($buffer, "\r\n\r\n"); @@ -43,6 +77,7 @@ public function handle(ConnectionInterface $conn) new \OverflowException("Maximum header size of {$maxSize} exceeded.", Response::STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE), $conn )); + $loop->cancelTimer($timer); return; } @@ -67,6 +102,7 @@ public function handle(ConnectionInterface $conn) $exception, $conn )); + $loop->cancelTimer($timer); return; } @@ -105,6 +141,7 @@ public function handle(ConnectionInterface $conn) if ($contentLength === 0) { $stream->emit('end'); $stream->close(); + $loop->cancelTimer($timer); } }); } diff --git a/src/Io/StreamingServer.php b/src/Io/StreamingServer.php index b36676b4..439f65bf 100644 --- a/src/Io/StreamingServer.php +++ b/src/Io/StreamingServer.php @@ -111,7 +111,7 @@ public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTi $this->idleConnectionTimeout = $idleConnectTimeout; $this->callback = $requestHandler; - $this->parser = new RequestHeaderParser(); + $this->parser = new RequestHeaderParser($this->loop, $this->idleConnectionTimeout); $that = $this; $this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) { @@ -144,20 +144,6 @@ public function listen(ServerInterface $socket) /** @internal */ public function handle(ConnectionInterface $conn) { - $timer = $this->loop->addTimer($this->idleConnectionTimeout, function () use ($conn) { - $conn->close(); - }); - $loop = $this->loop; - $conn->once('data', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - $conn->on('end', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - $conn->on('close', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - $this->parser->handle($conn); } diff --git a/tests/Io/RequestHeaderParserTest.php b/tests/Io/RequestHeaderParserTest.php index 356443fb..64134338 100644 --- a/tests/Io/RequestHeaderParserTest.php +++ b/tests/Io/RequestHeaderParserTest.php @@ -2,7 +2,9 @@ namespace React\Tests\Http\Io; +use React\EventLoop\Factory; use React\Http\Io\RequestHeaderParser; +use React\Http\Io\StreamingServer; use React\Tests\Http\TestCase; use Psr\Http\Message\ServerRequestInterface; @@ -10,7 +12,7 @@ class RequestHeaderParserTest extends TestCase { public function testSplitShouldHappenOnDoubleCrlf() { - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(null)->getMock(); @@ -29,7 +31,7 @@ public function testSplitShouldHappenOnDoubleCrlf() public function testFeedInOneGo() { - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableOnce()); $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(null)->getMock(); @@ -41,7 +43,7 @@ public function testFeedInOneGo() public function testFeedTwoRequestsOnSeparateConnections() { - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $called = 0; $parser->on('headers', function () use (&$called) { @@ -65,7 +67,7 @@ public function testHeadersEventShouldEmitRequestAndConnection() $request = null; $conn = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', function ($parsedRequest, $connection) use (&$request, &$conn) { $request = $parsedRequest; $conn = $connection; @@ -88,7 +90,7 @@ public function testHeadersEventShouldEmitRequestAndConnection() public function testHeadersEventShouldEmitRequestWhichShouldEmitEndForStreamingBodyWithoutContentLengthFromInitialRequestBody() { - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $ended = false; $that = $this; @@ -112,7 +114,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitEndForStreamingB public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyDataFromInitialRequestBody() { - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $buffer = ''; $that = $this; @@ -140,7 +142,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyDat public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyWithPlentyOfDataFromInitialRequestBody() { - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $buffer = ''; $that = $this; @@ -166,7 +168,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyWit public function testHeadersEventShouldEmitRequestWhichShouldNotEmitStreamingBodyDataWithoutContentLengthFromInitialRequestBody() { - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $buffer = ''; $that = $this; @@ -191,7 +193,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldNotEmitStreamingBody public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyDataUntilContentLengthBoundaryFromInitialRequestBody() { - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $buffer = ''; $that = $this; @@ -218,7 +220,7 @@ public function testHeadersEventShouldParsePathAndQueryString() { $request = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; }); @@ -245,7 +247,7 @@ public function testHeaderEventWithShouldApplyDefaultAddressFromLocalConnectionA { $request = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; }); @@ -264,7 +266,7 @@ public function testHeaderEventViaHttpsShouldApplyHttpsSchemeFromLocalTlsConnect { $request = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; }); @@ -284,7 +286,7 @@ public function testHeaderOverflowShouldEmitError() $error = null; $passedConnection = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message, $connection) use (&$error, &$passedConnection) { $error = $message; @@ -306,7 +308,7 @@ public function testInvalidEmptyRequestHeadersParseException() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -325,7 +327,7 @@ public function testInvalidMalformedRequestLineParseException() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -344,7 +346,7 @@ public function testInvalidMalformedRequestHeadersThrowsParseException() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -363,7 +365,7 @@ public function testInvalidMalformedRequestHeadersWhitespaceThrowsParseException { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -382,7 +384,7 @@ public function testInvalidAbsoluteFormSchemeEmitsError() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -401,7 +403,7 @@ public function testOriginFormWithSchemeSeparatorInParam() { $request = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('error', $this->expectCallableNever()); $parser->on('headers', function ($parsedRequest, $parsedBodyBuffer) use (&$request) { $request = $parsedRequest; @@ -426,7 +428,7 @@ public function testUriStartingWithColonSlashSlashFails() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -445,7 +447,7 @@ public function testInvalidAbsoluteFormWithFragmentEmitsError() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -464,7 +466,7 @@ public function testInvalidHeaderContainsFullUri() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -483,7 +485,7 @@ public function testInvalidAbsoluteFormWithHostHeaderEmpty() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -502,7 +504,7 @@ public function testInvalidConnectRequestWithNonAuthorityForm() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -521,7 +523,7 @@ public function testInvalidHttpVersion() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -541,7 +543,7 @@ public function testInvalidContentLengthRequestHeaderWillEmitError() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -561,7 +563,7 @@ public function testInvalidRequestWithMultipleContentLengthRequestHeadersWillEmi { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -581,7 +583,7 @@ public function testInvalidTransferEncodingRequestHeaderWillEmitError() { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -601,7 +603,7 @@ public function testInvalidRequestWithBothTransferEncodingAndContentLengthWillEm { $error = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -621,7 +623,7 @@ public function testServerParamsWillBeSetOnHttpsRequest() { $request = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -651,7 +653,7 @@ public function testServerParamsWillBeSetOnHttpRequest() { $request = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -681,7 +683,7 @@ public function testServerParamsWillNotSetRemoteAddressForUnixDomainSockets() { $request = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -715,7 +717,7 @@ public function testServerParamsWontBeSetOnMissingUrls() $request = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -742,7 +744,7 @@ public function testQueryParmetersWillBeSet() { $request = null; - $parser = new RequestHeaderParser(); + $parser = new RequestHeaderParser(Factory::create(), 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -759,6 +761,38 @@ public function testQueryParmetersWillBeSet() $this->assertEquals('this', $queryParams['test']); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $connection->expects($this->once())->method('close'); + + $loop = Factory::create(); + + $parser = new RequestHeaderParser($loop, 0.1); + + $parser->handle($connection); + + $connection->emit('data', array("GET /foo.php?hello=world&test=this HTTP/")); + + $loop->run(); + } + + public function testIdleConnectionWillNotBeClosedAfterConfiguredTimeoutOnFullRequest() + { + $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $connection->expects($this->never())->method('close'); + + $loop = Factory::create(); + + $parser = new RequestHeaderParser($loop, 0.1); + + $parser->handle($connection); + + $connection->emit('data', array($this->createGetRequest())); + + $loop->run(); + } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n"; diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index 92ac8261..c44b196e 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -3026,7 +3026,7 @@ public function testNewConnectionWillInvokeParserOnce() { $server = new StreamingServer(Factory::create(), $this->expectCallableNever()); - $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); $ref = new \ReflectionProperty($server, 'parser'); @@ -3043,7 +3043,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen $server = new StreamingServer(Factory::create(), $this->expectCallableOnceWith($request)); - $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); $ref = new \ReflectionProperty($server, 'parser'); @@ -3066,7 +3066,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen $server = new StreamingServer(Factory::create(), $this->expectCallableOnceWith($request)); - $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); $ref = new \ReflectionProperty($server, 'parser'); @@ -3091,7 +3091,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen return new Response(200, array('Connection' => 'close')); }); - $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); $ref = new \ReflectionProperty($server, 'parser'); @@ -3116,7 +3116,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle return new Response(); }); - $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->exactly(2))->method('handle'); $ref = new \ReflectionProperty($server, 'parser'); @@ -3141,7 +3141,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle return new Response(); }); - $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->exactly(2))->method('handle'); $ref = new \ReflectionProperty($server, 'parser'); @@ -3167,7 +3167,7 @@ public function testNewConnectionWillInvokeParserOnceAfterInvokingRequestHandler return new Response(200, array(), $body); }); - $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); $ref = new \ReflectionProperty($server, 'parser'); @@ -3193,7 +3193,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle return new Response(200, array(), $body); }); - $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->exactly(2))->method('handle'); $ref = new \ReflectionProperty($server, 'parser'); @@ -3209,9 +3209,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle // pretend parser just finished parsing $server->handleRequest($this->connection, $request); - $this->assertCount(3, $this->connection->listeners('close')); + $this->assertCount(2, $this->connection->listeners('close')); $body->end(); - $this->assertCount(3, $this->connection->listeners('close')); + $this->assertCount(1, $this->connection->listeners('close')); } private function createGetRequest()