Skip to content

Commit

Permalink
shutdown with error whenever disconnected with amqp server
Browse files Browse the repository at this point in the history
  • Loading branch information
hsldymq committed Mar 29, 2019
1 parent 797b701 commit a9c850f
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
14 changes: 3 additions & 11 deletions src/BunnyAsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ public function onDataAvailable()
parent::onDataAvailable();
} catch (\Throwable $e) {
$this->eventLoop->removeReadStream($this->getStream());
if (@feof($this->stream)) {
$this->state = ClientStateEnum::ERROR;
}

throw $e;
$this->eventLoop->futureTick(function () use ($e) {
$this->emit('error', [$e, $this]);
});
}
}

Expand Down Expand Up @@ -62,10 +60,4 @@ function (\Throwable $e) use ($deferred) {

return $deferred->promise();
}

public function isConnected()
{
$isEof = @feof($this->stream);
return parent::isConnected() && !$isEof;
}
}
7 changes: 6 additions & 1 deletion src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Connection implements AMQPConnectionInterface
private $connectionOptions;

/**
* @var Client
* @var BunnyAsyncClient
*/
private $client;

Expand Down Expand Up @@ -128,6 +128,11 @@ public function connect(LoopInterface $eventLoop, ConsumerHandlerInterface $hand
$this->eventLoop = $eventLoop;
$this->handler = $handler;
$client = new BunnyAsyncClient($this->eventLoop, $this->connectionOptions);
$client->on('error', function (\Throwable $e) {
$this->state = self::STATE_DISCONNECTED;
throw $e;
});

return $client->connect()
->then(function (Client $client) {
$this->state = self::STATE_CONNECTED;
Expand Down

0 comments on commit a9c850f

Please sign in to comment.