From 7c3bd22d5b9b625053d697819f8f99e36b6366a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Mon, 20 Nov 2023 12:48:40 +0100 Subject: [PATCH 1/3] Refactor to differentiate "connecting" and "connected" state --- src/MysqlClient.php | 83 ++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/src/MysqlClient.php b/src/MysqlClient.php index 01ac492..12358c1 100644 --- a/src/MysqlClient.php +++ b/src/MysqlClient.php @@ -51,13 +51,16 @@ class MysqlClient extends EventEmitter { private $factory; private $uri; - private $connecting; private $closed = false; private $busy = false; - /** - * @var Connection|null - */ + /** @var PromiseInterface|null */ + private $connecting; + + /** @var ?Connection */ + private $connection; + + /** @var ?Connection */ private $disconnecting; private $loop; @@ -82,8 +85,15 @@ public function __construct( $this->loop = $loop ?: Loop::get(); } - private function connecting() + /** + * @return PromiseInterface + */ + private function getConnection() { + if ($this->connection !== null && $this->disconnecting === null) { + return \React\Promise\resolve($this->connection); + } + if ($this->connecting !== null) { return $this->connecting; } @@ -96,9 +106,12 @@ private function connecting() $this->connecting = $connecting = $this->factory->createConnection($this->uri); $this->connecting->then(function (Connection $connection) { + $this->connection = $connection; + $this->connecting = null; + // connection completed => remember only until closed $connection->on('close', function () { - $this->connecting = null; + $this->connection = null; if ($this->idleTimer !== null) { $this->loop->cancelTimer($this->idleTimer); @@ -127,23 +140,20 @@ private function idle() { --$this->pending; - if ($this->pending < 1 && $this->idlePeriod >= 0 && $this->connecting !== null) { + if ($this->pending < 1 && $this->idlePeriod >= 0 && $this->connection !== null) { $this->idleTimer = $this->loop->addTimer($this->idlePeriod, function () { - $this->connecting->then(function (Connection $connection) { - $this->disconnecting = $connection; - $connection->quit()->then( - function () { - // successfully disconnected => remove reference - $this->disconnecting = null; - }, - function () { - // soft-close failed but will close anyway => remove reference - $this->disconnecting = null; - } - ); - }); - $this->connecting = null; $this->idleTimer = null; + $this->disconnecting = $this->connection; + $this->connection->quit()->then( + function () { + // successfully disconnected => remove reference + $this->disconnecting = null; + }, + function () { + // soft-close failed but will close anyway => remove reference + $this->disconnecting = null; + } + ); }); } } @@ -213,7 +223,7 @@ public function query($sql, array $params = []) return \React\Promise\reject(new Exception('Connection closed')); } - return $this->connecting()->then(function (Connection $connection) use ($sql, $params) { + return $this->getConnection()->then(function (Connection $connection) use ($sql, $params) { $this->awake(); return $connection->query($sql, $params)->then( function (MysqlResult $result) { @@ -294,7 +304,7 @@ public function queryStream($sql, $params = []) } return \React\Promise\Stream\unwrapReadable( - $this->connecting()->then(function (Connection $connection) use ($sql, $params) { + $this->getConnection()->then(function (Connection $connection) use ($sql, $params) { $stream = $connection->queryStream($sql, $params); $this->awake(); @@ -333,7 +343,7 @@ public function ping() return \React\Promise\reject(new Exception('Connection closed')); } - return $this->connecting()->then(function (Connection $connection) { + return $this->getConnection()->then(function (Connection $connection) { $this->awake(); return $connection->ping()->then( function () { @@ -376,13 +386,13 @@ public function quit() } // not already connecting => no need to connect, simply close virtual connection - if ($this->connecting === null) { + if ($this->connection === null && $this->connecting === null) { $this->close(); return \React\Promise\resolve(null); } return new Promise(function (callable $resolve, callable $reject) { - $this->connecting()->then(function (Connection $connection) use ($resolve, $reject) { + $this->getConnection()->then(function (Connection $connection) use ($resolve, $reject) { $this->awake(); // soft-close connection and emit close event afterwards both on success or on error $connection->quit()->then( @@ -428,22 +438,17 @@ public function close() $this->closed = true; // force-close connection if still waiting for previous disconnection + // either close active connection or cancel pending connection attempt + // below branches are exclusive, there can only be a single connection if ($this->disconnecting !== null) { $this->disconnecting->close(); $this->disconnecting = null; - } - - // either close active connection or cancel pending connection attempt - if ($this->connecting !== null) { - $this->connecting->then(function (Connection $connection) { - $connection->close(); - }, function () { - // ignore to avoid reporting unhandled rejection - }); - if ($this->connecting !== null) { - $this->connecting->cancel(); - $this->connecting = null; - } + } elseif ($this->connection !== null) { + $this->connection->close(); + $this->connection = null; + } elseif ($this->connecting !== null) { + $this->connecting->cancel(); + $this->connecting = null; } if ($this->idleTimer !== null) { From df9ac961a59d49dd708a1139baf59d42ef4626e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Fri, 24 Nov 2023 13:12:37 +0100 Subject: [PATCH 2/3] Refactor to move idle connection handling to `Connection` --- src/Io/Connection.php | 91 ++++++- src/Io/Factory.php | 5 +- src/MysqlClient.php | 117 ++------- tests/Io/ConnectionTest.php | 502 +++++++++++++++++++++++++++++++++++- tests/MysqlClientTest.php | 278 ++++---------------- 5 files changed, 644 insertions(+), 349 deletions(-) diff --git a/src/Io/Connection.php b/src/Io/Connection.php index 8fc2007..749f456 100644 --- a/src/Io/Connection.php +++ b/src/Io/Connection.php @@ -3,6 +3,7 @@ namespace React\Mysql\Io; use Evenement\EventEmitter; +use React\EventLoop\LoopInterface; use React\Mysql\Commands\CommandInterface; use React\Mysql\Commands\PingCommand; use React\Mysql\Commands\QueryCommand; @@ -29,26 +30,46 @@ class Connection extends EventEmitter private $executor; /** - * @var integer + * @var int one of the state constants (may change, but should be used readonly from outside) + * @see self::STATE_* */ - private $state = self::STATE_AUTHENTICATED; + public $state = self::STATE_AUTHENTICATED; /** * @var SocketConnectionInterface */ private $stream; + /** @var LoopInterface */ + private $loop; + + /** @var float */ + private $idlePeriod = 0.001; + + /** @var ?\React\EventLoop\TimerInterface */ + private $idleTimer; + + /** @var int */ + private $pending = 0; + /** * Connection constructor. * * @param SocketConnectionInterface $stream * @param Executor $executor + * @param LoopInterface $loop + * @param ?float $idlePeriod */ - public function __construct(SocketConnectionInterface $stream, Executor $executor) + public function __construct(SocketConnectionInterface $stream, Executor $executor, LoopInterface $loop, $idlePeriod) { $this->stream = $stream; $this->executor = $executor; + $this->loop = $loop; + if ($idlePeriod !== null) { + $this->idlePeriod = $idlePeriod; + } + $stream->on('error', [$this, 'handleConnectionError']); $stream->on('close', [$this, 'handleConnectionClosed']); } @@ -71,6 +92,7 @@ public function query($sql, array $params = []) return \React\Promise\reject($e); } + $this->awake(); $deferred = new Deferred(); // store all result set rows until result set end @@ -86,11 +108,13 @@ public function query($sql, array $params = []) $rows = []; + $this->idle(); $deferred->resolve($result); }); // resolve / reject status reply (response without result set) $command->on('error', function ($error) use ($deferred) { + $this->idle(); $deferred->reject($error); }); $command->on('success', function () use ($command, $deferred) { @@ -99,6 +123,7 @@ public function query($sql, array $params = []) $result->insertId = $command->insertId; $result->warningCount = $command->warningCount; + $this->idle(); $deferred->resolve($result); }); @@ -115,20 +140,30 @@ public function queryStream($sql, $params = []) $command = new QueryCommand(); $command->setQuery($query); $this->_doCommand($command); + $this->awake(); + + $stream = new QueryStream($command, $this->stream); + $stream->on('close', function () { + $this->idle(); + }); - return new QueryStream($command, $this->stream); + return $stream; } public function ping() { return new Promise(function ($resolve, $reject) { - $this->_doCommand(new PingCommand()) - ->on('error', function ($reason) use ($reject) { - $reject($reason); - }) - ->on('success', function () use ($resolve) { - $resolve(null); - }); + $command = $this->_doCommand(new PingCommand()); + $this->awake(); + + $command->on('success', function () use ($resolve) { + $this->idle(); + $resolve(null); + }); + $command->on('error', function ($reason) use ($reject) { + $this->idle(); + $reject($reason); + }); }); } @@ -137,6 +172,10 @@ public function quit() return new Promise(function ($resolve, $reject) { $command = $this->_doCommand(new QuitCommand()); $this->state = self::STATE_CLOSING; + + // mark connection as "awake" until it is closed, so never "idle" + $this->awake(); + $command->on('success', function () use ($resolve) { $resolve(null); $this->close(); @@ -158,6 +197,11 @@ public function close() $remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false; $this->stream->close(); + if ($this->idleTimer !== null) { + $this->loop->cancelTimer($this->idleTimer); + $this->idleTimer = null; + } + // reject all pending commands if connection is closed while (!$this->executor->isIdle()) { $command = $this->executor->dequeue(); @@ -223,4 +267,29 @@ protected function _doCommand(CommandInterface $command) return $this->executor->enqueue($command); } + + private function awake() + { + ++$this->pending; + + if ($this->idleTimer !== null) { + $this->loop->cancelTimer($this->idleTimer); + $this->idleTimer = null; + } + } + + private function idle() + { + --$this->pending; + + if ($this->pending < 1 && $this->idlePeriod >= 0 && $this->state === self::STATE_AUTHENTICATED) { + $this->idleTimer = $this->loop->addTimer($this->idlePeriod, function () { + // soft-close connection and emit close event afterwards both on success or on error + $this->idleTimer = null; + $this->quit()->then(null, function () { + // ignore to avoid reporting unhandled rejection + }); + }); + } + } } diff --git a/src/Io/Factory.php b/src/Io/Factory.php index 60bf3b2..5233907 100644 --- a/src/Io/Factory.php +++ b/src/Io/Factory.php @@ -210,11 +210,12 @@ public function createConnection( $connecting->cancel(); }); - $connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred, $uri) { + $idlePeriod = isset($args['idle']) ? (float) $args['idle'] : null; + $connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred, $uri, $idlePeriod) { $executor = new Executor(); $parser = new Parser($stream, $executor); - $connection = new Connection($stream, $executor); + $connection = new Connection($stream, $executor, $this->loop, $idlePeriod); $command = $executor->enqueue($authCommand); $parser->start(); diff --git a/src/MysqlClient.php b/src/MysqlClient.php index 12358c1..b5c0a9c 100644 --- a/src/MysqlClient.php +++ b/src/MysqlClient.php @@ -3,7 +3,6 @@ namespace React\Mysql; use Evenement\EventEmitter; -use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use React\Mysql\Io\Connection; use React\Mysql\Io\Factory; @@ -52,7 +51,6 @@ class MysqlClient extends EventEmitter private $factory; private $uri; private $closed = false; - private $busy = false; /** @var PromiseInterface|null */ private $connecting; @@ -60,13 +58,14 @@ class MysqlClient extends EventEmitter /** @var ?Connection */ private $connection; - /** @var ?Connection */ - private $disconnecting; - - private $loop; - private $idlePeriod = 0.001; - private $idleTimer; - private $pending = 0; + /** + * set to true only between calling `quit()` and the connection closing in response + * + * @var bool + * @see self::quit() + * @see self::$closed + */ + private $quitting = false; public function __construct( #[\SensitiveParameter] @@ -74,15 +73,8 @@ public function __construct( ConnectorInterface $connector = null, LoopInterface $loop = null ) { - $args = []; - \parse_str((string) \parse_url($uri, \PHP_URL_QUERY), $args); - if (isset($args['idle'])) { - $this->idlePeriod = (float)$args['idle']; - } - $this->factory = new Factory($loop, $connector); $this->uri = $uri; - $this->loop = $loop ?: Loop::get(); } /** @@ -90,7 +82,8 @@ public function __construct( */ private function getConnection() { - if ($this->connection !== null && $this->disconnecting === null) { + // happy path: reuse existing connection unless it is already closing after an idle timeout + if ($this->connection !== null && ($this->quitting || $this->connection->state !== Connection::STATE_CLOSING)) { return \React\Promise\resolve($this->connection); } @@ -99,11 +92,12 @@ private function getConnection() } // force-close connection if still waiting for previous disconnection - if ($this->disconnecting !== null) { - $this->disconnecting->close(); - $this->disconnecting = null; + if ($this->connection !== null) { + assert($this->connection->state === Connection::STATE_CLOSING); + $this->connection->close(); } + // create new connection if not already connected or connecting $this->connecting = $connecting = $this->factory->createConnection($this->uri); $this->connecting->then(function (Connection $connection) { $this->connection = $connection; @@ -112,11 +106,6 @@ private function getConnection() // connection completed => remember only until closed $connection->on('close', function () { $this->connection = null; - - if ($this->idleTimer !== null) { - $this->loop->cancelTimer($this->idleTimer); - $this->idleTimer = null; - } }); }, function () { // connection failed => discard connection attempt @@ -126,38 +115,6 @@ private function getConnection() return $connecting; } - private function awake() - { - ++$this->pending; - - if ($this->idleTimer !== null) { - $this->loop->cancelTimer($this->idleTimer); - $this->idleTimer = null; - } - } - - private function idle() - { - --$this->pending; - - if ($this->pending < 1 && $this->idlePeriod >= 0 && $this->connection !== null) { - $this->idleTimer = $this->loop->addTimer($this->idlePeriod, function () { - $this->idleTimer = null; - $this->disconnecting = $this->connection; - $this->connection->quit()->then( - function () { - // successfully disconnected => remove reference - $this->disconnecting = null; - }, - function () { - // soft-close failed but will close anyway => remove reference - $this->disconnecting = null; - } - ); - }); - } - } - /** * Performs an async query. * @@ -224,17 +181,7 @@ public function query($sql, array $params = []) } return $this->getConnection()->then(function (Connection $connection) use ($sql, $params) { - $this->awake(); - return $connection->query($sql, $params)->then( - function (MysqlResult $result) { - $this->idle(); - return $result; - }, - function (\Exception $e) { - $this->idle(); - throw $e; - } - ); + return $connection->query($sql, $params); }); } @@ -305,14 +252,7 @@ public function queryStream($sql, $params = []) return \React\Promise\Stream\unwrapReadable( $this->getConnection()->then(function (Connection $connection) use ($sql, $params) { - $stream = $connection->queryStream($sql, $params); - - $this->awake(); - $stream->on('close', function () { - $this->idle(); - }); - - return $stream; + return $connection->queryStream($sql, $params); }) ); } @@ -344,16 +284,7 @@ public function ping() } return $this->getConnection()->then(function (Connection $connection) { - $this->awake(); - return $connection->ping()->then( - function () { - $this->idle(); - }, - function (\Exception $e) { - $this->idle(); - throw $e; - } - ); + return $connection->ping(); }); } @@ -391,9 +322,9 @@ public function quit() return \React\Promise\resolve(null); } + $this->quitting = true; return new Promise(function (callable $resolve, callable $reject) { $this->getConnection()->then(function (Connection $connection) use ($resolve, $reject) { - $this->awake(); // soft-close connection and emit close event afterwards both on success or on error $connection->quit()->then( function () use ($resolve){ @@ -436,14 +367,11 @@ public function close() } $this->closed = true; + $this->quitting = false; - // force-close connection if still waiting for previous disconnection // either close active connection or cancel pending connection attempt // below branches are exclusive, there can only be a single connection - if ($this->disconnecting !== null) { - $this->disconnecting->close(); - $this->disconnecting = null; - } elseif ($this->connection !== null) { + if ($this->connection !== null) { $this->connection->close(); $this->connection = null; } elseif ($this->connecting !== null) { @@ -451,11 +379,6 @@ public function close() $this->connecting = null; } - if ($this->idleTimer !== null) { - $this->loop->cancelTimer($this->idleTimer); - $this->idleTimer = null; - } - $this->emit('close'); $this->removeAllListeners(); } diff --git a/tests/Io/ConnectionTest.php b/tests/Io/ConnectionTest.php index f0cb934..394545f 100644 --- a/tests/Io/ConnectionTest.php +++ b/tests/Io/ConnectionTest.php @@ -7,13 +7,429 @@ class ConnectionTest extends BaseTestCase { + public function testQueryWillEnqueueOneCommand() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('close'); + + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->never())->method('addTimer'); + + $conn = new Connection($stream, $executor, $loop, null); + $conn->query('SELECT 1'); + } + + public function testQueryWillReturnResolvedPromiseAndStartIdleTimerWhenQueryCommandEmitsSuccess() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); + $loop->expects($this->never())->method('cancelTimer'); + + $connection = new Connection($stream, $executor, $loop, null); + + $this->assertNull($currentCommand); + + $promise = $connection->query('SELECT 1'); + + $promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Mysql\MysqlResult'))); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + } + + public function testQueryWillReturnResolvedPromiseAndStartIdleTimerWhenQueryCommandEmitsEnd() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); + $loop->expects($this->never())->method('cancelTimer'); + + $connection = new Connection($stream, $executor, $loop, null); + + $this->assertNull($currentCommand); + + $promise = $connection->query('SELECT 1'); + + $promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Mysql\MysqlResult'))); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('end'); + } + + public function testQueryWillReturnResolvedPromiseAndStartIdleTimerWhenIdlePeriodIsGivenAndQueryCommandEmitsSuccess() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(1.0, $this->anything())->willReturn($timer); + $loop->expects($this->never())->method('cancelTimer'); + + $connection = new Connection($stream, $executor, $loop, 1.0); + + $this->assertNull($currentCommand); + + $promise = $connection->query('SELECT 1'); + + $promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Mysql\MysqlResult'))); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + } + + public function testQueryWillReturnResolvedPromiseAndNotStartIdleTimerWhenIdlePeriodIsNegativeAndQueryCommandEmitsSuccess() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->never())->method('addTimer'); + + $connection = new Connection($stream, $executor, $loop, -1); + + $this->assertNull($currentCommand); + + $promise = $connection->query('SELECT 1'); + + $promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Mysql\MysqlResult'))); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + } + + public function testQueryWillReturnRejectedPromiseAndStartIdleTimerWhenQueryCommandEmitsError() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); + $loop->expects($this->never())->method('cancelTimer'); + + $connection = new Connection($stream, $executor, $loop, null); + + $this->assertNull($currentCommand); + + $promise = $connection->query('SELECT 1'); + + $promise->then(null, $this->expectCallableOnce()); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('error', [new \RuntimeException()]); + } + + public function testQueryFollowedByIdleTimerWillQuitUnderlyingConnectionAndEmitCloseEventWhenQuitCommandEmitsSuccess() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->once())->method('close'); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->any())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timeout = null; + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->callback(function ($cb) use (&$timeout) { + $timeout = $cb; + return true; + }))->willReturn($timer); + + $connection = new Connection($stream, $executor, $loop, null); + + $connection->on('close', $this->expectCallableOnce()); + + $this->assertNull($currentCommand); + + $connection->query('SELECT 1'); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + + $this->assertNotNull($timeout); + $timeout(); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + } + + public function testQueryFollowedByIdleTimerWillQuitUnderlyingConnectionAndEmitCloseEventWhenQuitCommandEmitsError() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->once())->method('close'); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->any())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timeout = null; + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->callback(function ($cb) use (&$timeout) { + $timeout = $cb; + return true; + }))->willReturn($timer); + + $connection = new Connection($stream, $executor, $loop, null); + + $connection->on('close', $this->expectCallableOnce()); + + $this->assertNull($currentCommand); + + $connection->query('SELECT 1'); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + + $this->assertNotNull($timeout); + $timeout(); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('error', [new \RuntimeException()]); + } + + public function testQueryTwiceWillEnqueueSecondQueryWithoutStartingIdleTimerWhenFirstQueryCommandEmitsSuccess() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->any())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->never())->method('addTimer'); + + $connection = new Connection($stream, $executor, $loop, null); + + $this->assertNull($currentCommand); + + $connection->query('SELECT 1'); + $connection->query('SELECT 2'); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + } + + public function testQueryTwiceAfterIdleTimerWasStartedWillCancelIdleTimerAndEnqueueSecondCommand() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->any())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); + $loop->expects($this->once())->method('cancelTimer')->with($timer); + + $connection = new Connection($stream, $executor, $loop, null); + + $this->assertNull($currentCommand); + + $connection->query('SELECT 1'); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + + $connection->query('SELECT 2'); + } + + public function testQueryStreamWillEnqueueOneCommand() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('close'); + + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->never())->method('addTimer'); + + $conn = new Connection($stream, $executor, $loop, null); + $conn->queryStream('SELECT 1'); + } + + public function testQueryStreamWillReturnStreamThatWillEmitEndEventAndStartIdleTimerWhenQueryCommandEmitsSuccess() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); + $loop->expects($this->never())->method('cancelTimer'); + + $connection = new Connection($stream, $executor, $loop, null); + + $this->assertNull($currentCommand); + + $stream = $connection->queryStream('SELECT 1'); + + $stream->on('end', $this->expectCallableOnce()); + $stream->on('close', $this->expectCallableOnce()); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + } + + public function testQueryStreamWillReturnStreamThatWillEmitErrorEventAndStartIdleTimerWhenQueryCommandEmitsError() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); + $loop->expects($this->never())->method('cancelTimer'); + + $connection = new Connection($stream, $executor, $loop, null); + + $this->assertNull($currentCommand); + + $stream = $connection->queryStream('SELECT 1'); + + $stream->on('error', $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); + $stream->on('close', $this->expectCallableOnce()); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('error', [new \RuntimeException()]); + } + + public function testPingWillEnqueueOneCommand() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('close'); + + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->never())->method('addTimer'); + + $conn = new Connection($stream, $executor, $loop, null); + $conn->ping(); + } + + public function testPingWillReturnResolvedPromiseAndStartIdleTimerWhenPingCommandEmitsSuccess() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); + $loop->expects($this->never())->method('cancelTimer'); + + $connection = new Connection($stream, $executor, $loop, null); + + $this->assertNull($currentCommand); + + $promise = $connection->ping(); + + $promise->then($this->expectCallableOnce()); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + } + + public function testPingWillReturnRejectedPromiseAndStartIdleTimerWhenPingCommandEmitsError() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); + $loop->expects($this->never())->method('cancelTimer'); + + $connection = new Connection($stream, $executor, $loop, null); + + $this->assertNull($currentCommand); + + $promise = $connection->ping(); + + $promise->then(null, $this->expectCallableOnce()); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('error', [new \RuntimeException()]); + } + public function testQuitWillEnqueueOneCommand() { $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); - $conn = new Connection($stream, $executor); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->never())->method('addTimer'); + + $conn = new Connection($stream, $executor, $loop, null); $conn->quit(); } @@ -22,12 +438,15 @@ public function testQuitWillResolveBeforeEmittingCloseEventWhenQuitCommandEmitsS $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); $pingCommand = null; - $executor = $this->getMockBuilder('React\MySQL\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$pingCommand) { return $pingCommand = $command; }); - $connection = new Connection($stream, $executor); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->never())->method('addTimer'); + + $connection = new Connection($stream, $executor, $loop, null); $events = ''; $connection->on('close', function () use (&$events) { @@ -55,12 +474,15 @@ public function testQuitWillRejectBeforeEmittingCloseEventWhenQuitCommandEmitsEr $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); $pingCommand = null; - $executor = $this->getMockBuilder('React\MySQL\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$pingCommand) { return $pingCommand = $command; }); - $connection = new Connection($stream, $executor); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->never())->method('addTimer'); + + $connection = new Connection($stream, $executor, $loop, null); $events = ''; $connection->on('close', function () use (&$events) { @@ -83,13 +505,61 @@ public function testQuitWillRejectBeforeEmittingCloseEventWhenQuitCommandEmitsEr $this->assertEquals('rejected.closed.', $events); } + public function testCloseWillEmitCloseEvent() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->getMock(); + $executor->expects($this->once())->method('isIdle')->willReturn(true); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->never())->method('addTimer'); + + $connection = new Connection($stream, $executor, $loop, null); + + $connection->on('close', $this->expectCallableOnce()); + + $connection->close(); + } + + public function testCloseAfterIdleTimerWasStartedWillCancelIdleTimerAndEmitCloseEvent() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $currentCommand = null; + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnCallback(function ($command) use (&$currentCommand) { + return $currentCommand = $command; + }); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); + $loop->expects($this->once())->method('cancelTimer')->with($timer); + + $connection = new Connection($stream, $executor, $loop, null); + + $this->assertNull($currentCommand); + + $connection->ping(); + + $this->assertNotNull($currentCommand); + $currentCommand->emit('success'); + + $connection->on('close', $this->expectCallableOnce()); + + $connection->close(); + } + public function testQueryAfterQuitRejectsImmediately() { $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); - $conn = new Connection($stream, $executor); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $conn = new Connection($stream, $executor, $loop, null); $conn->quit(); $promise = $conn->query('SELECT 1'); @@ -112,7 +582,9 @@ public function testQueryAfterCloseRejectsImmediately() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->never())->method('enqueue'); - $conn = new Connection($stream, $executor); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $conn = new Connection($stream, $executor, $loop, null); $conn->close(); $promise = $conn->query('SELECT 1'); @@ -135,7 +607,9 @@ public function testQueryStreamAfterQuitThrows() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); - $conn = new Connection($stream, $executor); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $conn = new Connection($stream, $executor, $loop, null); $conn->quit(); try { @@ -152,7 +626,9 @@ public function testPingAfterQuitRejectsImmediately() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); - $conn = new Connection($stream, $executor); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $conn = new Connection($stream, $executor, $loop, null); $conn->quit(); $promise = $conn->ping(); @@ -175,7 +651,9 @@ public function testQuitAfterQuitRejectsImmediately() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); - $conn = new Connection($stream, $executor); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $conn = new Connection($stream, $executor, $loop, null); $conn->quit(); $promise = $conn->quit(); @@ -206,7 +684,9 @@ public function testCloseStreamEmitsErrorEvent() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->never())->method('enqueue'); - $conn = new Connection($stream, $executor); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $conn = new Connection($stream, $executor, $loop, null); $conn->on('error', $this->expectCallableOnceWith( $this->logicalAnd( $this->isInstanceOf('RuntimeException'), diff --git a/tests/MysqlClientTest.php b/tests/MysqlClientTest.php index 5adf222..2576902 100644 --- a/tests/MysqlClientTest.php +++ b/tests/MysqlClientTest.php @@ -27,12 +27,6 @@ public function testConstructWithoutConnectorAndLoopAssignsConnectorAndLoopAutom $this->assertInstanceOf('React\Socket\ConnectorInterface', $connector); - $ref = new \ReflectionProperty($mysql, 'loop'); - $ref->setAccessible(true); - $loop = $ref->getValue($mysql); - - $this->assertInstanceOf('React\EventLoop\LoopInterface', $loop); - $ref = new \ReflectionProperty($factory, 'loop'); $ref->setAccessible(true); $loop = $ref->getValue($factory); @@ -56,11 +50,6 @@ public function testConstructWithConnectorAndLoopAssignsGivenConnectorAndLoop() $this->assertSame($connector, $ref->getValue($factory)); - $ref = new \ReflectionProperty($mysql, 'loop'); - $ref->setAccessible(true); - - $this->assertSame($loop, $ref->getValue($mysql)); - $ref = new \ReflectionProperty($factory, 'loop'); $ref->setAccessible(true); @@ -90,7 +79,7 @@ public function testPingWillNotCloseConnectionWhenPendingConnectionFails() $deferred->reject(new \RuntimeException()); } - public function testPingWillNotCloseConnectionWhenUnderlyingConnectionCloses() + public function testConnectionCloseEventAfterPingWillNotEmitCloseEvent() { $base = $this->getMockBuilder('React\Mysql\Io\Connection')->setMethods(['ping', 'close'])->disableOriginalConstructor()->getMock(); $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); @@ -114,34 +103,7 @@ public function testPingWillNotCloseConnectionWhenUnderlyingConnectionCloses() $base->emit('close'); } - public function testPingWillCancelTimerWithoutClosingConnectionWhenUnderlyingConnectionCloses() - { - $base = $this->getMockBuilder('React\Mysql\Io\Connection')->setMethods(['ping', 'close'])->disableOriginalConstructor()->getMock(); - $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); - - $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); - $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); - - $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->willReturn($timer); - $loop->expects($this->once())->method('cancelTimer')->with($timer); - - $connection = new MysqlClient('', null, $loop); - - $ref = new \ReflectionProperty($connection, 'factory'); - $ref->setAccessible(true); - $ref->setValue($connection, $factory); - - $connection->on('close', $this->expectCallableNever()); - - $connection->ping(); - - assert($base instanceof Connection); - $base->emit('close'); - } - - public function testPingWillNotForwardErrorFromUnderlyingConnection() + public function testConnectionErrorEventAfterPingWillNotEmitErrorEvent() { $base = $this->getMockBuilder('React\Mysql\Io\Connection')->setMethods(['ping'])->disableOriginalConstructor()->getMock(); $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); @@ -161,87 +123,15 @@ public function testPingWillNotForwardErrorFromUnderlyingConnection() $connection->ping(); - $base->emit('error', [new \RuntimeException()]); - } - - public function testPingFollowedByIdleTimerWillQuitUnderlyingConnection() - { - $base = $this->getMockBuilder('React\Mysql\Io\Connection')->setMethods(['ping', 'quit', 'close'])->disableOriginalConstructor()->getMock(); - $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); - $base->expects($this->once())->method('quit')->willReturn(\React\Promise\resolve(null)); - $base->expects($this->never())->method('close'); - - $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); - $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); - - $timeout = null; - $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->with($this->anything(), $this->callback(function ($cb) use (&$timeout) { - $timeout = $cb; - return true; - }))->willReturn($timer); - - $connection = new MysqlClient('', null, $loop); - - $ref = new \ReflectionProperty($connection, 'factory'); - $ref->setAccessible(true); - $ref->setValue($connection, $factory); - - $connection->on('close', $this->expectCallableNever()); - - $connection->ping(); - - $this->assertNotNull($timeout); - $timeout(); - } - - public function testPingFollowedByIdleTimerWillNotHaveToCloseUnderlyingConnectionWhenQuitFailsBecauseUnderlyingConnectionEmitsCloseAutomatically() - { - $base = $this->getMockBuilder('React\Mysql\Io\Connection')->setMethods(['ping', 'quit', 'close'])->disableOriginalConstructor()->getMock(); - $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); - $base->expects($this->once())->method('quit')->willReturn(\React\Promise\reject(new \RuntimeException())); - $base->expects($this->never())->method('close'); - - $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); - $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); - - $timeout = null; - $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->with($this->anything(), $this->callback(function ($cb) use (&$timeout) { - $timeout = $cb; - return true; - }))->willReturn($timer); - - $connection = new MysqlClient('', null, $loop); - - $ref = new \ReflectionProperty($connection, 'factory'); - $ref->setAccessible(true); - $ref->setValue($connection, $factory); - - $connection->on('close', $this->expectCallableNever()); - - $connection->ping(); - - $this->assertNotNull($timeout); - $timeout(); - assert($base instanceof Connection); - $base->emit('close'); - - $ref = new \ReflectionProperty($connection, 'connecting'); - $ref->setAccessible(true); - $connecting = $ref->getValue($connection); - - $this->assertNull($connecting); + $base->emit('error', [new \RuntimeException()]); } - public function testPingAfterIdleTimerWillCloseUnderlyingConnectionBeforeCreatingSecondConnection() + public function testPingAfterConnectionIsInClosingStateDueToIdleTimerWillCloseConnectionBeforeCreatingSecondConnection() { $base = $this->getMockBuilder('React\Mysql\Io\Connection')->setMethods(['ping', 'quit', 'close'])->disableOriginalConstructor()->getMock(); $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); - $base->expects($this->once())->method('quit')->willReturn(new Promise(function () { })); + $base->expects($this->never())->method('quit'); $base->expects($this->once())->method('close'); $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); @@ -250,13 +140,7 @@ public function testPingAfterIdleTimerWillCloseUnderlyingConnectionBeforeCreatin new Promise(function () { }) ); - $timeout = null; - $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->with($this->anything(), $this->callback(function ($cb) use (&$timeout) { - $timeout = $cb; - return true; - }))->willReturn($timer); $connection = new MysqlClient('', null, $loop); @@ -268,21 +152,19 @@ public function testPingAfterIdleTimerWillCloseUnderlyingConnectionBeforeCreatin $connection->ping(); - $this->assertNotNull($timeout); - $timeout(); + // emulate triggering idle timer by setting connection state to closing + $base->state = Connection::STATE_CLOSING; $connection->ping(); } - - public function testQueryReturnsPendingPromiseAndWillNotStartTimerWhenConnectionIsPending() + public function testQueryReturnsPendingPromiseWhenConnectionIsPending() { $deferred = new Deferred(); $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->never())->method('addTimer'); $connection = new MysqlClient('', null, $loop); @@ -314,7 +196,7 @@ public function testQueryWillQueryUnderlyingConnectionWhenResolved() $connection->query('SELECT 1'); } - public function testQueryWillResolveAndStartTimerWithDefaultIntervalWhenQueryFromUnderlyingConnectionResolves() + public function testQueryWillResolveWhenQueryFromUnderlyingConnectionResolves() { $result = new MysqlResult(); @@ -325,7 +207,6 @@ public function testQueryWillResolveAndStartTimerWithDefaultIntervalWhenQueryFro $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything()); $connection = new MysqlClient('', null, $loop); @@ -337,53 +218,7 @@ public function testQueryWillResolveAndStartTimerWithDefaultIntervalWhenQueryFro $ret->then($this->expectCallableOnceWith($result), $this->expectCallableNever()); } - public function testQueryWillResolveAndStartTimerWithIntervalFromIdleParameterWhenQueryFromUnderlyingConnectionResolves() - { - $result = new MysqlResult(); - - $base = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); - $base->expects($this->once())->method('query')->with('SELECT 1')->willReturn(\React\Promise\resolve($result)); - - $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); - $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); - - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->with(2.5, $this->anything()); - - $connection = new MysqlClient('mysql://localhost?idle=2.5', null, $loop); - - $ref = new \ReflectionProperty($connection, 'factory'); - $ref->setAccessible(true); - $ref->setValue($connection, $factory); - - $ret = $connection->query('SELECT 1'); - $ret->then($this->expectCallableOnceWith($result), $this->expectCallableNever()); - } - - public function testQueryWillResolveWithoutStartingTimerWhenQueryFromUnderlyingConnectionResolvesAndIdleParameterIsNegative() - { - $result = new MysqlResult(); - - $base = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); - $base->expects($this->once())->method('query')->with('SELECT 1')->willReturn(\React\Promise\resolve($result)); - - $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); - $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); - - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->never())->method('addTimer'); - - $connection = new MysqlClient('mysql://localhost?idle=-1', null, $loop); - - $ref = new \ReflectionProperty($connection, 'factory'); - $ref->setAccessible(true); - $ref->setValue($connection, $factory); - - $ret = $connection->query('SELECT 1'); - $ret->then($this->expectCallableOnceWith($result), $this->expectCallableNever()); - } - - public function testQueryBeforePingWillResolveWithoutStartingTimerWhenQueryFromUnderlyingConnectionResolvesBecausePingIsStillPending() + public function testPingAfterQueryWillPassPingToConnectionWhenQueryResolves() { $result = new MysqlResult(); $deferred = new Deferred(); @@ -396,7 +231,6 @@ public function testQueryBeforePingWillResolveWithoutStartingTimerWhenQueryFromU $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->never())->method('addTimer'); $connection = new MysqlClient('', null, $loop); @@ -412,31 +246,7 @@ public function testQueryBeforePingWillResolveWithoutStartingTimerWhenQueryFromU $ret->then($this->expectCallableOnceWith($result), $this->expectCallableNever()); } - public function testQueryAfterPingWillCancelTimerAgainWhenPingFromUnderlyingConnectionResolved() - { - $base = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); - $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); - $base->expects($this->once())->method('query')->with('SELECT 1')->willReturn(new Promise(function () { })); - - $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); - $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); - - $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->willReturn($timer); - $loop->expects($this->once())->method('cancelTimer')->with($timer); - - $connection = new MysqlClient('', null, $loop); - - $ref = new \ReflectionProperty($connection, 'factory'); - $ref->setAccessible(true); - $ref->setValue($connection, $factory); - - $connection->ping(); - $connection->query('SELECT 1'); - } - - public function testQueryWillRejectAndStartTimerWhenQueryFromUnderlyingConnectionRejects() + public function testQueryWillRejectWhenQueryFromUnderlyingConnectionRejects() { $error = new \RuntimeException(); @@ -447,7 +257,6 @@ public function testQueryWillRejectAndStartTimerWhenQueryFromUnderlyingConnectio $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer'); $connection = new MysqlClient('', null, $loop); @@ -459,14 +268,13 @@ public function testQueryWillRejectAndStartTimerWhenQueryFromUnderlyingConnectio $ret->then($this->expectCallableNever(), $this->expectCallableOnceWith($error)); } - public function testQueryWillRejectWithoutStartingTimerWhenUnderlyingConnectionRejects() + public function testQueryWillRejectWhenUnderlyingConnectionRejects() { $deferred = new Deferred(); $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->never())->method('addTimer'); $connection = new MysqlClient('', null, $loop); @@ -499,7 +307,7 @@ public function testQueryStreamReturnsReadableStreamWhenConnectionIsPending() $this->assertTrue($ret->isReadable()); } - public function testQueryStreamWillReturnStreamFromUnderlyingConnectionWithoutStartingTimerWhenResolved() + public function testQueryStreamWillReturnStreamFromUnderlyingConnectionWhenResolved() { $stream = new ThroughStream(); $base = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); @@ -509,7 +317,6 @@ public function testQueryStreamWillReturnStreamFromUnderlyingConnectionWithoutSt $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->never())->method('addTimer'); $connection = new MysqlClient('', null, $loop); @@ -525,7 +332,7 @@ public function testQueryStreamWillReturnStreamFromUnderlyingConnectionWithoutSt $this->assertTrue($ret->isReadable()); } - public function testQueryStreamWillReturnStreamFromUnderlyingConnectionAndStartTimerWhenResolvedAndClosed() + public function testQueryStreamWillReturnStreamFromUnderlyingConnectionWhenResolvedAndClosed() { $stream = new ThroughStream(); $base = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); @@ -535,7 +342,6 @@ public function testQueryStreamWillReturnStreamFromUnderlyingConnectionAndStartT $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer'); $connection = new MysqlClient('', null, $loop); @@ -653,7 +459,7 @@ public function testPingWillTryToCreateNewUnderlyingConnectionAfterPreviousPingF $connection->ping()->then($this->expectCallableNever(), $this->expectCallableOnceWith($error)); } - public function testPingWillResolveAndStartTimerWhenPingFromUnderlyingConnectionResolves() + public function testPingWillResolveWhenPingFromUnderlyingConnectionResolves() { $base = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); @@ -662,7 +468,6 @@ public function testPingWillResolveAndStartTimerWhenPingFromUnderlyingConnection $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer'); $connection = new MysqlClient('', null, $loop); @@ -674,7 +479,7 @@ public function testPingWillResolveAndStartTimerWhenPingFromUnderlyingConnection $ret->then($this->expectCallableOnce(), $this->expectCallableNever()); } - public function testPingWillRejectAndStartTimerWhenPingFromUnderlyingConnectionRejects() + public function testPingWillRejectWhenPingFromUnderlyingConnectionRejects() { $error = new \RuntimeException(); @@ -685,7 +490,6 @@ public function testPingWillRejectAndStartTimerWhenPingFromUnderlyingConnectionR $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer'); $connection = new MysqlClient('', null, $loop); @@ -697,7 +501,7 @@ public function testPingWillRejectAndStartTimerWhenPingFromUnderlyingConnectionR $ret->then($this->expectCallableNever(), $this->expectCallableOnceWith($error)); } - public function testPingWillRejectAndNotStartIdleTimerWhenPingFromUnderlyingConnectionRejectsBecauseConnectionIsDead() + public function testPingWillRejectWhenPingFromUnderlyingConnectionEmitsCloseEventAndRejects() { $error = new \RuntimeException(); @@ -712,7 +516,6 @@ public function testPingWillRejectAndNotStartIdleTimerWhenPingFromUnderlyingConn $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->never())->method('addTimer'); $connection = new MysqlClient('', null, $loop); @@ -871,6 +674,33 @@ public function testQuitAfterPingRejectsAndThenEmitsCloseWhenUnderlyingConnectio $deferred->reject(new \RuntimeException()); } + public function testPingAfterQuitWillPassPingCommandToConnectionWhenItIsStillQuitting() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->setMethods(['ping', 'quit', 'close'])->disableOriginalConstructor()->getMock(); + $connection->expects($this->exactly(2))->method('ping')->willReturn(\React\Promise\resolve(null)); + $connection->expects($this->once())->method('quit')->willReturn(new Promise(function () { })); + $connection->expects($this->never())->method('close'); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->on('close', $this->expectCallableNever()); + + $mysql->ping(); + + $mysql->quit(); + + $mysql->ping(); + } + public function testCloseEmitsCloseImmediatelyWhenConnectionIsNotAlreadyPending() { $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); @@ -953,18 +783,16 @@ public function testCloseAfterPingDoesNotEmitConnectionErrorFromAbortedConnectio $connection->close(); } - public function testCloseAfterPingWillCancelTimerWhenPingFromUnderlyingConnectionResolves() + public function testCloseAfterPingWillCloseUnderlyingConnection() { $base = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); + $base->expects($this->once())->method('close'); $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); - $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->willReturn($timer); - $loop->expects($this->once())->method('cancelTimer')->with($timer); $connection = new MysqlClient('', null, $loop); @@ -1021,23 +849,17 @@ public function testCloseAfterQuitAfterPingWillCloseUnderlyingConnectionWhenQuit $connection->close(); } - public function testCloseAfterPingAfterIdleTimeoutWillCloseUnderlyingConnectionWhenQuitIsStillPending() + public function testCloseAfterConnectionIsInClosingStateDueToIdleTimerWillCloseUnderlyingConnection() { $base = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); - $base->expects($this->once())->method('quit')->willReturn(new Promise(function () { })); + $base->expects($this->never())->method('quit'); $base->expects($this->once())->method('close'); $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); - $timeout = null; - $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->with($this->anything(), $this->callback(function ($cb) use (&$timeout) { - $timeout = $cb; - return true; - }))->willReturn($timer); $connection = new MysqlClient('', null, $loop); @@ -1047,8 +869,8 @@ public function testCloseAfterPingAfterIdleTimeoutWillCloseUnderlyingConnectionW $connection->ping(); - $this->assertNotNull($timeout); - $timeout(); + // emulate triggering idle timer by setting connection state to closing + $base->state = Connection::STATE_CLOSING; $connection->close(); } From 836ca2d75c6d2eb7a4d97b63397f61623d213d3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Tue, 28 Nov 2023 19:40:48 +0100 Subject: [PATCH 3/3] Refactor to move command queuing logic to `MysqlClient` --- src/Io/Connection.php | 18 +- src/Io/Factory.php | 2 +- src/Io/Parser.php | 11 + src/MysqlClient.php | 150 +++-- tests/Io/ConnectionTest.php | 144 ++++- tests/MysqlClientTest.php | 1043 ++++++++++++++++++++++++++++++++++- 6 files changed, 1269 insertions(+), 99 deletions(-) diff --git a/src/Io/Connection.php b/src/Io/Connection.php index 749f456..74be321 100644 --- a/src/Io/Connection.php +++ b/src/Io/Connection.php @@ -40,6 +40,9 @@ class Connection extends EventEmitter */ private $stream; + /** @var Parser */ + private $parser; + /** @var LoopInterface */ private $loop; @@ -57,13 +60,15 @@ class Connection extends EventEmitter * * @param SocketConnectionInterface $stream * @param Executor $executor + * @param Parser $parser * @param LoopInterface $loop * @param ?float $idlePeriod */ - public function __construct(SocketConnectionInterface $stream, Executor $executor, LoopInterface $loop, $idlePeriod) + public function __construct(SocketConnectionInterface $stream, Executor $executor, Parser $parser, LoopInterface $loop, $idlePeriod) { $this->stream = $stream; $this->executor = $executor; + $this->parser = $parser; $this->loop = $loop; if ($idlePeriod !== null) { @@ -74,6 +79,17 @@ public function __construct(SocketConnectionInterface $stream, Executor $executo $stream->on('close', [$this, 'handleConnectionClosed']); } + /** + * busy executing some command such as query or ping + * + * @return bool + * @throws void + */ + public function isBusy() + { + return $this->parser->isBusy() || !$this->executor->isIdle(); + } + /** * {@inheritdoc} */ diff --git a/src/Io/Factory.php b/src/Io/Factory.php index 5233907..0300415 100644 --- a/src/Io/Factory.php +++ b/src/Io/Factory.php @@ -215,7 +215,7 @@ public function createConnection( $executor = new Executor(); $parser = new Parser($stream, $executor); - $connection = new Connection($stream, $executor, $this->loop, $idlePeriod); + $connection = new Connection($stream, $executor, $parser, $this->loop, $idlePeriod); $command = $executor->enqueue($authCommand); $parser->start(); diff --git a/src/Io/Parser.php b/src/Io/Parser.php index f65ca5e..c3006e9 100644 --- a/src/Io/Parser.php +++ b/src/Io/Parser.php @@ -115,6 +115,17 @@ public function __construct(DuplexStreamInterface $stream, Executor $executor) }); } + /** + * busy executing some command such as query or ping + * + * @return bool + * @throws void + */ + public function isBusy() + { + return $this->currCommand !== null; + } + public function start() { $this->stream->on('data', [$this, 'handleData']); diff --git a/src/MysqlClient.php b/src/MysqlClient.php index b5c0a9c..a7d8aa8 100644 --- a/src/MysqlClient.php +++ b/src/MysqlClient.php @@ -6,6 +6,7 @@ use React\EventLoop\LoopInterface; use React\Mysql\Io\Connection; use React\Mysql\Io\Factory; +use React\Promise\Deferred; use React\Promise\Promise; use React\Socket\ConnectorInterface; use React\Stream\ReadableStreamInterface; @@ -58,6 +59,13 @@ class MysqlClient extends EventEmitter /** @var ?Connection */ private $connection; + /** + * array of outstanding connection requests to send next commands once a connection becomes ready + * + * @var array> + */ + private $pending = []; + /** * set to true only between calling `quit()` and the connection closing in response * @@ -77,44 +85,6 @@ public function __construct( $this->uri = $uri; } - /** - * @return PromiseInterface - */ - private function getConnection() - { - // happy path: reuse existing connection unless it is already closing after an idle timeout - if ($this->connection !== null && ($this->quitting || $this->connection->state !== Connection::STATE_CLOSING)) { - return \React\Promise\resolve($this->connection); - } - - if ($this->connecting !== null) { - return $this->connecting; - } - - // force-close connection if still waiting for previous disconnection - if ($this->connection !== null) { - assert($this->connection->state === Connection::STATE_CLOSING); - $this->connection->close(); - } - - // create new connection if not already connected or connecting - $this->connecting = $connecting = $this->factory->createConnection($this->uri); - $this->connecting->then(function (Connection $connection) { - $this->connection = $connection; - $this->connecting = null; - - // connection completed => remember only until closed - $connection->on('close', function () { - $this->connection = null; - }); - }, function () { - // connection failed => discard connection attempt - $this->connecting = null; - }); - - return $connecting; - } - /** * Performs an async query. * @@ -176,12 +146,18 @@ private function getConnection() */ public function query($sql, array $params = []) { - if ($this->closed) { + if ($this->closed || $this->quitting) { return \React\Promise\reject(new Exception('Connection closed')); } return $this->getConnection()->then(function (Connection $connection) use ($sql, $params) { - return $connection->query($sql, $params); + return $connection->query($sql, $params)->then(function (MysqlResult $result) use ($connection) { + $this->handleConnectionReady($connection); + return $result; + }, function (\Exception $e) use ($connection) { + $this->handleConnectionReady($connection); + throw $e; + }); }); } @@ -246,13 +222,22 @@ public function query($sql, array $params = []) */ public function queryStream($sql, $params = []) { - if ($this->closed) { + if ($this->closed || $this->quitting) { throw new Exception('Connection closed'); } return \React\Promise\Stream\unwrapReadable( $this->getConnection()->then(function (Connection $connection) use ($sql, $params) { - return $connection->queryStream($sql, $params); + $stream = $connection->queryStream($sql, $params); + + $stream->on('end', function () use ($connection) { + $this->handleConnectionReady($connection); + }); + $stream->on('error', function () use ($connection) { + $this->handleConnectionReady($connection); + }); + + return $stream; }) ); } @@ -279,12 +264,17 @@ public function queryStream($sql, $params = []) */ public function ping() { - if ($this->closed) { + if ($this->closed || $this->quitting) { return \React\Promise\reject(new Exception('Connection closed')); } return $this->getConnection()->then(function (Connection $connection) { - return $connection->ping(); + return $connection->ping()->then(function () use ($connection) { + $this->handleConnectionReady($connection); + }, function (\Exception $e) use ($connection) { + $this->handleConnectionReady($connection); + throw $e; + }); }); } @@ -312,7 +302,7 @@ public function ping() */ public function quit() { - if ($this->closed) { + if ($this->closed || $this->quitting) { return \React\Promise\reject(new Exception('Connection closed')); } @@ -379,7 +369,77 @@ public function close() $this->connecting = null; } + // clear all outstanding commands + foreach ($this->pending as $deferred) { + $deferred->reject(new \RuntimeException('Connection closed')); + } + $this->pending = []; + $this->emit('close'); $this->removeAllListeners(); } + + + /** + * @return PromiseInterface + */ + private function getConnection() + { + $deferred = new Deferred(); + + // force-close connection if still waiting for previous disconnection due to idle timer + if ($this->connection !== null && $this->connection->state === Connection::STATE_CLOSING) { + $this->connection->close(); + $this->connection = null; + } + + // happy path: reuse existing connection unless it is currently busy executing another command + if ($this->connection !== null && !$this->connection->isBusy()) { + $deferred->resolve($this->connection); + return $deferred->promise(); + } + + // queue pending connection request until connection becomes ready + $this->pending[] = $deferred; + + // create new connection if not already connected or connecting + if ($this->connection === null && $this->connecting === null) { + $this->connecting = $this->factory->createConnection($this->uri); + $this->connecting->then(function (Connection $connection) { + // connection completed => remember only until closed + $this->connecting = null; + $this->connection = $connection; + $connection->on('close', function () { + $this->connection = null; + }); + + // handle first command from queue when connection is ready + $this->handleConnectionReady($connection); + }, function (\Exception $e) { + // connection failed => discard connection attempt + $this->connecting = null; + + foreach ($this->pending as $key => $deferred) { + $deferred->reject($e); + unset($this->pending[$key]); + } + }); + } + + return $deferred->promise(); + } + + private function handleConnectionReady(Connection $connection) + { + $deferred = \reset($this->pending); + if ($deferred === false) { + // nothing to do if there are no outstanding connection requests + return; + } + + assert($deferred instanceof Deferred); + unset($this->pending[\key($this->pending)]); + + $deferred->resolve($connection); + } } diff --git a/tests/Io/ConnectionTest.php b/tests/Io/ConnectionTest.php index 394545f..5a0a5ff 100644 --- a/tests/Io/ConnectionTest.php +++ b/tests/Io/ConnectionTest.php @@ -7,6 +7,42 @@ class ConnectionTest extends BaseTestCase { + public function testIsBusyReturnsTrueWhenParserIsBusy() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue', 'isIdle'])->getMock(); + $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + $executor->expects($this->never())->method('isIdle'); + + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $parser->expects($this->once())->method('isBusy')->willReturn(true); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $connection = new Connection($stream, $executor, $parser, $loop, null); + + $connection->query('SELECT 1'); + + $this->assertTrue($connection->isBusy()); + } + + public function testIsBusyReturnsFalseWhenParserIsNotBusyAndExecutorIsIdle() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->getMock(); + $executor->expects($this->once())->method('isIdle')->willReturn(true); + + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $connection = new Connection($stream, $executor, $parser, $loop, null); + + $this->assertFalse($connection->isBusy()); + } + public function testQueryWillEnqueueOneCommand() { $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); @@ -15,10 +51,12 @@ public function testQueryWillEnqueueOneCommand() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->never())->method('addTimer'); - $conn = new Connection($stream, $executor, $loop, null); + $conn = new Connection($stream, $executor, $parser, $loop, null); $conn->query('SELECT 1'); } @@ -32,12 +70,14 @@ public function testQueryWillReturnResolvedPromiseAndStartIdleTimerWhenQueryComm return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); $loop->expects($this->never())->method('cancelTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $this->assertNull($currentCommand); @@ -59,12 +99,14 @@ public function testQueryWillReturnResolvedPromiseAndStartIdleTimerWhenQueryComm return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); $loop->expects($this->never())->method('cancelTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $this->assertNull($currentCommand); @@ -86,12 +128,14 @@ public function testQueryWillReturnResolvedPromiseAndStartIdleTimerWhenIdlePerio return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->once())->method('addTimer')->with(1.0, $this->anything())->willReturn($timer); $loop->expects($this->never())->method('cancelTimer'); - $connection = new Connection($stream, $executor, $loop, 1.0); + $connection = new Connection($stream, $executor, $parser, $loop, 1.0); $this->assertNull($currentCommand); @@ -113,10 +157,12 @@ public function testQueryWillReturnResolvedPromiseAndNotStartIdleTimerWhenIdlePe return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->never())->method('addTimer'); - $connection = new Connection($stream, $executor, $loop, -1); + $connection = new Connection($stream, $executor, $parser, $loop, -1); $this->assertNull($currentCommand); @@ -138,12 +184,14 @@ public function testQueryWillReturnRejectedPromiseAndStartIdleTimerWhenQueryComm return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); $loop->expects($this->never())->method('cancelTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $this->assertNull($currentCommand); @@ -166,6 +214,8 @@ public function testQueryFollowedByIdleTimerWillQuitUnderlyingConnectionAndEmitC return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timeout = null; $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); @@ -174,7 +224,7 @@ public function testQueryFollowedByIdleTimerWillQuitUnderlyingConnectionAndEmitC return true; }))->willReturn($timer); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $connection->on('close', $this->expectCallableOnce()); @@ -203,6 +253,8 @@ public function testQueryFollowedByIdleTimerWillQuitUnderlyingConnectionAndEmitC return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timeout = null; $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); @@ -211,7 +263,7 @@ public function testQueryFollowedByIdleTimerWillQuitUnderlyingConnectionAndEmitC return true; }))->willReturn($timer); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $connection->on('close', $this->expectCallableOnce()); @@ -239,10 +291,12 @@ public function testQueryTwiceWillEnqueueSecondQueryWithoutStartingIdleTimerWhen return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->never())->method('addTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $this->assertNull($currentCommand); @@ -263,12 +317,14 @@ public function testQueryTwiceAfterIdleTimerWasStartedWillCancelIdleTimerAndEnqu return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); $loop->expects($this->once())->method('cancelTimer')->with($timer); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $this->assertNull($currentCommand); @@ -288,10 +344,12 @@ public function testQueryStreamWillEnqueueOneCommand() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->never())->method('addTimer'); - $conn = new Connection($stream, $executor, $loop, null); + $conn = new Connection($stream, $executor, $parser, $loop, null); $conn->queryStream('SELECT 1'); } @@ -305,12 +363,14 @@ public function testQueryStreamWillReturnStreamThatWillEmitEndEventAndStartIdleT return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); $loop->expects($this->never())->method('cancelTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $this->assertNull($currentCommand); @@ -333,12 +393,14 @@ public function testQueryStreamWillReturnStreamThatWillEmitErrorEventAndStartIdl return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); $loop->expects($this->never())->method('cancelTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $this->assertNull($currentCommand); @@ -359,10 +421,12 @@ public function testPingWillEnqueueOneCommand() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->never())->method('addTimer'); - $conn = new Connection($stream, $executor, $loop, null); + $conn = new Connection($stream, $executor, $parser, $loop, null); $conn->ping(); } @@ -376,12 +440,14 @@ public function testPingWillReturnResolvedPromiseAndStartIdleTimerWhenPingComman return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); $loop->expects($this->never())->method('cancelTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $this->assertNull($currentCommand); @@ -403,12 +469,14 @@ public function testPingWillReturnRejectedPromiseAndStartIdleTimerWhenPingComman return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); $loop->expects($this->never())->method('cancelTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $this->assertNull($currentCommand); @@ -426,10 +494,12 @@ public function testQuitWillEnqueueOneCommand() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->never())->method('addTimer'); - $conn = new Connection($stream, $executor, $loop, null); + $conn = new Connection($stream, $executor, $parser, $loop, null); $conn->quit(); } @@ -443,10 +513,12 @@ public function testQuitWillResolveBeforeEmittingCloseEventWhenQuitCommandEmitsS return $pingCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->never())->method('addTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $events = ''; $connection->on('close', function () use (&$events) { @@ -479,10 +551,12 @@ public function testQuitWillRejectBeforeEmittingCloseEventWhenQuitCommandEmitsEr return $pingCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->never())->method('addTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $events = ''; $connection->on('close', function () use (&$events) { @@ -512,10 +586,12 @@ public function testCloseWillEmitCloseEvent() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->getMock(); $executor->expects($this->once())->method('isIdle')->willReturn(true); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->never())->method('addTimer'); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $connection->on('close', $this->expectCallableOnce()); @@ -532,12 +608,14 @@ public function testCloseAfterIdleTimerWasStartedWillCancelIdleTimerAndEmitClose return $currentCommand = $command; }); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); $loop->expects($this->once())->method('addTimer')->with(0.001, $this->anything())->willReturn($timer); $loop->expects($this->once())->method('cancelTimer')->with($timer); - $connection = new Connection($stream, $executor, $loop, null); + $connection = new Connection($stream, $executor, $parser, $loop, null); $this->assertNull($currentCommand); @@ -557,9 +635,11 @@ public function testQueryAfterQuitRejectsImmediately() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $conn = new Connection($stream, $executor, $loop, null); + $conn = new Connection($stream, $executor, $parser, $loop, null); $conn->quit(); $promise = $conn->query('SELECT 1'); @@ -582,9 +662,11 @@ public function testQueryAfterCloseRejectsImmediately() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->never())->method('enqueue'); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $conn = new Connection($stream, $executor, $loop, null); + $conn = new Connection($stream, $executor, $parser, $loop, null); $conn->close(); $promise = $conn->query('SELECT 1'); @@ -607,9 +689,11 @@ public function testQueryStreamAfterQuitThrows() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $conn = new Connection($stream, $executor, $loop, null); + $conn = new Connection($stream, $executor, $parser, $loop, null); $conn->quit(); try { @@ -626,9 +710,11 @@ public function testPingAfterQuitRejectsImmediately() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $conn = new Connection($stream, $executor, $loop, null); + $conn = new Connection($stream, $executor, $parser, $loop, null); $conn->quit(); $promise = $conn->ping(); @@ -651,9 +737,11 @@ public function testQuitAfterQuitRejectsImmediately() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->once())->method('enqueue')->willReturnArgument(0); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $conn = new Connection($stream, $executor, $loop, null); + $conn = new Connection($stream, $executor, $parser, $loop, null); $conn->quit(); $promise = $conn->quit(); @@ -684,9 +772,11 @@ public function testCloseStreamEmitsErrorEvent() $executor = $this->getMockBuilder('React\Mysql\Io\Executor')->setMethods(['enqueue'])->getMock(); $executor->expects($this->never())->method('enqueue'); + $parser = $this->getMockBuilder('React\Mysql\Io\Parser')->disableOriginalConstructor()->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $conn = new Connection($stream, $executor, $loop, null); + $conn = new Connection($stream, $executor, $parser, $loop, null); $conn->on('error', $this->expectCallableOnceWith( $this->logicalAnd( $this->isInstanceOf('RuntimeException'), diff --git a/tests/MysqlClientTest.php b/tests/MysqlClientTest.php index 2576902..2cedb2b 100644 --- a/tests/MysqlClientTest.php +++ b/tests/MysqlClientTest.php @@ -158,42 +158,1036 @@ public function testPingAfterConnectionIsInClosingStateDueToIdleTimerWillCloseCo $connection->ping(); } - public function testQueryReturnsPendingPromiseWhenConnectionIsPending() + public function testQueryWillCreateNewConnectionAndReturnPendingPromiseWhenConnectionIsPending() + { + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(new Promise(function () { })); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise = $mysql->query('SELECT 1'); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryWillCreateNewConnectionAndReturnPendingPromiseWhenConnectionResolvesAndQueryOnConnectionIsPending() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('query')->with('SELECT 1')->willReturn(new Promise(function () { })); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise = $mysql->query('SELECT 1'); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryWillReturnResolvedPromiseWhenQueryOnConnectionResolves() + { + $result = new MysqlResult(); + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('query')->with('SELECT 1')->willReturn(\React\Promise\resolve($result)); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise = $mysql->query('SELECT 1'); + + $promise->then($this->expectCallableOnceWith($result)); + } + + public function testQueryWillReturnRejectedPromiseWhenCreateConnectionRejects() + { + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\reject(new \RuntimeException())); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise = $mysql->query('SELECT 1'); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testQueryWillReturnRejectedPromiseWhenQueryOnConnectionRejectsAfterCreateConnectionResolves() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('query')->with('SELECT 1')->willReturn(\React\Promise\reject(new \RuntimeException())); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise = $mysql->query('SELECT 1'); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testQueryTwiceWillCreateSingleConnectionAndReturnPendingPromiseWhenCreateConnectionIsPending() + { + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(new Promise(function () { })); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->query('SELECT 1'); + + $promise = $mysql->query('SELECT 2'); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryTwiceWillCallQueryOnConnectionOnlyOnceWhenQueryIsStillPending() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('query')->with('SELECT 1')->willReturn(new Promise(function () { })); + $connection->expects($this->once())->method('isBusy')->willReturn(true); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->query('SELECT 1'); + + $promise = $mysql->query('SELECT 2'); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryTwiceWillReuseConnectionForSecondQueryWhenFirstQueryIsAlreadyResolved() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->exactly(2))->method('query')->withConsecutive( + ['SELECT 1'], + ['SELECT 2'] + )->willReturnOnConsecutiveCalls( + \React\Promise\resolve(new MysqlResult()), + new Promise(function () { }) + ); + $connection->expects($this->once())->method('isBusy')->willReturn(false); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->query('SELECT 1'); + + $promise = $mysql->query('SELECT 2'); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryTwiceWillCallSecondQueryOnConnectionAfterFirstQueryResolvesWhenBothQueriesAreGivenBeforeCreateConnectionResolves() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->exactly(2))->method('query')->withConsecutive( + ['SELECT 1'], + ['SELECT 2'] + )->willReturnOnConsecutiveCalls( + \React\Promise\resolve(new MysqlResult()), + new Promise(function () { }) + ); + $connection->expects($this->never())->method('isBusy'); + + $deferred = new Deferred(); + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->query('SELECT 1'); + + $promise = $mysql->query('SELECT 2'); + + $deferred->resolve($connection); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryTwiceWillCreateNewConnectionForSecondQueryWhenFirstConnectionIsClosedAfterFirstQueryIsResolved() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->setMethods(['query', 'isBusy'])->getMock(); + $connection->expects($this->once())->method('query')->with('SELECT 1')->willReturn(\React\Promise\resolve(new MysqlResult())); + $connection->expects($this->never())->method('isBusy'); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + \React\Promise\resolve($connection), + new Promise(function () { }) + ); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->query('SELECT 1'); + + assert($connection instanceof Connection); + $connection->emit('close'); + + $promise = $mysql->query('SELECT 2'); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryTwiceWillCloseFirstConnectionAndCreateNewConnectionForSecondQueryWhenFirstConnectionIsInClosingStateDueToIdleTimerAfterFirstQueryIsResolved() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->setMethods(['query', 'isBusy', 'close'])->getMock(); + $connection->expects($this->once())->method('query')->with('SELECT 1')->willReturn(\React\Promise\resolve(new MysqlResult())); + $connection->expects($this->once())->method('close'); + $connection->expects($this->never())->method('isBusy'); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + \React\Promise\resolve($connection), + new Promise(function () { }) + ); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $mysql->on('close', $this->expectCallableNever()); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->query('SELECT 1'); + + // emulate triggering idle timer by setting connection state to closing + $connection->state = Connection::STATE_CLOSING; + + $promise = $mysql->query('SELECT 2'); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryTwiceWillRejectFirstQueryWhenCreateConnectionRejectsAndWillCreateNewConnectionForSecondQuery() + { + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + \React\Promise\reject(new \RuntimeException()), + new Promise(function () { }) + ); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise1 = $mysql->query('SELECT 1'); + + $promise2 = $mysql->query('SELECT 2'); + + $promise1->then(null, $this->expectCallableOnce()); + + $promise2->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryTwiceWillRejectBothQueriesWhenBothQueriesAreGivenBeforeCreateConnectionRejects() + { + $deferred = new Deferred(); + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise1 = $mysql->query('SELECT 1'); + $promise2 = $mysql->query('SELECT 2'); + + $deferred->reject(new \RuntimeException()); + + $promise1->then(null, $this->expectCallableOnce()); + $promise2->then(null, $this->expectCallableOnce()); + } + + public function testQueryTriceWillRejectFirstTwoQueriesAndKeepThirdPendingWhenTwoQueriesAreGivenBeforeCreateConnectionRejects() + { + $deferred = new Deferred(); + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + $deferred->promise(), + new Promise(function () { }) + ); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise1 = $mysql->query('SELECT 1'); + $promise2 = $mysql->query('SELECT 2'); + + $promise3 = $promise1->then(null, function () use ($mysql) { + return $mysql->query('SELECT 3'); + }); + + $deferred->reject(new \RuntimeException()); + + $promise1->then(null, $this->expectCallableOnce()); + $promise2->then(null, $this->expectCallableOnce()); + $promise3->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryTwiceWillCallSecondQueryOnConnectionAfterFirstQueryRejectsWhenBothQueriesAreGivenBeforeCreateConnectionResolves() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->exactly(2))->method('query')->withConsecutive( + ['SELECT 1'], + ['SELECT 2'] + )->willReturnOnConsecutiveCalls( + \React\Promise\reject(new \RuntimeException()), + new Promise(function () { }) + ); + $connection->expects($this->never())->method('isBusy'); + + $deferred = new Deferred(); + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise1 = $mysql->query('SELECT 1'); + + $promise2 = $mysql->query('SELECT 2'); + + $deferred->resolve($connection); + + $promise1->then(null, $this->expectCallableOnce()); + $promise2->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryStreamWillCreateNewConnectionAndReturnReadableStreamWhenConnectionIsPending() + { + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(new Promise(function () { })); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $stream = $mysql->queryStream('SELECT 1'); + + $this->assertTrue($stream->isReadable()); + } + + public function testQueryStreamWillCreateNewConnectionAndReturnReadableStreamWhenConnectionResolvesAndQueryStreamOnConnectionReturnsReadableStream() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('queryStream')->with('SELECT 1')->willReturn(new ThroughStream()); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $stream = $mysql->queryStream('SELECT 1'); + + $this->assertTrue($stream->isReadable()); + } + + public function testQueryStreamTwiceWillCallQueryStreamOnConnectionOnlyOnceWhenQueryStreamIsStillReadable() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('queryStream')->with('SELECT 1')->willReturn(new ThroughStream()); + $connection->expects($this->once())->method('isBusy')->willReturn(true); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->queryStream('SELECT 1'); + + $stream = $mysql->queryStream('SELECT 2'); + + $this->assertTrue($stream->isReadable()); + } + + public function testQueryStreamTwiceWillReuseConnectionForSecondQueryStreamWhenFirstQueryStreamEnds() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->exactly(2))->method('queryStream')->withConsecutive( + ['SELECT 1'], + ['SELECT 2'] + )->willReturnOnConsecutiveCalls( + $base = new ThroughStream(), + new ThroughStream() + ); + $connection->expects($this->once())->method('isBusy')->willReturn(false); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->queryStream('SELECT 1'); + + $base->end(); + + $stream = $mysql->queryStream('SELECT 2'); + + $this->assertTrue($stream->isReadable()); + } + + public function testQueryStreamTwiceWillReuseConnectionForSecondQueryStreamWhenFirstQueryStreamEmitsError() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->exactly(2))->method('queryStream')->withConsecutive( + ['SELECT 1'], + ['SELECT 2'] + )->willReturnOnConsecutiveCalls( + $base = new ThroughStream(), + new ThroughStream() + ); + $connection->expects($this->once())->method('isBusy')->willReturn(true); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $stream1 = $mysql->queryStream('SELECT 1'); + $stream2 = $mysql->queryStream('SELECT 2'); + + $this->assertTrue($stream1->isReadable()); + $this->assertTrue($stream2->isReadable()); + + $base->emit('error', [new \RuntimeException()]); + + $this->assertFalse($stream1->isReadable()); + $this->assertTrue($stream2->isReadable()); + } + + public function testQueryStreamTwiceWillWaitForFirstQueryStreamToEndBeforeStartingSecondQueryStreamWhenFirstQueryStreamIsExplicitlyClosed() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('queryStream')->with('SELECT 1')->willReturn(new ThroughStream()); + $connection->expects($this->once())->method('isBusy')->willReturn(true); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $stream1 = $mysql->queryStream('SELECT 1'); + $stream2 = $mysql->queryStream('SELECT 2'); + + $this->assertTrue($stream1->isReadable()); + $this->assertTrue($stream2->isReadable()); + + $stream1->close(); + + $this->assertFalse($stream1->isReadable()); + $this->assertTrue($stream2->isReadable()); + } + + public function testQueryStreamTwiceWillCallSecondQueryStreamOnConnectionAfterFirstQueryStreamIsClosedWhenBothQueriesAreGivenBeforeCreateConnectionResolves() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->exactly(2))->method('queryStream')->withConsecutive( + ['SELECT 1'], + ['SELECT 2'] + )->willReturnOnConsecutiveCalls( + $base = new ThroughStream(), + new ThroughStream() + ); + $connection->expects($this->never())->method('isBusy'); + + $deferred = new Deferred(); + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->queryStream('SELECT 1'); + + $stream = $mysql->queryStream('SELECT 2'); + + $deferred->resolve($connection); + $base->end(); + + $this->assertTrue($stream->isReadable()); + } + + public function testQueryStreamTwiceWillCreateNewConnectionForSecondQueryStreamWhenFirstConnectionIsClosedAfterFirstQueryStreamIsClosed() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->setMethods(['queryStream', 'isBusy'])->getMock(); + $connection->expects($this->once())->method('queryStream')->with('SELECT 1')->willReturn($base = new ThroughStream()); + $connection->expects($this->never())->method('isBusy'); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + \React\Promise\resolve($connection), + new Promise(function () { }) + ); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->queryStream('SELECT 1'); + + $base->end(); + assert($connection instanceof Connection); + $connection->emit('close'); + + $stream = $mysql->queryStream('SELECT 2'); + + $this->assertTrue($stream->isReadable()); + } + + public function testQueryStreamTwiceWillCloseFirstConnectionAndCreateNewConnectionForSecondQueryStreamWhenFirstConnectionIsInClosingStateDueToIdleTimerAfterFirstQueryStreamIsClosed() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->setMethods(['queryStream', 'isBusy', 'close'])->getMock(); + $connection->expects($this->once())->method('queryStream')->with('SELECT 1')->willReturn($base = new ThroughStream()); + $connection->expects($this->once())->method('close'); + $connection->expects($this->never())->method('isBusy'); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + \React\Promise\resolve($connection), + new Promise(function () { }) + ); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $mysql->on('close', $this->expectCallableNever()); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->queryStream('SELECT 1'); + + $base->end(); + // emulate triggering idle timer by setting connection state to closing + $connection->state = Connection::STATE_CLOSING; + + $stream = $mysql->queryStream('SELECT 2'); + + $this->assertTrue($stream->isReadable()); + } + + public function testQueryStreamTwiceWillEmitErrorOnFirstQueryStreamWhenCreateConnectionRejectsAndWillCreateNewConnectionForSecondQueryStream() + { + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + \React\Promise\reject(new \RuntimeException()), + new Promise(function () { }) + ); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $stream1 = $mysql->queryStream('SELECT 1'); + + $this->assertFalse($stream1->isReadable()); + + $stream2 = $mysql->queryStream('SELECT 2'); + + $this->assertTrue($stream2->isReadable()); + } + + public function testQueryStreamTwiceWillEmitErrorOnBothQueriesWhenBothQueriesAreGivenBeforeCreateConnectionRejects() + { + $deferred = new Deferred(); + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $stream1 = $mysql->queryStream('SELECT 1'); + $stream2 = $mysql->queryStream('SELECT 2'); + + $stream1->on('error', $this->expectCallableOnceWith($this->isInstanceOf('Exception'))); + $stream1->on('close', $this->expectCallableOnce()); + + $stream2->on('error', $this->expectCallableOnceWith($this->isInstanceOf('Exception'))); + $stream2->on('close', $this->expectCallableOnce()); + + $deferred->reject(new \RuntimeException()); + + $this->assertFalse($stream1->isReadable()); + $this->assertFalse($stream2->isReadable()); + } + + public function testPingWillCreateNewConnectionAndReturnPendingPromiseWhenConnectionIsPending() + { + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(new Promise(function () { })); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise = $mysql->ping(); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testPingWillCreateNewConnectionAndReturnPendingPromiseWhenConnectionResolvesAndPingOnConnectionIsPending() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('ping')->willReturn(new Promise(function () { })); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise = $mysql->ping(); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testPingWillReturnResolvedPromiseWhenPingOnConnectionResolves() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise = $mysql->ping(); + + $promise->then($this->expectCallableOnce()); + } + + public function testPingWillReturnRejectedPromiseWhenCreateConnectionRejects() + { + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\reject(new \RuntimeException())); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise = $mysql->ping(); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testPingWillReturnRejectedPromiseWhenPingOnConnectionRejectsAfterCreateConnectionResolves() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('ping')->willReturn(\React\Promise\reject(new \RuntimeException())); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise = $mysql->ping(); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testPingTwiceWillCreateSingleConnectionAndReturnPendingPromiseWhenCreateConnectionIsPending() + { + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(new Promise(function () { })); + + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->ping(); + + $promise = $mysql->ping(); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testPingTwiceWillCallPingOnConnectionOnlyOnceWhenPingIsStillPending() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('ping')->willReturn(new Promise(function () { })); + $connection->expects($this->once())->method('isBusy')->willReturn(true); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->ping(); + + $promise = $mysql->ping(); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testPingTwiceWillReuseConnectionForSecondPingWhenFirstPingIsAlreadyResolved() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->exactly(2))->method('ping')->willReturnOnConsecutiveCalls( + \React\Promise\resolve(null), + new Promise(function () { }) + ); + $connection->expects($this->once())->method('isBusy')->willReturn(false); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->ping(); + + $promise = $mysql->ping(); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testPingTwiceWillCallSecondPingOnConnectionAfterFirstPingResolvesWhenBothQueriesAreGivenBeforeCreateConnectionResolves() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->exactly(2))->method('ping')->willReturnOnConsecutiveCalls( + \React\Promise\resolve(new MysqlResult()), + new Promise(function () { }) + ); + $connection->expects($this->never())->method('isBusy'); + + $deferred = new Deferred(); + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->ping(); + + $promise = $mysql->ping(); + + $deferred->resolve($connection); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testPingTwiceWillCreateNewConnectionForSecondPingWhenFirstConnectionIsClosedAfterFirstPingIsResolved() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->setMethods(['ping', 'isBusy'])->getMock(); + $connection->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); + $connection->expects($this->never())->method('isBusy'); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + \React\Promise\resolve($connection), + new Promise(function () { }) + ); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->ping(); + + assert($connection instanceof Connection); + $connection->emit('close'); + + $promise = $mysql->ping(); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testPingTwiceWillCloseFirstConnectionAndCreateNewConnectionForSecondPingWhenFirstConnectionIsInClosingStateDueToIdleTimerAfterFirstPingIsResolved() + { + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->setMethods(['ping', 'isBusy', 'close'])->getMock(); + $connection->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); + $connection->expects($this->once())->method('close'); + $connection->expects($this->never())->method('isBusy'); + + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + \React\Promise\resolve($connection), + new Promise(function () { }) + ); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $mysql->on('close', $this->expectCallableNever()); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $mysql->ping(); + + // emulate triggering idle timer by setting connection state to closing + $connection->state = Connection::STATE_CLOSING; + + $promise = $mysql->ping(); + + $promise->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testPingTwiceWillRejectFirstPingWhenCreateConnectionRejectsAndWillCreateNewConnectionForSecondPing() + { + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + \React\Promise\reject(new \RuntimeException()), + new Promise(function () { }) + ); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + + $promise1 = $mysql->ping(); + + $promise2 = $mysql->ping(); + + $promise1->then(null, $this->expectCallableOnce()); + + $promise2->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testPingTwiceWillRejectBothQueriesWhenBothQueriesAreGivenBeforeCreateConnectionRejects() { $deferred = new Deferred(); $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $mysql = new MysqlClient('', null, $loop); + + $ref = new \ReflectionProperty($mysql, 'factory'); + $ref->setAccessible(true); + $ref->setValue($mysql, $factory); + $promise1 = $mysql->ping(); + $promise2 = $mysql->ping(); + + $deferred->reject(new \RuntimeException()); + + $promise1->then(null, $this->expectCallableOnce()); + $promise2->then(null, $this->expectCallableOnce()); + } + + public function testPingTriceWillRejectFirstTwoQueriesAndKeepThirdPendingWhenTwoQueriesAreGivenBeforeCreateConnectionRejects() + { + $deferred = new Deferred(); + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->exactly(2))->method('createConnection')->willReturnOnConsecutiveCalls( + $deferred->promise(), + new Promise(function () { }) + ); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $connection = new MysqlClient('', null, $loop); + $mysql = new MysqlClient('', null, $loop); - $ref = new \ReflectionProperty($connection, 'factory'); + $ref = new \ReflectionProperty($mysql, 'factory'); $ref->setAccessible(true); - $ref->setValue($connection, $factory); + $ref->setValue($mysql, $factory); - $ret = $connection->query('SELECT 1'); + $promise1 = $mysql->ping(); + $promise2 = $mysql->ping(); - $this->assertTrue($ret instanceof PromiseInterface); - $ret->then($this->expectCallableNever(), $this->expectCallableNever()); + $promise3 = $promise1->then(null, function () use ($mysql) { + return $mysql->ping(); + }); + + $deferred->reject(new \RuntimeException()); + + $promise1->then(null, $this->expectCallableOnce()); + $promise2->then(null, $this->expectCallableOnce()); + $promise3->then($this->expectCallableNever(), $this->expectCallableNever()); } - public function testQueryWillQueryUnderlyingConnectionWhenResolved() + public function testPingTwiceWillCallSecondPingOnConnectionAfterFirstPingRejectsWhenBothQueriesAreGivenBeforeCreateConnectionResolves() { - $base = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); - $base->expects($this->once())->method('query')->with('SELECT 1')->willReturn(new Promise(function () { })); + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock(); + $connection->expects($this->exactly(2))->method('ping')->willReturnOnConsecutiveCalls( + \React\Promise\reject(new \RuntimeException()), + new Promise(function () { }) + ); + $connection->expects($this->never())->method('isBusy'); + $deferred = new Deferred(); $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); - $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $connection = new MysqlClient('', null, $loop); + $mysql = new MysqlClient('', null, $loop); - $ref = new \ReflectionProperty($connection, 'factory'); + $ref = new \ReflectionProperty($mysql, 'factory'); $ref->setAccessible(true); - $ref->setValue($connection, $factory); + $ref->setValue($mysql, $factory); - $connection->query('SELECT 1'); + $promise1 = $mysql->ping(); + + $promise2 = $mysql->ping(); + + $deferred->resolve($connection); + + $promise1->then(null, $this->expectCallableOnce()); + $promise2->then($this->expectCallableNever(), $this->expectCallableNever()); } public function testQueryWillResolveWhenQueryFromUnderlyingConnectionResolves() @@ -571,7 +1565,7 @@ public function testQuitAfterPingReturnsPendingPromiseWhenConnectionIsPending() public function testQuitAfterPingRejectsAndThenEmitsCloseWhenFactoryFailsToCreateUnderlyingConnection() { $deferred = new Deferred(); - $factory = $this->getMockBuilder('React\MySQL\Io\Factory')->disableOriginalConstructor()->getMock(); + $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); @@ -674,12 +1668,13 @@ public function testQuitAfterPingRejectsAndThenEmitsCloseWhenUnderlyingConnectio $deferred->reject(new \RuntimeException()); } - public function testPingAfterQuitWillPassPingCommandToConnectionWhenItIsStillQuitting() + public function testPingAfterQuitWillNotPassPingCommandToConnection() { - $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->setMethods(['ping', 'quit', 'close'])->disableOriginalConstructor()->getMock(); - $connection->expects($this->exactly(2))->method('ping')->willReturn(\React\Promise\resolve(null)); + $connection = $this->getMockBuilder('React\Mysql\Io\Connection')->setMethods(['ping', 'quit', 'close', 'isBusy'])->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); $connection->expects($this->once())->method('quit')->willReturn(new Promise(function () { })); $connection->expects($this->never())->method('close'); + $connection->expects($this->once())->method('isBusy')->willReturn(false); $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection)); @@ -698,7 +1693,7 @@ public function testPingAfterQuitWillPassPingCommandToConnectionWhenItIsStillQui $mysql->quit(); - $mysql->ping(); + $mysql->ping()->then(null, $this->expectCallableOnce()); } public function testCloseEmitsCloseImmediatelyWhenConnectionIsNotAlreadyPending() @@ -732,7 +1727,7 @@ public function testCloseAfterPingCancelsPendingConnection() $ref->setAccessible(true); $ref->setValue($connection, $factory); - $connection->ping(); + $connection->ping()->then(null, $this->expectCallableOnce()); $connection->close(); } @@ -808,9 +1803,7 @@ public function testCloseAfterPingHasResolvedWillCloseUnderlyingConnectionWithou { $base = $this->getMockBuilder('React\Mysql\Io\Connection')->setMethods(['ping', 'close'])->disableOriginalConstructor()->getMock(); $base->expects($this->once())->method('ping')->willReturn(\React\Promise\resolve(null)); - $base->expects($this->once())->method('close')->willReturnCallback(function () use ($base) { - $base->emit('close'); - }); + $base->expects($this->once())->method('close'); $factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock(); $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); @@ -891,7 +1884,7 @@ public function testCloseTwiceAfterPingEmitsCloseEventOnceWhenConnectionIsPendin $connection->on('error', $this->expectCallableNever()); $connection->on('close', $this->expectCallableOnce()); - $connection->ping(); + $connection->ping()->then(null, $this->expectCallableOnce()); $connection->close(); $connection->close(); }