diff --git a/src/Io/StreamingServer.php b/src/Io/StreamingServer.php index 13f0b0c4..4f01b2fb 100644 --- a/src/Io/StreamingServer.php +++ b/src/Io/StreamingServer.php @@ -157,10 +157,12 @@ public function handleRequest(ConnectionInterface $conn, ServerRequestInterface } // cancel pending promise once connection closes + $connectionOnCloseResponseCanceler = null; if ($response instanceof PromiseInterface && \method_exists($response, 'cancel')) { - $conn->on('close', function () use ($response) { + $connectionOnCloseResponseCanceler = function () use ($response) { $response->cancel(); - }); + }; + $conn->on('close', $connectionOnCloseResponseCanceler); } // happy path: response returned, handle and return immediately @@ -201,7 +203,11 @@ function ($error) use ($that, $conn, $request) { $that->emit('error', array($exception)); return $that->writeError($conn, Response::STATUS_INTERNAL_SERVER_ERROR, $request); } - ); + )->always(function () use ($connectionOnCloseResponseCanceler, $conn) { + if ($connectionOnCloseResponseCanceler !== null) { + $conn->removeListener('close', $connectionOnCloseResponseCanceler); + } + }); } /** @internal */ diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index a2700b86..b9bb3584 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -3245,6 +3245,44 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle $this->assertCount(1, $this->connection->listeners('close')); } + public function testCompletingARequestWillRemoveConnectionOnCloseListener() + { + $connection = $this->getMockBuilder('React\Socket\Connection') + ->disableOriginalConstructor() + ->setMethods( + array( + 'write', + 'end', + 'close', + 'pause', + 'resume', + 'isReadable', + 'isWritable', + 'getRemoteAddress', + 'getLocalAddress', + 'pipe', + 'removeListener' + ) + ) + ->getMock(); + + $connection->method('isWritable')->willReturn(true); + $connection->method('isReadable')->willReturn(true); + $request = new ServerRequest('GET', 'http://localhost/'); + + $server = new StreamingServer(Loop::get(), function () { + return \React\Promise\resolve(new Response()); + }); + + $server->listen($this->socket); + $this->socket->emit('connection', array($connection)); + + $connection->expects($this->once())->method('removeListener'); + + // pretend parser just finished parsing + $server->handleRequest($connection, $request); + } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n";