From 4f47478c6fb293949ef3ea1659f6952d262f0fd2 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Wed, 1 Sep 2021 19:34:56 +0200 Subject: [PATCH] Close inactive requests This builds on top of #405 and further builds out #423 by also close connections with inactive requests. --- src/HttpServer.php | 5 +- src/Io/RequestHeaderParser.php | 35 +- src/Io/StreamingServer.php | 46 +- .../InactiveConnectionTimeoutMiddleware.php | 3 + tests/HttpServerTest.php | 31 +- tests/Io/RequestHeaderParserTest.php | 99 ++-- tests/Io/StreamingServerTest.php | 451 +++++++++--------- 7 files changed, 363 insertions(+), 307 deletions(-) diff --git a/src/HttpServer.php b/src/HttpServer.php index 004a1e06..22945309 100644 --- a/src/HttpServer.php +++ b/src/HttpServer.php @@ -5,8 +5,10 @@ use Evenement\EventEmitter; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; +use React\Http\Io\Clock; use React\Http\Io\IniUtil; use React\Http\Io\MiddlewareRunner; +use React\Http\Io\RequestHeaderParser; use React\Http\Io\StreamingServer; use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Http\Middleware\LimitConcurrentRequestsMiddleware; @@ -259,7 +261,8 @@ public function __construct($requestHandlerOrLoop) return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware); }); - $this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectTimeout); + $clock = new Clock($loop); + $this->streamingServer = new StreamingServer(new MiddlewareRunner($middleware), new RequestHeaderParser($loop, $clock, $idleConnectTimeout), $clock); $that = $this; $this->streamingServer->on('error', function ($error) use ($that) { diff --git a/src/Io/RequestHeaderParser.php b/src/Io/RequestHeaderParser.php index 6930afaf..d7108694 100644 --- a/src/Io/RequestHeaderParser.php +++ b/src/Io/RequestHeaderParser.php @@ -4,6 +4,7 @@ use Evenement\EventEmitter; use Psr\Http\Message\ServerRequestInterface; +use React\EventLoop\LoopInterface; use React\Http\Message\Response; use React\Http\Message\ServerRequest; use React\Socket\ConnectionInterface; @@ -24,20 +25,48 @@ class RequestHeaderParser extends EventEmitter { private $maxSize = 8192; + /** + * @var LoopInterface + */ + private $loop; + /** @var Clock */ private $clock; - public function __construct(Clock $clock) + /** + * @var float + */ + private $idleConnectionTimeout; + + /** + * @param LoopInterface $loop + * @param float $idleConnectionTimeout + */ + public function __construct(LoopInterface $loop, Clock $clock, $idleConnectionTimeout) { + $this->loop = $loop; $this->clock = $clock; + $this->idleConnectionTimeout = $idleConnectionTimeout; } public function handle(ConnectionInterface $conn) { + $loop = $this->loop; + $idleConnectionTimeout = $this->idleConnectionTimeout; + $timer = $loop->addTimer($idleConnectionTimeout, function () use ($conn) { + $conn->close(); + }); + $conn->on('close', function () use ($loop, &$timer) { + $loop->cancelTimer($timer); + }); $buffer = ''; $maxSize = $this->maxSize; $that = $this; - $conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) { + $conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that, $loop, &$timer, $idleConnectionTimeout) { + $loop->cancelTimer($timer); + $timer = $loop->addTimer($idleConnectionTimeout, function () use ($conn) { + $conn->close(); + }); // append chunk of data to buffer and look for end of request headers $buffer .= $data; $endOfHeader = \strpos($buffer, "\r\n\r\n"); @@ -51,6 +80,7 @@ public function handle(ConnectionInterface $conn) new \OverflowException("Maximum header size of {$maxSize} exceeded.", Response::STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE), $conn )); + $loop->cancelTimer($timer); return; } @@ -60,6 +90,7 @@ public function handle(ConnectionInterface $conn) } // request headers received => try to parse request + $loop->cancelTimer($timer); $conn->removeListener('data', $fn); $fn = null; diff --git a/src/Io/StreamingServer.php b/src/Io/StreamingServer.php index 63df7405..78296d54 100644 --- a/src/Io/StreamingServer.php +++ b/src/Io/StreamingServer.php @@ -5,10 +5,8 @@ use Evenement\EventEmitter; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestInterface; -use React\EventLoop\LoopInterface; use React\Http\Message\Response; use React\Http\Message\ServerRequest; -use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Promise; use React\Promise\CancellablePromiseInterface; use React\Promise\PromiseInterface; @@ -30,7 +28,7 @@ * object in return: * * ```php - * $server = new StreamingServer($loop, function (ServerRequestInterface $request) { + * $server = new StreamingServer(function (ServerRequestInterface $request) { * return new Response( * Response::STATUS_OK, * array( @@ -55,7 +53,7 @@ * in order to start a plaintext HTTP server like this: * * ```php - * $server = new StreamingServer($loop, $handler); + * $server = new StreamingServer($handler); * * $socket = new React\Socket\SocketServer('0.0.0.0:8080', array(), $loop); * $server->listen($socket); @@ -99,23 +97,19 @@ final class StreamingServer extends EventEmitter * connections in order to then parse incoming data as HTTP. * See also [listen()](#listen) for more details. * - * @param LoopInterface $loop * @param callable $requestHandler * @param float $idleConnectTimeout * @see self::listen() */ - public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT) + public function __construct($requestHandler, RequestHeaderParser $parser, Clock $clock) { if (!\is_callable($requestHandler)) { throw new \InvalidArgumentException('Invalid request handler given'); } - $this->loop = $loop; - $this->idleConnectionTimeout = $idleConnectTimeout; - $this->callback = $requestHandler; - $this->clock = new Clock($loop); - $this->parser = new RequestHeaderParser($this->clock); + $this->clock = $clock; + $this->parser = $parser; $that = $this; $this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) { @@ -142,27 +136,7 @@ public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTi */ public function listen(ServerInterface $socket) { - $socket->on('connection', array($this, 'handle')); - } - - /** @internal */ - public function handle(ConnectionInterface $conn) - { - $timer = $this->loop->addTimer($this->idleConnectionTimeout, function () use ($conn) { - $conn->close(); - }); - $loop = $this->loop; - $conn->once('data', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - $conn->on('end', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - $conn->on('close', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - - $this->parser->handle($conn); + $socket->on('connection', array($this->parser, 'handle')); } /** @internal */ @@ -380,7 +354,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // either wait for next request over persistent connection or end connection if ($persist) { - $this->handle($connection); + $this->parser->handle($connection); } else { $connection->end(); } @@ -401,10 +375,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // write streaming body and then wait for next request over persistent connection if ($persist) { $body->pipe($connection, array('end' => false)); - $that = $this; - $body->on('end', function () use ($connection, $that, $body) { + $parser = $this->parser; + $body->on('end', function () use ($connection, $parser, $body) { $connection->removeListener('close', array($body, 'close')); - $that->handle($connection); + $parser->handle($connection); }); } else { $body->pipe($connection); diff --git a/src/Middleware/InactiveConnectionTimeoutMiddleware.php b/src/Middleware/InactiveConnectionTimeoutMiddleware.php index 7ba0200a..5ebec7c2 100644 --- a/src/Middleware/InactiveConnectionTimeoutMiddleware.php +++ b/src/Middleware/InactiveConnectionTimeoutMiddleware.php @@ -28,6 +28,9 @@ */ final class InactiveConnectionTimeoutMiddleware { + /** + * @internal + */ const DEFAULT_TIMEOUT = 60; /** diff --git a/tests/HttpServerTest.php b/tests/HttpServerTest.php index 226073ff..e989f0d4 100644 --- a/tests/HttpServerTest.php +++ b/tests/HttpServerTest.php @@ -62,10 +62,18 @@ public function testConstructWithoutLoopAssignsLoopAutomatically() $ref->setAccessible(true); $clock = $ref->getValue($streamingServer); + $ref = new \ReflectionProperty($streamingServer, 'parser'); + $ref->setAccessible(true); + $parser = $ref->getValue($streamingServer); + $ref = new \ReflectionProperty($clock, 'loop'); $ref->setAccessible(true); $loop = $ref->getValue($clock); + $ref = new \ReflectionProperty($parser, 'loop'); + $ref->setAccessible(true); + $loop = $ref->getValue($parser); + $this->assertInstanceOf('React\EventLoop\LoopInterface', $loop); } @@ -259,18 +267,17 @@ function (ServerRequestInterface $request) use (&$streaming) { $this->assertEquals(true, $streaming); } - public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() - { - $this->connection->expects($this->once())->method('close'); - - $loop = Factory::create(); - $http = new HttpServer($loop, new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever()); - - $http->listen($this->socket); - $this->socket->emit('connection', array($this->connection)); - - $loop->run(); - } +// public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() +// { +// $this->connection->expects($this->once())->method('close'); +// +// $http = new HttpServer(Loop::get(), new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever()); +// +// $http->listen($this->socket); +// $this->socket->emit('connection', array($this->connection)); +// +// Loop::run(); +// } public function testForwardErrors() { diff --git a/tests/Io/RequestHeaderParserTest.php b/tests/Io/RequestHeaderParserTest.php index 7ba7fe01..5ec845c6 100644 --- a/tests/Io/RequestHeaderParserTest.php +++ b/tests/Io/RequestHeaderParserTest.php @@ -2,6 +2,8 @@ namespace React\Tests\Http\Io; +use React\EventLoop\Loop; +use React\Http\Io\Clock; use React\Http\Io\RequestHeaderParser; use React\Tests\Http\TestCase; use Psr\Http\Message\ServerRequestInterface; @@ -12,7 +14,7 @@ public function testSplitShouldHappenOnDoubleCrlf() { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(null)->getMock(); @@ -33,7 +35,7 @@ public function testFeedInOneGo() { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableOnce()); $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(null)->getMock(); @@ -47,7 +49,7 @@ public function testFeedTwoRequestsOnSeparateConnections() { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $called = 0; $parser->on('headers', function () use (&$called) { @@ -73,7 +75,7 @@ public function testHeadersEventShouldEmitRequestAndConnection() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', function ($parsedRequest, $connection) use (&$request, &$conn) { $request = $parsedRequest; $conn = $connection; @@ -98,7 +100,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitEndForStreamingB { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $ended = false; $that = $this; @@ -124,7 +126,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyDat { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $buffer = ''; $that = $this; @@ -154,7 +156,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyWit { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $buffer = ''; $that = $this; @@ -182,7 +184,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldNotEmitStreamingBody { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $buffer = ''; $that = $this; @@ -209,7 +211,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyDat { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $buffer = ''; $that = $this; @@ -238,7 +240,7 @@ public function testHeadersEventShouldParsePathAndQueryString() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; }); @@ -267,7 +269,7 @@ public function testHeaderEventWithShouldApplyDefaultAddressFromLocalConnectionA $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; }); @@ -288,7 +290,7 @@ public function testHeaderEventViaHttpsShouldApplyHttpsSchemeFromLocalTlsConnect $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; }); @@ -310,7 +312,7 @@ public function testHeaderOverflowShouldEmitError() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message, $connection) use (&$error, &$passedConnection) { $error = $message; @@ -334,7 +336,7 @@ public function testInvalidEmptyRequestHeadersParseException() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -355,7 +357,7 @@ public function testInvalidMalformedRequestLineParseException() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -376,7 +378,7 @@ public function testInvalidMalformedRequestHeadersThrowsParseException() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -397,7 +399,7 @@ public function testInvalidMalformedRequestHeadersWhitespaceThrowsParseException $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -418,7 +420,7 @@ public function testInvalidAbsoluteFormSchemeEmitsError() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -439,7 +441,7 @@ public function testOriginFormWithSchemeSeparatorInParam() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('error', $this->expectCallableNever()); $parser->on('headers', function ($parsedRequest, $parsedBodyBuffer) use (&$request) { $request = $parsedRequest; @@ -466,7 +468,7 @@ public function testUriStartingWithColonSlashSlashFails() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -487,7 +489,7 @@ public function testInvalidAbsoluteFormWithFragmentEmitsError() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -508,7 +510,7 @@ public function testInvalidHeaderContainsFullUri() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -529,7 +531,7 @@ public function testInvalidAbsoluteFormWithHostHeaderEmpty() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -550,7 +552,7 @@ public function testInvalidConnectRequestWithNonAuthorityForm() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -571,7 +573,7 @@ public function testInvalidHttpVersion() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -593,7 +595,7 @@ public function testInvalidContentLengthRequestHeaderWillEmitError() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -615,7 +617,7 @@ public function testInvalidRequestWithMultipleContentLengthRequestHeadersWillEmi $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -637,7 +639,7 @@ public function testInvalidTransferEncodingRequestHeaderWillEmitError() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -659,7 +661,7 @@ public function testInvalidRequestWithBothTransferEncodingAndContentLengthWillEm $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -682,7 +684,7 @@ public function testServerParamsWillBeSetOnHttpsRequest() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); $clock->expects($this->once())->method('now')->willReturn(1652972091.3958); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -715,7 +717,7 @@ public function testServerParamsWillBeSetOnHttpRequest() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); $clock->expects($this->once())->method('now')->willReturn(1652972091.3958); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -748,7 +750,7 @@ public function testServerParamsWillNotSetRemoteAddressForUnixDomainSockets() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); $clock->expects($this->once())->method('now')->willReturn(1652972091.3958); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -785,7 +787,7 @@ public function testServerParamsWontBeSetOnMissingUrls() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); $clock->expects($this->once())->method('now')->willReturn(1652972091.3958); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -814,7 +816,7 @@ public function testQueryParmetersWillBeSet() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = new RequestHeaderParser(Loop::get(), $clock, 60); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -831,6 +833,35 @@ public function testQueryParmetersWillBeSet() $this->assertEquals('this', $queryParams['test']); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $connection->expects($this->once())->method('close'); + + $parser = new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), 0.1); + + $parser->handle($connection); + + $connection->emit('data', array("GET /foo.php?hello=world&test=this HTTP/")); + + Loop::addTimer(0.2, function () { + Loop::stop(); + }); + Loop::run(); + } + + public function testIdleConnectionWillNotBeClosedAfterConfiguredTimeoutOnFullRequest() + { + $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $connection->expects($this->never())->method('close'); + + $parser = new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), 0.1); + + $parser->handle($connection); + + $connection->emit('data', array($this->createGetRequest())); + } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n"; diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index 0b4d9c1d..0c9c4904 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -4,9 +4,12 @@ use Psr\Http\Message\ServerRequestInterface; use React\EventLoop\Loop; +use React\Http\Io\Clock; +use React\Http\Io\RequestHeaderParser; use React\Http\Io\StreamingServer; use React\Http\Message\Response; use React\Http\Message\ServerRequest; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Promise\Promise; use React\Stream\ThroughStream; use React\Tests\Http\SocketServerStub; @@ -14,6 +17,8 @@ class StreamingServerTest extends TestCase { + const DEFAULT_TEST_TIMEOUT = 0.1; + private $connection; private $socket; @@ -51,7 +56,7 @@ public function setUpConnectionMockAndSocket() public function testRequestEventWillNotBeEmittedForIncompleteHeaders() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = new StreamingServer($this->expectCallableNever(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -65,18 +70,20 @@ public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() { $this->connection->expects($this->once())->method('close'); - $loop = Factory::create(); - $server = new StreamingServer($loop, $this->expectCallableNever(), 0.1); + $server = new StreamingServer($this->expectCallableNever(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), 0.1), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); - $loop->run(); + Loop::addTimer(0.2, function () { + Loop::stop(); + }); + Loop::run(); } public function testRequestEventIsEmitted() { - $server = new StreamingServer(Loop::get(), $this->expectCallableOnce()); + $server = new StreamingServer($this->expectCallableOnce(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -91,7 +98,7 @@ public function testRequestEventIsEmitted() public function testRequestEventIsEmittedForArrayCallable() { $this->called = null; - $server = new StreamingServer(Loop::get(), array($this, 'helperCallableOnce')); + $server = new StreamingServer(array($this, 'helperCallableOnce'), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -111,10 +118,10 @@ public function testRequestEvent() { $i = 0; $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { $i++; $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection ->expects($this->any()) @@ -144,10 +151,10 @@ public function testRequestEventWithSingleRequestHandlerArray() { $i = 0; $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { $i++; $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection ->expects($this->any()) @@ -176,9 +183,9 @@ public function testRequestEventWithSingleRequestHandlerArray() public function testRequestGetWithHostAndCustomPort() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -198,9 +205,9 @@ public function testRequestGetWithHostAndCustomPort() public function testRequestGetWithHostAndHttpsPort() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -220,9 +227,9 @@ public function testRequestGetWithHostAndHttpsPort() public function testRequestGetWithHostAndDefaultPortWillBeIgnored() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -242,9 +249,9 @@ public function testRequestGetWithHostAndDefaultPortWillBeIgnored() public function testRequestGetHttp10WithoutHostWillBeIgnored() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -264,7 +271,7 @@ public function testRequestGetHttp10WithoutHostWillBeIgnored() public function testRequestGetHttp11WithoutHostWillReject() { - $server = new StreamingServer(Loop::get(), 'var_dump'); + $server = new StreamingServer('var_dump', new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -277,9 +284,9 @@ public function testRequestGetHttp11WithoutHostWillReject() public function testRequestOptionsAsterisk() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -297,7 +304,7 @@ public function testRequestOptionsAsterisk() public function testRequestNonOptionsWithAsteriskRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = new StreamingServer($this->expectCallableNever(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -310,9 +317,9 @@ public function testRequestNonOptionsWithAsteriskRequestTargetWillReject() public function testRequestConnectAuthorityForm() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -332,9 +339,9 @@ public function testRequestConnectAuthorityForm() public function testRequestConnectWithoutHostWillBePassesAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -354,9 +361,9 @@ public function testRequestConnectWithoutHostWillBePassesAsIs() public function testRequestConnectAuthorityFormWithDefaultPortWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -376,9 +383,9 @@ public function testRequestConnectAuthorityFormWithDefaultPortWillBePassedAsIs() public function testRequestConnectAuthorityFormNonMatchingHostWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -397,7 +404,7 @@ public function testRequestConnectAuthorityFormNonMatchingHostWillBePassedAsIs() public function testRequestConnectOriginFormRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = new StreamingServer($this->expectCallableNever(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -409,7 +416,7 @@ public function testRequestConnectOriginFormRequestTargetWillReject() public function testRequestNonConnectWithAuthorityRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = new StreamingServer($this->expectCallableNever(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -423,9 +430,9 @@ public function testRequestWithoutHostEventUsesSocketAddress() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection ->expects($this->any()) @@ -449,9 +456,9 @@ public function testRequestAbsoluteEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -471,9 +478,9 @@ public function testRequestAbsoluteNonMatchingHostWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -491,7 +498,7 @@ public function testRequestAbsoluteNonMatchingHostWillBePassedAsIs() public function testRequestAbsoluteWithoutHostWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = new StreamingServer($this->expectCallableNever(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -505,9 +512,9 @@ public function testRequestOptionsAsteriskEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -527,9 +534,9 @@ public function testRequestOptionsAbsoluteEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -547,9 +554,9 @@ public function testRequestOptionsAbsoluteEvent() public function testRequestPauseWillBeForwardedToConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { $request->getBody()->pause(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->once())->method('pause'); @@ -567,9 +574,9 @@ public function testRequestPauseWillBeForwardedToConnection() public function testRequestResumeWillBeForwardedToConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { $request->getBody()->resume(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->once())->method('resume'); @@ -587,9 +594,9 @@ public function testRequestResumeWillBeForwardedToConnection() public function testRequestCloseWillNotCloseConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->never())->method('close'); @@ -602,10 +609,10 @@ public function testRequestCloseWillNotCloseConnection() public function testRequestPauseAfterCloseWillNotBeForwarded() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); $request->getBody()->pause(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->never())->method('close'); $this->connection->expects($this->never())->method('pause'); @@ -619,10 +626,10 @@ public function testRequestPauseAfterCloseWillNotBeForwarded() public function testRequestResumeAfterCloseWillNotBeForwarded() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); $request->getBody()->resume(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->never())->method('close'); $this->connection->expects($this->never())->method('resume'); @@ -638,9 +645,9 @@ public function testRequestEventWithoutBodyWillNotEmitData() { $never = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($never) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($never) { $request->getBody()->on('data', $never); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -653,9 +660,9 @@ public function testRequestEventWithSecondDataEventWillEmitBodyData() { $once = $this->expectCallableOnceWith('incomplete'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($once) { $request->getBody()->on('data', $once); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -673,9 +680,9 @@ public function testRequestEventWithPartialBodyWillEmitData() { $once = $this->expectCallableOnceWith('incomplete'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($once) { $request->getBody()->on('data', $once); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -694,9 +701,9 @@ public function testRequestEventWithPartialBodyWillEmitData() public function testResponseContainsServerHeader() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -724,9 +731,9 @@ public function testResponsePendingPromiseWillNotSendAnything() { $never = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($never) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($never) { return new Promise(function () { }, $never); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -754,9 +761,9 @@ public function testResponsePendingPromiseWillBeCancelledIfConnectionCloses() { $once = $this->expectCallableOnce(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($once) { return new Promise(function () { }, $once); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -786,13 +793,13 @@ public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyChunkedEncod $stream = new ThroughStream(); $stream->close(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -821,13 +828,13 @@ public function testResponseBodyStreamEndingWillSendEmptyBodyChunkedEncoded() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -859,13 +866,13 @@ public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyPlainHttp10( $stream = new ThroughStream(); $stream->close(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -895,13 +902,13 @@ public function testResponseStreamWillBeClosedIfConnectionIsAlreadyClosed() $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -950,13 +957,13 @@ public function testResponseBodyStreamWillBeClosedIfConnectionEmitsCloseEvent() $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -968,7 +975,7 @@ public function testResponseBodyStreamWillBeClosedIfConnectionEmitsCloseEvent() public function testResponseUpgradeInResponseCanBeUsedToAdvertisePossibleUpgrade() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -978,7 +985,7 @@ public function testResponseUpgradeInResponseCanBeUsedToAdvertisePossibleUpgrade ), 'foo' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -1004,7 +1011,7 @@ function ($data) use (&$buffer) { public function testResponseUpgradeWishInRequestCanBeIgnoredByReturningNormalResponse() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -1013,7 +1020,7 @@ public function testResponseUpgradeWishInRequestCanBeIgnoredByReturningNormalRes ), 'foo' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -1039,7 +1046,7 @@ function ($data) use (&$buffer) { public function testResponseUpgradeSwitchingProtocolIncludesConnectionUpgradeHeaderWithoutContentLength() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 101, array( @@ -1049,7 +1056,7 @@ public function testResponseUpgradeSwitchingProtocolIncludesConnectionUpgradeHea ), 'foo' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', 'printf'); @@ -1079,7 +1086,7 @@ public function testResponseUpgradeSwitchingProtocolWithStreamWillPipeDataToConn { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 101, array( @@ -1089,7 +1096,7 @@ public function testResponseUpgradeSwitchingProtocolWithStreamWillPipeDataToConn ), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -1120,13 +1127,13 @@ public function testResponseConnectMethodStreamWillPipeDataToConnection() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -1158,13 +1165,13 @@ public function testResponseConnectMethodStreamWillPipeDataFromConnection() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1177,13 +1184,13 @@ public function testResponseConnectMethodStreamWillPipeDataFromConnection() public function testResponseContainsSameRequestProtocolVersionAndChunkedBodyForHttp11() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), 'bye' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -1210,13 +1217,13 @@ function ($data) use (&$buffer) { public function testResponseContainsSameRequestProtocolVersionAndRawBodyForHttp10() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), 'bye' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; @@ -1244,13 +1251,13 @@ function ($data) use (&$buffer) { public function testResponseContainsNoResponseBodyForHeadRequest() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), 'bye' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -1280,13 +1287,13 @@ public function testResponseContainsNoResponseBodyForHeadRequestWithStreamingRes $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array('Content-Length' => '3'), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -1312,13 +1319,13 @@ function ($data) use (&$buffer) { public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContentStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 204, array(), 'bye' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -1348,13 +1355,13 @@ public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContent $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 204, array('Content-Length' => '3'), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -1380,13 +1387,13 @@ function ($data) use (&$buffer) { public function testResponseContainsNoContentLengthHeaderForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array(), '' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -1412,13 +1419,13 @@ function ($data) use (&$buffer) { public function testResponseContainsExplicitContentLengthHeaderForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array('Content-Length' => 3), '' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -1444,13 +1451,13 @@ function ($data) use (&$buffer) { public function testResponseContainsExplicitContentLengthHeaderForHeadRequests() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Content-Length' => 3), '' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -1476,13 +1483,13 @@ function ($data) use (&$buffer) { public function testResponseContainsNoResponseBodyForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array(), 'bye' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -1512,13 +1519,13 @@ public function testResponseContainsNoResponseBodyForNotModifiedStatusWithStream $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 304, array('Content-Length' => '3'), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -1545,7 +1552,7 @@ function ($data) use (&$buffer) { public function testRequestInvalidHttpProtocolVersionWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = new StreamingServer($this->expectCallableNever(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1579,7 +1586,7 @@ function ($data) use (&$buffer) { public function testRequestOverflowWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = new StreamingServer($this->expectCallableNever(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1613,7 +1620,7 @@ function ($data) use (&$buffer) { public function testRequestInvalidWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = new StreamingServer($this->expectCallableNever(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1650,12 +1657,12 @@ public function testRequestContentLengthBodyDataWillEmitDataEventOnRequestStream $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1678,13 +1685,13 @@ public function testRequestChunkedTransferEncodingRequestWillEmitDecodedDataEven $errorEvent = $this->expectCallableNever(); $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); $requestValidation = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1709,12 +1716,12 @@ public function testRequestChunkedTransferEncodingWithAdditionalDataWontBeEmitte $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1738,12 +1745,12 @@ public function testRequestChunkedTransferEncodingEmpty() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1766,13 +1773,13 @@ public function testRequestChunkedTransferEncodingHeaderCanBeUpperCase() $errorEvent = $this->expectCallableNever(); $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); $requestValidation = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1796,12 +1803,12 @@ public function testRequestChunkedTransferEncodingCanBeMixedUpperAndLowerCase() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1823,14 +1830,14 @@ public function testRequestContentLengthWillEmitDataEventAndEndEventAndAdditiona $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); return \React\Promise\resolve(new Response()); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1854,12 +1861,12 @@ public function testRequestContentLengthWillEmitDataEventAndEndEventAndAdditiona $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1886,12 +1893,12 @@ public function testRequestZeroContentLengthWillEmitEndEvent() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1912,12 +1919,12 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1939,12 +1946,12 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -1965,10 +1972,10 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg public function testRequestInvalidChunkHeaderTooLongWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = new StreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); return \React\Promise\resolve(new Response()); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->never())->method('close'); @@ -1990,9 +1997,9 @@ public function testRequestInvalidChunkHeaderTooLongWillEmitErrorOnRequestStream public function testRequestInvalidChunkBodyTooLongWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = new StreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->never())->method('close'); @@ -2012,9 +2019,9 @@ public function testRequestInvalidChunkBodyTooLongWillEmitErrorOnRequestStream() public function testRequestUnexpectedEndOfRequestWithChunkedTransferConnectionWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = new StreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->never())->method('close'); @@ -2035,9 +2042,9 @@ public function testRequestUnexpectedEndOfRequestWithChunkedTransferConnectionWi public function testRequestInvalidChunkHeaderWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = new StreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->never())->method('close'); @@ -2057,9 +2064,9 @@ public function testRequestInvalidChunkHeaderWillEmitErrorOnRequestStream() public function testRequestUnexpectedEndOfRequestWithContentLengthWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = new StreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->never())->method('close'); @@ -2084,12 +2091,12 @@ public function testRequestWithoutBodyWillEmitEndOnRequestStream() $endEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function ($request) use ($dataEvent, $closeEvent, $endEvent, $errorEvent){ + $server = new StreamingServer(function ($request) use ($dataEvent, $closeEvent, $endEvent, $errorEvent){ $request->getBody()->on('data', $dataEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection->expects($this->never())->method('close'); @@ -2108,12 +2115,12 @@ public function testRequestWithoutDefinedLengthWillIgnoreDataEvent() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('error', $errorEvent); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -2127,13 +2134,13 @@ public function testRequestWithoutDefinedLengthWillIgnoreDataEvent() public function testResponseWithBodyStreamWillUseChunkedTransferEncodingByDefault() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2161,7 +2168,7 @@ function ($data) use (&$buffer) { public function testResponseWithBodyStringWillOverwriteExplicitContentLengthAndTransferEncoding() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -2170,7 +2177,7 @@ public function testResponseWithBodyStringWillOverwriteExplicitContentLengthAndT ), 'hello' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2202,13 +2209,13 @@ public function testResponseContainsResponseBodyWithTransferEncodingChunkedForBo $body->expects($this->once())->method('getSize')->willReturn(null); $body->expects($this->once())->method('__toString')->willReturn('body'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($body) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($body) { return new Response( 200, array(), $body ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2239,13 +2246,13 @@ public function testResponseContainsResponseBodyWithPlainBodyWithUnknownSizeForL $body->expects($this->once())->method('getSize')->willReturn(null); $body->expects($this->once())->method('__toString')->willReturn('body'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($body) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($body) { return new Response( 200, array(), $body ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2273,7 +2280,7 @@ function ($data) use (&$buffer) { public function testResponseWithCustomTransferEncodingWillBeIgnoredAndUseChunkedTransferEncodingInstead() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array( @@ -2281,7 +2288,7 @@ public function testResponseWithCustomTransferEncodingWillBeIgnoredAndUseChunked ), $stream ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2310,9 +2317,9 @@ function ($data) use (&$buffer) { public function testResponseWithoutExplicitDateHeaderWillAddCurrentDateFromClock() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $ref = new \ReflectionProperty($server, 'clock'); $ref->setAccessible(true); @@ -2348,12 +2355,12 @@ function ($data) use (&$buffer) { public function testResponseWithCustomDateHeaderOverwritesDefault() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array("Date" => "Tue, 15 Nov 1994 08:12:31 GMT") ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2381,12 +2388,12 @@ function ($data) use (&$buffer) { public function testResponseWithEmptyDateHeaderRemovesDateHeader() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Date' => '') ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2414,7 +2421,7 @@ function ($data) use (&$buffer) { public function testResponseCanContainMultipleCookieHeaders() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -2426,7 +2433,7 @@ public function testResponseCanContainMultipleCookieHeaders() 'Server' => '' ) ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2452,9 +2459,9 @@ function ($data) use (&$buffer) { public function testReponseWithExpectContinueRequestContainsContinueWithLaterResponse() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2484,9 +2491,9 @@ function ($data) use (&$buffer) { public function testResponseWithExpectContinueRequestWontSendContinueForHttp10() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2515,20 +2522,20 @@ function ($data) use (&$buffer) { public function testInvalidCallbackFunctionLeadsToException() { $this->setExpectedException('InvalidArgumentException'); - $server = new StreamingServer(Loop::get(), 'invalid'); + $server = new StreamingServer('invalid', new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); } public function testResponseBodyStreamWillStreamDataWithChunkedTransferEncoding() { $input = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($input) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($input) { return new Response( 200, array(), $input ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2561,13 +2568,13 @@ public function testResponseBodyStreamWithContentLengthWillStreamTillLengthWitho { $input = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($input) { + $server = new StreamingServer(function (ServerRequestInterface $request) use ($input) { return new Response( 200, array('Content-Length' => 5), $input ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2599,9 +2606,9 @@ function ($data) use (&$buffer) { public function testResponseWithResponsePromise() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return \React\Promise\resolve(new Response()); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2627,9 +2634,9 @@ function ($data) use (&$buffer) { public function testResponseReturnInvalidTypeWillResultInError() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return "invalid"; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $exception = null; $server->on('error', function (\Exception $ex) use (&$exception) { @@ -2661,9 +2668,9 @@ function ($data) use (&$buffer) { public function testResponseResolveWrongTypeInPromiseWillResultInError() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return \React\Promise\resolve("invalid"); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2689,11 +2696,11 @@ function ($data) use (&$buffer) { public function testResponseRejectedPromiseWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { $reject(new \Exception()); }); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', $this->expectCallableOnce()); $buffer = ''; @@ -2720,11 +2727,11 @@ function ($data) use (&$buffer) { public function testResponseExceptionInCallbackWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { throw new \Exception('Bad call'); }); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->on('error', $this->expectCallableOnce()); $buffer = ''; @@ -2751,13 +2758,13 @@ function ($data) use (&$buffer) { public function testResponseWithContentLengthHeaderForStringBodyOverwritesTransferEncoding() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Transfer-Encoding' => 'chunked'), 'hello' ); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2787,9 +2794,9 @@ function ($data) use (&$buffer) { public function testResponseWillBeHandled() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Response(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $buffer = ''; $this->connection @@ -2815,9 +2822,9 @@ function ($data) use (&$buffer) { public function testResponseExceptionThrowInCallBackFunctionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { throw new \Exception('hello'); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $exception = null; $server->on('error', function (\Exception $ex) use (&$exception) { @@ -2853,9 +2860,9 @@ function ($data) use (&$buffer) { */ public function testResponseThrowableThrowInCallBackFunctionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { throw new \Error('hello'); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $exception = null; $server->on('error', function (\Exception $ex) use (&$exception) { @@ -2896,11 +2903,11 @@ function ($data) use (&$buffer) { public function testResponseRejectOfNonExceptionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = new StreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { $reject('Invalid type'); }); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $exception = null; $server->on('error', function (\Exception $ex) use (&$exception) { @@ -2933,9 +2940,9 @@ function ($data) use (&$buffer) { public function testRequestServerRequestParams() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $this->connection ->expects($this->any()) @@ -2967,9 +2974,9 @@ public function testRequestServerRequestParams() public function testRequestQueryParametersWillBeAddedToRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -2987,9 +2994,9 @@ public function testRequestQueryParametersWillBeAddedToRequest() public function testRequestCookieWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -3008,9 +3015,9 @@ public function testRequestCookieWillBeAddedToServerRequest() public function testRequestInvalidMultipleCookiesWontBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -3029,9 +3036,9 @@ public function testRequestInvalidMultipleCookiesWontBeAddedToServerRequest() public function testRequestCookieWithSeparatorWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -3049,9 +3056,9 @@ public function testRequestCookieWithSeparatorWillBeAddedToServerRequest() public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = new StreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -3068,7 +3075,7 @@ public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() public function testNewConnectionWillInvokeParserOnce() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = new StreamingServer($this->expectCallableNever(), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3085,7 +3092,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/', array(), '', '1.0'); - $server = new StreamingServer(Loop::get(), $this->expectCallableOnceWith($request)); + $server = new StreamingServer($this->expectCallableOnceWith($request), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3108,7 +3115,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'close')); - $server = new StreamingServer(Loop::get(), $this->expectCallableOnceWith($request)); + $server = new StreamingServer($this->expectCallableOnceWith($request), new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3131,9 +3138,9 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/'); - $server = new StreamingServer(Loop::get(), function () { + $server = new StreamingServer(function () { return new Response(200, array('Connection' => 'close')); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3156,9 +3163,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle { $request = new ServerRequest('GET', 'http://localhost/'); - $server = new StreamingServer(Loop::get(), function () { + $server = new StreamingServer(function () { return new Response(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->exactly(2))->method('handle'); @@ -3181,9 +3188,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle { $request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'keep-alive'), '', '1.0'); - $server = new StreamingServer(Loop::get(), function () { + $server = new StreamingServer(function () { return new Response(); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->exactly(2))->method('handle'); @@ -3207,9 +3214,9 @@ public function testNewConnectionWillInvokeParserOnceAfterInvokingRequestHandler $request = new ServerRequest('GET', 'http://localhost/'); $body = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function () use ($body) { + $server = new StreamingServer(function () use ($body) { return new Response(200, array(), $body); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3233,9 +3240,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle $request = new ServerRequest('GET', 'http://localhost/'); $body = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function () use ($body) { + $server = new StreamingServer(function () use ($body) { return new Response(200, array(), $body); - }); + }, new RequestHeaderParser(Loop::get(), new Clock(Loop::get()), self::DEFAULT_TEST_TIMEOUT), new Clock(Loop::get())); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->exactly(2))->method('handle'); @@ -3253,9 +3260,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle // pretend parser just finished parsing $server->handleRequest($this->connection, $request); - $this->assertCount(3, $this->connection->listeners('close')); + $this->assertCount(2, $this->connection->listeners('close')); $body->end(); - $this->assertCount(3, $this->connection->listeners('close')); + $this->assertCount(1, $this->connection->listeners('close')); } private function createGetRequest()