diff --git a/composer.json b/composer.json index 335b526..007b36f 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,6 @@ "evenement/evenement": "^3.0 || ^2.0 || ^1.0", "react/event-loop": "^1.2", "react/promise": "^3", - "react/promise-timer": "^1.10", "react/socket": "^1.15" }, "require-dev": { diff --git a/src/Io/Factory.php b/src/Io/Factory.php index b6778b9..a669793 100644 --- a/src/Io/Factory.php +++ b/src/Io/Factory.php @@ -6,13 +6,12 @@ use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use React\Promise\Deferred; +use React\Promise\Promise; use React\Promise\PromiseInterface; -use React\Promise\Timer\TimeoutException; use React\Socket\ConnectionInterface; use React\Socket\Connector; use React\Socket\ConnectorInterface; use function React\Promise\reject; -use function React\Promise\Timer\timeout; /** * @internal @@ -175,14 +174,53 @@ function (\Exception $e) use ($redis, $uri) { return $deferred->promise(); } - return timeout($deferred->promise(), $timeout, $this->loop)->then(null, function (\Throwable $e) use ($uri) { - if ($e instanceof TimeoutException) { - throw new \RuntimeException( - 'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)', - defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110 - ); + $promise = $deferred->promise(); + + /** @var Promise */ + $ret = new Promise(function (callable $resolve, callable $reject) use ($timeout, $promise, $uri): void { + /** @var ?\React\EventLoop\TimerInterface */ + $timer = null; + $promise = $promise->then(function (StreamingClient $v) use (&$timer, $resolve): void { + if ($timer) { + $this->loop->cancelTimer($timer); + } + $timer = false; + $resolve($v); + }, function (\Throwable $e) use (&$timer, $reject): void { + if ($timer) { + $this->loop->cancelTimer($timer); + } + $timer = false; + $reject($e); + }); + + // promise already settled => no need to start timer + if ($timer === false) { + return; } - throw $e; + + // start timeout timer which will cancel the pending promise + $timer = $this->loop->addTimer($timeout, function () use ($timeout, &$promise, $reject, $uri): void { + $reject(new \RuntimeException( + 'Connection to ' . $uri . ' timed out after ' . $timeout . ' seconds (ETIMEDOUT)', + \defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110 + )); + + // Cancel pending connection to clean up any underlying resources and references. + // Avoid garbage references in call stack by passing pending promise by reference. + \assert($promise instanceof PromiseInterface); + $promise->cancel(); + $promise = null; + }); + }, function () use (&$promise): void { + // Cancelling this promise will cancel the pending connection, thus triggering the rejection logic above. + // Avoid garbage references in call stack by passing pending promise by reference. + \assert($promise instanceof PromiseInterface); + $promise->cancel(); + $promise = null; }); + + // variable assignment needed for legacy PHPStan on PHP 7.1 only + return $ret; } } diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index 767c530..c1fe42b 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -4,10 +4,9 @@ use Clue\React\Redis\RedisClient; use React\EventLoop\Loop; -use React\Promise\Deferred; +use React\Promise\Promise; use React\Promise\PromiseInterface; use function React\Async\await; -use function React\Promise\Timer\timeout; class FunctionalTest extends TestCase { @@ -144,10 +143,7 @@ public function testPubSub(): void $channel = 'channel:test:' . mt_rand(); // consumer receives a single message - /** @var Deferred */ - $deferred = new Deferred(); $consumer->on('message', $this->expectCallableOnce()); - $consumer->on('message', [$deferred, 'resolve']); $once = $this->expectCallableOnceWith(1); $consumer->subscribe($channel)->then(function() use ($producer, $channel, $once){ // producer sends a single message @@ -155,8 +151,16 @@ public function testPubSub(): void })->then($this->expectCallableOnce()); // expect "message" event to take no longer than 0.1s - - await(timeout($deferred->promise(), 0.1)); + await(new Promise(function (callable $resolve, callable $reject) use ($consumer): void { + $timeout = Loop::addTimer(0.1, function () use ($consumer, $reject): void { + $consumer->close(); + $reject(new \RuntimeException('Timed out')); + }); + $consumer->on('message', function () use ($timeout, $resolve): void { + Loop::cancelTimer($timeout); + $resolve(null); + }); + })); /** @var PromiseInterface */ $promise = $consumer->unsubscribe($channel); diff --git a/tests/Io/FactoryStreamingClientTest.php b/tests/Io/FactoryStreamingClientTest.php index fba36e8..669b069 100644 --- a/tests/Io/FactoryStreamingClientTest.php +++ b/tests/Io/FactoryStreamingClientTest.php @@ -7,6 +7,7 @@ use Clue\Tests\React\Redis\TestCase; use PHPUnit\Framework\MockObject\MockObject; use React\EventLoop\LoopInterface; +use React\EventLoop\TimerInterface; use React\Promise\Deferred; use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; @@ -633,4 +634,35 @@ public function testCreateClientWithoutTimeoutParameterWillStartTimerWithDefault $this->factory->createClient('redis://127.0.0.1:2'); ini_set('default_socket_timeout', $old); } + + public function testCreateClientWillCancelTimerWhenConnectionResolves(): void + { + $timer = $this->createMock(TimerInterface::class); + $this->loop->expects($this->once())->method('addTimer')->willReturn($timer); + $this->loop->expects($this->once())->method('cancelTimer')->with($timer); + + $deferred = new Deferred(); + $this->connector->expects($this->once())->method('connect')->with('127.0.0.1:6379')->willReturn($deferred->promise()); + + $promise = $this->factory->createClient('127.0.0.1'); + $promise->then($this->expectCallableOnce()); + + $deferred->resolve($this->createMock(ConnectionInterface::class)); + } + + public function testCreateClientWillCancelTimerWhenConnectionRejects(): void + { + $timer = $this->createMock(TimerInterface::class); + $this->loop->expects($this->once())->method('addTimer')->willReturn($timer); + $this->loop->expects($this->once())->method('cancelTimer')->with($timer); + + $deferred = new Deferred(); + $this->connector->expects($this->once())->method('connect')->with('127.0.0.1:6379')->willReturn($deferred->promise()); + + $promise = $this->factory->createClient('127.0.0.1'); + + $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); + + $deferred->reject(new \RuntimeException()); + } }