From 3398ca878d7b28a63f1b3ada91a5f3032315e9a3 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Wed, 1 Sep 2021 19:34:56 +0200 Subject: [PATCH] Close inactive requests This builds on top of #405 and further builds out #423 by also close connections with inactive requests. --- src/HttpServer.php | 5 +- src/Io/RequestHeaderParser.php | 38 ++- src/Io/StreamingServer.php | 46 +-- .../InactiveConnectionTimeoutMiddleware.php | 4 + tests/HttpServerTest.php | 13 +- tests/Io/RequestHeaderParserTest.php | 124 +++++--- tests/Io/StreamingServerTest.php | 282 ++++++++++-------- 7 files changed, 312 insertions(+), 200 deletions(-) diff --git a/src/HttpServer.php b/src/HttpServer.php index 004a1e06..22945309 100644 --- a/src/HttpServer.php +++ b/src/HttpServer.php @@ -5,8 +5,10 @@ use Evenement\EventEmitter; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; +use React\Http\Io\Clock; use React\Http\Io\IniUtil; use React\Http\Io\MiddlewareRunner; +use React\Http\Io\RequestHeaderParser; use React\Http\Io\StreamingServer; use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Http\Middleware\LimitConcurrentRequestsMiddleware; @@ -259,7 +261,8 @@ public function __construct($requestHandlerOrLoop) return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware); }); - $this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectTimeout); + $clock = new Clock($loop); + $this->streamingServer = new StreamingServer(new MiddlewareRunner($middleware), new RequestHeaderParser($loop, $clock, $idleConnectTimeout), $clock); $that = $this; $this->streamingServer->on('error', function ($error) use ($that) { diff --git a/src/Io/RequestHeaderParser.php b/src/Io/RequestHeaderParser.php index b8336f5b..c73e7883 100644 --- a/src/Io/RequestHeaderParser.php +++ b/src/Io/RequestHeaderParser.php @@ -4,6 +4,7 @@ use Evenement\EventEmitter; use Psr\Http\Message\ServerRequestInterface; +use React\EventLoop\LoopInterface; use React\Http\Message\Response; use React\Http\Message\ServerRequest; use React\Socket\ConnectionInterface; @@ -24,23 +25,53 @@ class RequestHeaderParser extends EventEmitter { private $maxSize = 8192; + /** + * @var LoopInterface + */ + private $loop; + /** @var Clock */ private $clock; + /** + * @var float + */ + private $idleConnectionTimeout; + /** @var array> */ private $connectionParams = array(); - public function __construct(Clock $clock) + /** + * @param LoopInterface $loop + * @param float $idleConnectionTimeout + */ + public function __construct(LoopInterface $loop, Clock $clock, $idleConnectionTimeout) { + $this->loop = $loop; $this->clock = $clock; + $this->idleConnectionTimeout = $idleConnectionTimeout; } public function handle(ConnectionInterface $conn) { + $loop = $this->loop; + $idleConnectionTimeout = $this->idleConnectionTimeout; + $that = $this; + $idleConnectionTimeoutHandler = function () use ($that, $conn) { + $that->emit('error', array( + new \RuntimeException('Request timed out', Response::STATUS_REQUEST_TIMEOUT), + $conn + )); + $conn->close(); + }; + $timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); + $conn->on('close', function () use ($loop, &$timer) { + $loop->cancelTimer($timer); + }); $buffer = ''; $maxSize = $this->maxSize; - $that = $this; - $conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) { + $conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that, $loop, &$timer, $idleConnectionTimeout, $idleConnectionTimeoutHandler) { + $loop->cancelTimer($timer); // append chunk of data to buffer and look for end of request headers $buffer .= $data; $endOfHeader = \strpos($buffer, "\r\n\r\n"); @@ -59,6 +90,7 @@ public function handle(ConnectionInterface $conn) // ignore incomplete requests if ($endOfHeader === false) { + $timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); return; } diff --git a/src/Io/StreamingServer.php b/src/Io/StreamingServer.php index 1a47718c..84e228e3 100644 --- a/src/Io/StreamingServer.php +++ b/src/Io/StreamingServer.php @@ -5,10 +5,8 @@ use Evenement\EventEmitter; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestInterface; -use React\EventLoop\LoopInterface; use React\Http\Message\Response; use React\Http\Message\ServerRequest; -use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Promise; use React\Promise\PromiseInterface; use React\Socket\ConnectionInterface; @@ -29,7 +27,7 @@ * object in return: * * ```php - * $server = new StreamingServer($loop, function (ServerRequestInterface $request) { + * $server = new StreamingServer(function (ServerRequestInterface $request) { * return new Response( * Response::STATUS_OK, * array( @@ -54,7 +52,7 @@ * in order to start a plaintext HTTP server like this: * * ```php - * $server = new StreamingServer($loop, $handler); + * $server = new StreamingServer($handler); * * $socket = new React\Socket\SocketServer('0.0.0.0:8080', array(), $loop); * $server->listen($socket); @@ -98,23 +96,19 @@ final class StreamingServer extends EventEmitter * connections in order to then parse incoming data as HTTP. * See also [listen()](#listen) for more details. * - * @param LoopInterface $loop * @param callable $requestHandler * @param float $idleConnectTimeout * @see self::listen() */ - public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT) + public function __construct($requestHandler, RequestHeaderParser $parser, Clock $clock) { if (!\is_callable($requestHandler)) { throw new \InvalidArgumentException('Invalid request handler given'); } - $this->loop = $loop; - $this->idleConnectionTimeout = $idleConnectTimeout; - $this->callback = $requestHandler; - $this->clock = new Clock($loop); - $this->parser = new RequestHeaderParser($this->clock); + $this->clock = $clock; + $this->parser = $parser; $that = $this; $this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) { @@ -141,27 +135,7 @@ public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTi */ public function listen(ServerInterface $socket) { - $socket->on('connection', array($this, 'handle')); - } - - /** @internal */ - public function handle(ConnectionInterface $conn) - { - $timer = $this->loop->addTimer($this->idleConnectionTimeout, function () use ($conn) { - $conn->close(); - }); - $loop = $this->loop; - $conn->once('data', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - $conn->on('end', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - $conn->on('close', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - - $this->parser->handle($conn); + $socket->on('connection', array($this->parser, 'handle')); } /** @internal */ @@ -379,7 +353,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // either wait for next request over persistent connection or end connection if ($persist) { - $this->handle($connection); + $this->parser->handle($connection); } else { $connection->end(); } @@ -400,10 +374,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // write streaming body and then wait for next request over persistent connection if ($persist) { $body->pipe($connection, array('end' => false)); - $that = $this; - $body->on('end', function () use ($connection, $that, $body) { + $parser = $this->parser; + $body->on('end', function () use ($connection, $parser, $body) { $connection->removeListener('close', array($body, 'close')); - $that->handle($connection); + $parser->handle($connection); }); } else { $body->pipe($connection); diff --git a/src/Middleware/InactiveConnectionTimeoutMiddleware.php b/src/Middleware/InactiveConnectionTimeoutMiddleware.php index 7ba0200a..0ca4c759 100644 --- a/src/Middleware/InactiveConnectionTimeoutMiddleware.php +++ b/src/Middleware/InactiveConnectionTimeoutMiddleware.php @@ -28,6 +28,9 @@ */ final class InactiveConnectionTimeoutMiddleware { + /** + * @internal + */ const DEFAULT_TIMEOUT = 60; /** @@ -50,6 +53,7 @@ public function __invoke(ServerRequestInterface $request, $next) /** * @return float + * @internal */ public function getTimeout() { diff --git a/tests/HttpServerTest.php b/tests/HttpServerTest.php index 226073ff..90b89dde 100644 --- a/tests/HttpServerTest.php +++ b/tests/HttpServerTest.php @@ -62,10 +62,18 @@ public function testConstructWithoutLoopAssignsLoopAutomatically() $ref->setAccessible(true); $clock = $ref->getValue($streamingServer); + $ref = new \ReflectionProperty($streamingServer, 'parser'); + $ref->setAccessible(true); + $parser = $ref->getValue($streamingServer); + $ref = new \ReflectionProperty($clock, 'loop'); $ref->setAccessible(true); $loop = $ref->getValue($clock); + $ref = new \ReflectionProperty($parser, 'loop'); + $ref->setAccessible(true); + $loop = $ref->getValue($parser); + $this->assertInstanceOf('React\EventLoop\LoopInterface', $loop); } @@ -263,13 +271,12 @@ public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() { $this->connection->expects($this->once())->method('close'); - $loop = Factory::create(); - $http = new HttpServer($loop, new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever()); + $http = new HttpServer(Loop::get(), new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever()); $http->listen($this->socket); $this->socket->emit('connection', array($this->connection)); - $loop->run(); + Loop::run(); } public function testForwardErrors() diff --git a/tests/Io/RequestHeaderParserTest.php b/tests/Io/RequestHeaderParserTest.php index 87d6bf1b..cc3ab4ff 100644 --- a/tests/Io/RequestHeaderParserTest.php +++ b/tests/Io/RequestHeaderParserTest.php @@ -3,7 +3,11 @@ namespace React\Tests\Http\Io; use Psr\Http\Message\ServerRequestInterface; +use React\EventLoop\Loop; +use React\EventLoop\LoopInterface; +use React\Http\Io\Clock; use React\Http\Io\RequestHeaderParser; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Tests\Http\TestCase; class RequestHeaderParserTest extends TestCase @@ -12,7 +16,7 @@ public function testSplitShouldHappenOnDoubleCrlf() { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(null)->getMock(); @@ -33,7 +37,7 @@ public function testFeedInOneGo() { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableOnce()); $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(null)->getMock(); @@ -47,7 +51,7 @@ public function testFeedTwoRequestsOnSeparateConnections() { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $called = 0; $parser->on('headers', function () use (&$called) { @@ -73,7 +77,7 @@ public function testHeadersEventShouldEmitRequestAndConnection() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', function ($parsedRequest, $connection) use (&$request, &$conn) { $request = $parsedRequest; $conn = $connection; @@ -98,7 +102,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitEndForStreamingB { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $ended = false; $that = $this; @@ -124,7 +128,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyDat { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $buffer = ''; $that = $this; @@ -154,7 +158,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyWit { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $buffer = ''; $that = $this; @@ -182,7 +186,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldNotEmitStreamingBody { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $buffer = ''; $that = $this; @@ -209,7 +213,7 @@ public function testHeadersEventShouldEmitRequestWhichShouldEmitStreamingBodyDat { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $buffer = ''; $that = $this; @@ -238,7 +242,7 @@ public function testHeadersEventShouldParsePathAndQueryString() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; }); @@ -267,7 +271,7 @@ public function testHeaderEventWithShouldApplyDefaultAddressFromLocalConnectionA $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; }); @@ -288,7 +292,7 @@ public function testHeaderEventViaHttpsShouldApplyHttpsSchemeFromLocalTlsConnect $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; }); @@ -310,7 +314,7 @@ public function testHeaderOverflowShouldEmitError() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message, $connection) use (&$error, &$passedConnection) { $error = $message; @@ -334,7 +338,7 @@ public function testInvalidEmptyRequestHeadersParseException() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -355,7 +359,7 @@ public function testInvalidMalformedRequestLineParseException() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -376,7 +380,7 @@ public function testInvalidMalformedRequestHeadersThrowsParseException() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -397,7 +401,7 @@ public function testInvalidMalformedRequestHeadersWhitespaceThrowsParseException $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -418,7 +422,7 @@ public function testInvalidAbsoluteFormSchemeEmitsError() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -439,7 +443,7 @@ public function testOriginFormWithSchemeSeparatorInParam() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('error', $this->expectCallableNever()); $parser->on('headers', function ($parsedRequest, $parsedBodyBuffer) use (&$request) { $request = $parsedRequest; @@ -466,7 +470,7 @@ public function testUriStartingWithColonSlashSlashFails() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -487,7 +491,7 @@ public function testInvalidAbsoluteFormWithFragmentEmitsError() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -508,7 +512,7 @@ public function testInvalidHeaderContainsFullUri() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -529,7 +533,7 @@ public function testInvalidAbsoluteFormWithHostHeaderEmpty() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -550,7 +554,7 @@ public function testInvalidConnectRequestWithNonAuthorityForm() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -571,7 +575,7 @@ public function testInvalidHttpVersion() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -593,7 +597,7 @@ public function testInvalidContentLengthRequestHeaderWillEmitError() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -615,7 +619,7 @@ public function testInvalidRequestWithMultipleContentLengthRequestHeadersWillEmi $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -637,7 +641,7 @@ public function testInvalidTransferEncodingRequestHeaderWillEmitError() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -659,7 +663,7 @@ public function testInvalidRequestWithBothTransferEncodingAndContentLengthWillEm $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', $this->expectCallableNever()); $parser->on('error', function ($message) use (&$error) { $error = $message; @@ -682,7 +686,7 @@ public function testServerParamsWillBeSetOnHttpsRequest() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); $clock->expects($this->once())->method('now')->willReturn(1652972091.3958); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -715,7 +719,7 @@ public function testServerParamsWillBeSetOnHttpRequest() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); $clock->expects($this->once())->method('now')->willReturn(1652972091.3958); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -748,7 +752,7 @@ public function testServerParamsWillNotSetRemoteAddressForUnixDomainSockets() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); $clock->expects($this->once())->method('now')->willReturn(1652972091.3958); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -785,7 +789,7 @@ public function testServerParamsWontBeSetOnMissingUrls() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); $clock->expects($this->once())->method('now')->willReturn(1652972091.3958); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -813,7 +817,7 @@ public function testServerParamsWillBeReusedForMultipleRequestsFromSameConnectio $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); $clock->expects($this->exactly(2))->method('now')->willReturn(1652972091.3958); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('getLocalAddress', 'getRemoteAddress'))->getMock(); $connection->expects($this->once())->method('getLocalAddress')->willReturn('tcp://127.1.1.1:8000'); @@ -848,7 +852,7 @@ public function testServerParamsWillBeRememberedUntilConnectionIsClosed() { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('getLocalAddress', 'getRemoteAddress'))->getMock(); @@ -870,7 +874,7 @@ public function testQueryParmetersWillBeSet() $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $parser->on('headers', function ($parsedRequest) use (&$request) { $request = $parsedRequest; @@ -887,6 +891,49 @@ public function testQueryParmetersWillBeSet() $this->assertEquals('this', $queryParams['test']); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $callback = null; + $caughtError = null; + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->exactly(2))->method('addTimer')->with(0.1, $this->callback(function ($cb) use (&$tick, &$callback) { + $callback = $cb; + return true; + }))->willReturn($timer); + $loop->expects($this->any())->method('cancelTimer')->with($timer); + + $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $connection->expects($this->once())->method('close'); + + $parser = $this->createRequestHeaderParser(new Clock($loop), 0.1, $loop); + $parser->on('error', function ($error) use (&$caughtError) { + $caughtError = $error; + }); + + $parser->handle($connection); + + $connection->emit('data', array("GET /foo.php?hello=world&test=this HTTP/")); + + self::assertTrue(is_callable($callback)); + $callback(); + + self::assertInstanceOf('\RuntimeException', $caughtError); + self::assertSame('Request timed out', $caughtError->getMessage()); + } + + public function testIdleConnectionWillNotBeClosedAfterConfiguredTimeoutOnFullRequest() + { + $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $connection->expects($this->never())->method('close'); + + $parser = $this->createRequestHeaderParser(new Clock($this->getMockBuilder('React\EventLoop\LoopInterface')->getMock()), 0.1); + + $parser->handle($connection); + + $connection->emit('data', array($this->createGetRequest())); + } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n"; @@ -907,4 +954,9 @@ private function createAdvancedPostRequest() return $data; } + + private function createRequestHeaderParser(Clock $clock, $timeout = 1, LoopInterface $loop = null) + { + return new RequestHeaderParser($loop instanceof LoopInterface ? $loop : Loop::get(), $clock, $timeout); + } } diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index 0b4d9c1d..62eb0a46 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -4,9 +4,12 @@ use Psr\Http\Message\ServerRequestInterface; use React\EventLoop\Loop; +use React\Http\Io\Clock; +use React\Http\Io\RequestHeaderParser; use React\Http\Io\StreamingServer; use React\Http\Message\Response; use React\Http\Message\ServerRequest; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Promise\Promise; use React\Stream\ThroughStream; use React\Tests\Http\SocketServerStub; @@ -51,7 +54,7 @@ public function setUpConnectionMockAndSocket() public function testRequestEventWillNotBeEmittedForIncompleteHeaders() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -65,18 +68,20 @@ public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() { $this->connection->expects($this->once())->method('close'); - $loop = Factory::create(); - $server = new StreamingServer($loop, $this->expectCallableNever(), 0.1); + $server = $this->createStreamingServer($this->expectCallableNever(), 0.1); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); - $loop->run(); + Loop::addTimer(0.2, function () { + Loop::stop(); + }); + Loop::run(); } public function testRequestEventIsEmitted() { - $server = new StreamingServer(Loop::get(), $this->expectCallableOnce()); + $server = $this->createStreamingServer($this->expectCallableOnce()); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -91,7 +96,7 @@ public function testRequestEventIsEmitted() public function testRequestEventIsEmittedForArrayCallable() { $this->called = null; - $server = new StreamingServer(Loop::get(), array($this, 'helperCallableOnce')); + $server = $this->createStreamingServer(array($this, 'helperCallableOnce')); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -111,7 +116,7 @@ public function testRequestEvent() { $i = 0; $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { $i++; $requestAssertion = $request; }); @@ -144,7 +149,7 @@ public function testRequestEventWithSingleRequestHandlerArray() { $i = 0; $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { $i++; $requestAssertion = $request; }); @@ -176,7 +181,7 @@ public function testRequestEventWithSingleRequestHandlerArray() public function testRequestGetWithHostAndCustomPort() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -198,7 +203,7 @@ public function testRequestGetWithHostAndCustomPort() public function testRequestGetWithHostAndHttpsPort() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -220,7 +225,7 @@ public function testRequestGetWithHostAndHttpsPort() public function testRequestGetWithHostAndDefaultPortWillBeIgnored() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -242,7 +247,7 @@ public function testRequestGetWithHostAndDefaultPortWillBeIgnored() public function testRequestGetHttp10WithoutHostWillBeIgnored() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -264,7 +269,7 @@ public function testRequestGetHttp10WithoutHostWillBeIgnored() public function testRequestGetHttp11WithoutHostWillReject() { - $server = new StreamingServer(Loop::get(), 'var_dump'); + $server = $this->createStreamingServer('var_dump'); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -277,7 +282,7 @@ public function testRequestGetHttp11WithoutHostWillReject() public function testRequestOptionsAsterisk() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -297,7 +302,7 @@ public function testRequestOptionsAsterisk() public function testRequestNonOptionsWithAsteriskRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -310,7 +315,7 @@ public function testRequestNonOptionsWithAsteriskRequestTargetWillReject() public function testRequestConnectAuthorityForm() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -332,7 +337,7 @@ public function testRequestConnectAuthorityForm() public function testRequestConnectWithoutHostWillBePassesAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -354,7 +359,7 @@ public function testRequestConnectWithoutHostWillBePassesAsIs() public function testRequestConnectAuthorityFormWithDefaultPortWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -376,7 +381,7 @@ public function testRequestConnectAuthorityFormWithDefaultPortWillBePassedAsIs() public function testRequestConnectAuthorityFormNonMatchingHostWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -397,7 +402,7 @@ public function testRequestConnectAuthorityFormNonMatchingHostWillBePassedAsIs() public function testRequestConnectOriginFormRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -409,7 +414,7 @@ public function testRequestConnectOriginFormRequestTargetWillReject() public function testRequestNonConnectWithAuthorityRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -423,7 +428,7 @@ public function testRequestWithoutHostEventUsesSocketAddress() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -449,7 +454,7 @@ public function testRequestAbsoluteEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -471,7 +476,7 @@ public function testRequestAbsoluteNonMatchingHostWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -491,7 +496,7 @@ public function testRequestAbsoluteNonMatchingHostWillBePassedAsIs() public function testRequestAbsoluteWithoutHostWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -505,7 +510,7 @@ public function testRequestOptionsAsteriskEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -527,7 +532,7 @@ public function testRequestOptionsAbsoluteEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -547,7 +552,7 @@ public function testRequestOptionsAbsoluteEvent() public function testRequestPauseWillBeForwardedToConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->pause(); }); @@ -567,7 +572,7 @@ public function testRequestPauseWillBeForwardedToConnection() public function testRequestResumeWillBeForwardedToConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->resume(); }); @@ -587,7 +592,7 @@ public function testRequestResumeWillBeForwardedToConnection() public function testRequestCloseWillNotCloseConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); }); @@ -602,7 +607,7 @@ public function testRequestCloseWillNotCloseConnection() public function testRequestPauseAfterCloseWillNotBeForwarded() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); $request->getBody()->pause(); }); @@ -619,7 +624,7 @@ public function testRequestPauseAfterCloseWillNotBeForwarded() public function testRequestResumeAfterCloseWillNotBeForwarded() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); $request->getBody()->resume(); }); @@ -638,7 +643,7 @@ public function testRequestEventWithoutBodyWillNotEmitData() { $never = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($never) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($never) { $request->getBody()->on('data', $never); }); @@ -653,7 +658,7 @@ public function testRequestEventWithSecondDataEventWillEmitBodyData() { $once = $this->expectCallableOnceWith('incomplete'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($once) { $request->getBody()->on('data', $once); }); @@ -673,7 +678,7 @@ public function testRequestEventWithPartialBodyWillEmitData() { $once = $this->expectCallableOnceWith('incomplete'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($once) { $request->getBody()->on('data', $once); }); @@ -694,7 +699,7 @@ public function testRequestEventWithPartialBodyWillEmitData() public function testResponseContainsServerHeader() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -724,7 +729,7 @@ public function testResponsePendingPromiseWillNotSendAnything() { $never = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($never) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($never) { return new Promise(function () { }, $never); }); @@ -754,7 +759,7 @@ public function testResponsePendingPromiseWillBeCancelledIfConnectionCloses() { $once = $this->expectCallableOnce(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($once) { return new Promise(function () { }, $once); }); @@ -786,7 +791,7 @@ public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyChunkedEncod $stream = new ThroughStream(); $stream->close(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -815,13 +820,15 @@ function ($data) use (&$buffer) { $this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer); $this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer); + + Loop::run(); } public function testResponseBodyStreamEndingWillSendEmptyBodyChunkedEncoded() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -852,6 +859,8 @@ function ($data) use (&$buffer) { $this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer); $this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer); + + Loop::run(); } public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyPlainHttp10() @@ -859,7 +868,7 @@ public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyPlainHttp10( $stream = new ThroughStream(); $stream->close(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -895,7 +904,7 @@ public function testResponseStreamWillBeClosedIfConnectionIsAlreadyClosed() $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -950,7 +959,7 @@ public function testResponseBodyStreamWillBeClosedIfConnectionEmitsCloseEvent() $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -968,7 +977,7 @@ public function testResponseBodyStreamWillBeClosedIfConnectionEmitsCloseEvent() public function testResponseUpgradeInResponseCanBeUsedToAdvertisePossibleUpgrade() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -1000,11 +1009,13 @@ function ($data) use (&$buffer) { $this->connection->emit('data', array($data)); $this->assertEquals("HTTP/1.1 200 OK\r\nUpgrade: demo\r\nContent-Length: 3\r\n\r\nfoo", $buffer); + + Loop::run(); } public function testResponseUpgradeWishInRequestCanBeIgnoredByReturningNormalResponse() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -1035,11 +1046,13 @@ function ($data) use (&$buffer) { $this->connection->emit('data', array($data)); $this->assertEquals("HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nfoo", $buffer); + + Loop::run(); } public function testResponseUpgradeSwitchingProtocolIncludesConnectionUpgradeHeaderWithoutContentLength() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 101, array( @@ -1079,7 +1092,7 @@ public function testResponseUpgradeSwitchingProtocolWithStreamWillPipeDataToConn { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 101, array( @@ -1120,7 +1133,7 @@ public function testResponseConnectMethodStreamWillPipeDataToConnection() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -1158,7 +1171,7 @@ public function testResponseConnectMethodStreamWillPipeDataFromConnection() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -1177,7 +1190,7 @@ public function testResponseConnectMethodStreamWillPipeDataFromConnection() public function testResponseContainsSameRequestProtocolVersionAndChunkedBodyForHttp11() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), @@ -1206,11 +1219,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsSameRequestProtocolVersionAndRawBodyForHttp10() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), @@ -1244,7 +1259,7 @@ function ($data) use (&$buffer) { public function testResponseContainsNoResponseBodyForHeadRequest() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), @@ -1273,6 +1288,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForHeadRequestWithStreamingResponse() @@ -1280,7 +1297,7 @@ public function testResponseContainsNoResponseBodyForHeadRequestWithStreamingRes $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array('Content-Length' => '3'), @@ -1308,11 +1325,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContentStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 204, array(), @@ -1341,6 +1360,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 204 No Content\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContentStatusResponseWithStreamingBody() @@ -1348,7 +1369,7 @@ public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContent $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 204, array('Content-Length' => '3'), @@ -1376,11 +1397,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 204 No Content\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoContentLengthHeaderForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array(), @@ -1408,11 +1431,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 0\r\n", $buffer); + + Loop::run(); } public function testResponseContainsExplicitContentLengthHeaderForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array('Content-Length' => 3), @@ -1440,11 +1465,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsExplicitContentLengthHeaderForHeadRequests() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Content-Length' => 3), @@ -1472,11 +1499,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array(), @@ -1505,6 +1534,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForNotModifiedStatusWithStreamingBody() @@ -1512,7 +1543,7 @@ public function testResponseContainsNoResponseBodyForNotModifiedStatusWithStream $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 304, array('Content-Length' => '3'), @@ -1540,12 +1571,14 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testRequestInvalidHttpProtocolVersionWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1579,7 +1612,7 @@ function ($data) use (&$buffer) { public function testRequestOverflowWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1613,7 +1646,7 @@ function ($data) use (&$buffer) { public function testRequestInvalidWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1650,7 +1683,7 @@ public function testRequestContentLengthBodyDataWillEmitDataEventOnRequestStream $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1678,7 +1711,7 @@ public function testRequestChunkedTransferEncodingRequestWillEmitDecodedDataEven $errorEvent = $this->expectCallableNever(); $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1709,7 +1742,7 @@ public function testRequestChunkedTransferEncodingWithAdditionalDataWontBeEmitte $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1738,7 +1771,7 @@ public function testRequestChunkedTransferEncodingEmpty() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1766,7 +1799,7 @@ public function testRequestChunkedTransferEncodingHeaderCanBeUpperCase() $errorEvent = $this->expectCallableNever(); $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1796,7 +1829,7 @@ public function testRequestChunkedTransferEncodingCanBeMixedUpperAndLowerCase() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1823,7 +1856,7 @@ public function testRequestContentLengthWillEmitDataEventAndEndEventAndAdditiona $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1854,7 +1887,7 @@ public function testRequestContentLengthWillEmitDataEventAndEndEventAndAdditiona $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1886,7 +1919,7 @@ public function testRequestZeroContentLengthWillEmitEndEvent() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1912,7 +1945,7 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1939,7 +1972,7 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1965,7 +1998,7 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg public function testRequestInvalidChunkHeaderTooLongWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); return \React\Promise\resolve(new Response()); }); @@ -1990,7 +2023,7 @@ public function testRequestInvalidChunkHeaderTooLongWillEmitErrorOnRequestStream public function testRequestInvalidChunkBodyTooLongWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2012,7 +2045,7 @@ public function testRequestInvalidChunkBodyTooLongWillEmitErrorOnRequestStream() public function testRequestUnexpectedEndOfRequestWithChunkedTransferConnectionWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2035,7 +2068,7 @@ public function testRequestUnexpectedEndOfRequestWithChunkedTransferConnectionWi public function testRequestInvalidChunkHeaderWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2057,7 +2090,7 @@ public function testRequestInvalidChunkHeaderWillEmitErrorOnRequestStream() public function testRequestUnexpectedEndOfRequestWithContentLengthWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2084,7 +2117,7 @@ public function testRequestWithoutBodyWillEmitEndOnRequestStream() $endEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function ($request) use ($dataEvent, $closeEvent, $endEvent, $errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($dataEvent, $closeEvent, $endEvent, $errorEvent){ $request->getBody()->on('data', $dataEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('end', $endEvent); @@ -2108,7 +2141,7 @@ public function testRequestWithoutDefinedLengthWillIgnoreDataEvent() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -2127,7 +2160,7 @@ public function testRequestWithoutDefinedLengthWillIgnoreDataEvent() public function testResponseWithBodyStreamWillUseChunkedTransferEncodingByDefault() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -2161,7 +2194,7 @@ function ($data) use (&$buffer) { public function testResponseWithBodyStringWillOverwriteExplicitContentLengthAndTransferEncoding() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -2202,7 +2235,7 @@ public function testResponseContainsResponseBodyWithTransferEncodingChunkedForBo $body->expects($this->once())->method('getSize')->willReturn(null); $body->expects($this->once())->method('__toString')->willReturn('body'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($body) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($body) { return new Response( 200, array(), @@ -2231,6 +2264,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("Transfer-Encoding: chunked", $buffer); $this->assertNotContainsString("Content-Length:", $buffer); $this->assertContainsString("body", $buffer); + + Loop::run(); } public function testResponseContainsResponseBodyWithPlainBodyWithUnknownSizeForLegacyHttp10() @@ -2239,7 +2274,7 @@ public function testResponseContainsResponseBodyWithPlainBodyWithUnknownSizeForL $body->expects($this->once())->method('getSize')->willReturn(null); $body->expects($this->once())->method('__toString')->willReturn('body'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($body) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($body) { return new Response( 200, array(), @@ -2273,7 +2308,7 @@ function ($data) use (&$buffer) { public function testResponseWithCustomTransferEncodingWillBeIgnoredAndUseChunkedTransferEncodingInstead() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array( @@ -2310,7 +2345,7 @@ function ($data) use (&$buffer) { public function testResponseWithoutExplicitDateHeaderWillAddCurrentDateFromClock() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2348,7 +2383,7 @@ function ($data) use (&$buffer) { public function testResponseWithCustomDateHeaderOverwritesDefault() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array("Date" => "Tue, 15 Nov 1994 08:12:31 GMT") @@ -2381,7 +2416,7 @@ function ($data) use (&$buffer) { public function testResponseWithEmptyDateHeaderRemovesDateHeader() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Date' => '') @@ -2414,7 +2449,7 @@ function ($data) use (&$buffer) { public function testResponseCanContainMultipleCookieHeaders() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -2452,7 +2487,7 @@ function ($data) use (&$buffer) { public function testReponseWithExpectContinueRequestContainsContinueWithLaterResponse() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2484,7 +2519,7 @@ function ($data) use (&$buffer) { public function testResponseWithExpectContinueRequestWontSendContinueForHttp10() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2515,14 +2550,14 @@ function ($data) use (&$buffer) { public function testInvalidCallbackFunctionLeadsToException() { $this->setExpectedException('InvalidArgumentException'); - $server = new StreamingServer(Loop::get(), 'invalid'); + $server = $this->createStreamingServer('invalid'); } public function testResponseBodyStreamWillStreamDataWithChunkedTransferEncoding() { $input = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($input) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($input) { return new Response( 200, array(), @@ -2561,7 +2596,7 @@ public function testResponseBodyStreamWithContentLengthWillStreamTillLengthWitho { $input = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($input) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($input) { return new Response( 200, array('Content-Length' => 5), @@ -2599,7 +2634,7 @@ function ($data) use (&$buffer) { public function testResponseWithResponsePromise() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return \React\Promise\resolve(new Response()); }); @@ -2627,7 +2662,7 @@ function ($data) use (&$buffer) { public function testResponseReturnInvalidTypeWillResultInError() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return "invalid"; }); @@ -2661,7 +2696,7 @@ function ($data) use (&$buffer) { public function testResponseResolveWrongTypeInPromiseWillResultInError() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return \React\Promise\resolve("invalid"); }); @@ -2689,7 +2724,7 @@ function ($data) use (&$buffer) { public function testResponseRejectedPromiseWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { $reject(new \Exception()); }); @@ -2720,7 +2755,7 @@ function ($data) use (&$buffer) { public function testResponseExceptionInCallbackWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { throw new \Exception('Bad call'); }); @@ -2751,7 +2786,7 @@ function ($data) use (&$buffer) { public function testResponseWithContentLengthHeaderForStringBodyOverwritesTransferEncoding() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Transfer-Encoding' => 'chunked'), @@ -2787,7 +2822,7 @@ function ($data) use (&$buffer) { public function testResponseWillBeHandled() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2815,7 +2850,7 @@ function ($data) use (&$buffer) { public function testResponseExceptionThrowInCallBackFunctionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { throw new \Exception('hello'); }); @@ -2853,7 +2888,7 @@ function ($data) use (&$buffer) { */ public function testResponseThrowableThrowInCallBackFunctionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { throw new \Error('hello'); }); @@ -2896,7 +2931,7 @@ function ($data) use (&$buffer) { public function testResponseRejectOfNonExceptionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { $reject('Invalid type'); }); @@ -2933,7 +2968,7 @@ function ($data) use (&$buffer) { public function testRequestServerRequestParams() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -2967,7 +3002,7 @@ public function testRequestServerRequestParams() public function testRequestQueryParametersWillBeAddedToRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -2987,7 +3022,7 @@ public function testRequestQueryParametersWillBeAddedToRequest() public function testRequestCookieWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3008,7 +3043,7 @@ public function testRequestCookieWillBeAddedToServerRequest() public function testRequestInvalidMultipleCookiesWontBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3029,7 +3064,7 @@ public function testRequestInvalidMultipleCookiesWontBeAddedToServerRequest() public function testRequestCookieWithSeparatorWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3049,7 +3084,7 @@ public function testRequestCookieWithSeparatorWillBeAddedToServerRequest() public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3068,7 +3103,7 @@ public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() public function testNewConnectionWillInvokeParserOnce() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3085,7 +3120,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/', array(), '', '1.0'); - $server = new StreamingServer(Loop::get(), $this->expectCallableOnceWith($request)); + $server = $this->createStreamingServer($this->expectCallableOnceWith($request)); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3108,7 +3143,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'close')); - $server = new StreamingServer(Loop::get(), $this->expectCallableOnceWith($request)); + $server = $this->createStreamingServer($this->expectCallableOnceWith($request)); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3131,7 +3166,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return new Response(200, array('Connection' => 'close')); }); @@ -3156,7 +3191,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle { $request = new ServerRequest('GET', 'http://localhost/'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return new Response(); }); @@ -3181,7 +3216,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle { $request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'keep-alive'), '', '1.0'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return new Response(); }); @@ -3207,7 +3242,7 @@ public function testNewConnectionWillInvokeParserOnceAfterInvokingRequestHandler $request = new ServerRequest('GET', 'http://localhost/'); $body = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function () use ($body) { + $server = $this->createStreamingServer(function () use ($body) { return new Response(200, array(), $body); }); @@ -3233,7 +3268,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle $request = new ServerRequest('GET', 'http://localhost/'); $body = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function () use ($body) { + $server = $this->createStreamingServer(function () use ($body) { return new Response(200, array(), $body); }); @@ -3253,9 +3288,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle // pretend parser just finished parsing $server->handleRequest($this->connection, $request); - $this->assertCount(3, $this->connection->listeners('close')); + $this->assertCount(2, $this->connection->listeners('close')); $body->end(); - $this->assertCount(3, $this->connection->listeners('close')); + $this->assertCount(1, $this->connection->listeners('close')); } private function createGetRequest() @@ -3267,4 +3302,9 @@ private function createGetRequest() return $data; } + + private function createStreamingServer($requestHandler, $requestTimeout = 1) + { + return new StreamingServer($requestHandler, new RequestHeaderParser(Loop::get(), ($clock = new Clock(Loop::get())), $requestTimeout), $clock); + } }