diff --git a/src/BunnyAsyncClient.php b/src/BunnyAsyncClient.php index 718b9c8..b44dec8 100644 --- a/src/BunnyAsyncClient.php +++ b/src/BunnyAsyncClient.php @@ -25,11 +25,9 @@ public function onDataAvailable() parent::onDataAvailable(); } catch (\Throwable $e) { $this->eventLoop->removeReadStream($this->getStream()); - if (@feof($this->stream)) { - $this->state = ClientStateEnum::ERROR; - } - - throw $e; + $this->eventLoop->futureTick(function () use ($e) { + $this->emit('error', [$e, $this]); + }); } } @@ -62,10 +60,4 @@ function (\Throwable $e) use ($deferred) { return $deferred->promise(); } - - public function isConnected() - { - $isEof = @feof($this->stream); - return parent::isConnected() && !$isEof; - } } \ No newline at end of file diff --git a/src/Connection.php b/src/Connection.php index c7b2b74..4b5a844 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -63,7 +63,7 @@ class Connection implements AMQPConnectionInterface private $connectionOptions; /** - * @var Client + * @var BunnyAsyncClient */ private $client; @@ -128,6 +128,11 @@ public function connect(LoopInterface $eventLoop, ConsumerHandlerInterface $hand $this->eventLoop = $eventLoop; $this->handler = $handler; $client = new BunnyAsyncClient($this->eventLoop, $this->connectionOptions); + $client->on('error', function (\Throwable $e) { + $this->state = self::STATE_DISCONNECTED; + throw $e; + }); + return $client->connect() ->then(function (Client $client) { $this->state = self::STATE_CONNECTED;