diff --git a/aioamqp/channel.py b/aioamqp/channel.py index 08279be..3f45e28 100644 --- a/aioamqp/channel.py +++ b/aioamqp/channel.py @@ -661,7 +661,7 @@ def basic_deliver(self, frame): @asyncio.coroutine def server_basic_cancel(self, frame): """From the server, means the server won't send anymore messages to this consumer.""" - consumer_tag = frame.arguments['consumer_tag'] + consumer_tag = frame.arguments.get('consumer_tag',None) self.cancelled_consumers.add(consumer_tag) logger.info("consume cancelled received") diff --git a/aioamqp/protocol.py b/aioamqp/protocol.py index f090559..b28adf4 100644 --- a/aioamqp/protocol.py +++ b/aioamqp/protocol.py @@ -97,7 +97,10 @@ def connection_made(self, transport): self._stream_writer = _StreamWriter(transport, self, self._stream_reader, self._loop) def connection_lost(self, exc): - logger.warning("Connection lost exc=%r", exc) + if exc is None: + logger.debug("Connection lost") + else: + logger.warning("Connection lost", exc_info=exc) self.connection_closed.set() self.is_open = False self._close_channels(exception=exc) @@ -199,6 +202,14 @@ def start_connection(self, host, port, login, password, virtualhost, ssl=False, # for now, we read server's responses asynchronously self.worker = ensure_future(self.run(), loop=self._loop) + def done_work(w): + try: + w.result() + except asyncio.CancelledError: + pass + except Exception as exc: + logger.exception("Worker died", exc_info=exc) + self.worker.add_done_callback(done_work) def stop(self): self.is_open = False diff --git a/aioamqp/tests/test_connect.py b/aioamqp/tests/test_connect.py index 0339f4e..c55fb00 100644 --- a/aioamqp/tests/test_connect.py +++ b/aioamqp/tests/test_connect.py @@ -12,7 +12,8 @@ class AmqpConnectionTestCase(testcase.RabbitTestCase, unittest.TestCase): @testing.coroutine def test_connect(self): - _transport, proto = yield from connect(virtualhost=self.vhost, loop=self.loop) + _transport, proto = yield from connect(virtualhost=self.vhost, loop=self.loop, + host=self.host,port=self.port,login=self.login,password=self.password) self.assertTrue(proto.is_open) self.assertIsNotNone(proto.server_properties) yield from proto.close() @@ -29,6 +30,8 @@ def test_connect_tuning(self): channel_max=channel_max, frame_max=frame_max, heartbeat=heartbeat, + host=self.host, port=self.port, + login=self.login, password=self.password, ) self.assertTrue(proto.is_open) self.assertIsNotNone(proto.server_properties) @@ -47,7 +50,8 @@ def test_connect_tuning(self): @testing.coroutine def test_socket_nodelay(self): - transport, proto = yield from connect(virtualhost=self.vhost, loop=self.loop) + transport, proto = yield from connect(virtualhost=self.vhost, loop=self.loop, + host=self.host,port=self.port, login=self.login,password=self.password) sock = transport.get_extra_info('socket') opt_val = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) self.assertEqual(opt_val, 1) diff --git a/aioamqp/tests/test_protocol.py b/aioamqp/tests/test_protocol.py index c02be65..5529ce2 100644 --- a/aioamqp/tests/test_protocol.py +++ b/aioamqp/tests/test_protocol.py @@ -19,7 +19,8 @@ class ProtocolTestCase(testcase.RabbitTestCase, unittest.TestCase): @testing.coroutine def test_connect(self): - _transport, protocol = yield from amqp_connect(virtualhost=self.vhost, loop=self.loop) + _transport, protocol = yield from amqp_connect(virtualhost=self.vhost, loop=self.loop, + host=self.host,port=self.port, login=self.login,password=self.password) self.assertTrue(protocol.is_open) yield from protocol.close() @@ -33,6 +34,8 @@ def test_connect_products_info(self): virtualhost=self.vhost, client_properties=client_properties, loop=self.loop, + host=self.host,port=self.port, + login=self.login,password=self.password ) self.assertEqual(protocol.client_properties, client_properties) @@ -41,11 +44,13 @@ def test_connect_products_info(self): @testing.coroutine def test_connection_unexistant_vhost(self): with self.assertRaises(exceptions.AmqpClosedConnection): - yield from amqp_connect(virtualhost='/unexistant', loop=self.loop) + yield from amqp_connect(virtualhost='/unexistant', loop=self.loop, + host=self.host,port=self.port, login=self.login,password=self.password) def test_connection_wrong_login_password(self): with self.assertRaises(exceptions.AmqpClosedConnection): - self.loop.run_until_complete(amqp_connect(login='wrong', password='wrong', loop=self.loop)) + self.loop.run_until_complete(amqp_connect(login='wrong', password='wrong', loop=self.loop, + host=self.host,port=self.port)) @testing.coroutine def test_connection_from_url(self): diff --git a/aioamqp/tests/testcase.py b/aioamqp/tests/testcase.py index c177b76..2d0709a 100644 --- a/aioamqp/tests/testcase.py +++ b/aioamqp/tests/testcase.py @@ -82,8 +82,11 @@ def setUp(self): super().setUp() self.host = os.environ.get('AMQP_HOST', 'localhost') self.port = os.environ.get('AMQP_PORT', 5672) + self.login = os.environ.get('AMQP_USER', 'guest') + self.password = os.environ.get('AMQP_PASS', 'guest') + self.mgr_port = os.environ.get('AMQP_MGR_PORT', 15672) self.vhost = os.environ.get('AMQP_VHOST', self.VHOST + str(uuid.uuid4())) - self.http_client = pyrabbit.api.Client('localhost:15672/api/', 'guest', 'guest') + self.http_client = pyrabbit.api.Client('%s:%d/api/' % (self.host,self.mgr_port), self.login,self.password) self.amqps = [] self.channels = [] @@ -101,7 +104,7 @@ def reset_vhost(self): self.http_client.create_vhost(self.vhost) self.http_client.set_vhost_permissions( - vname=self.vhost, username='guest', config='.*', rd='.*', wr='.*', + vname=self.vhost, username=self.login, config='.*', rd='.*', wr='.*', ) @asyncio.coroutine @@ -285,7 +288,7 @@ def protocol_factory(*args, **kw): return ProxyAmqpProtocol(self, *args, **kw) vhost = vhost or self.vhost transport, protocol = yield from aioamqp_connect(host=self.host, port=self.port, virtualhost=vhost, - protocol_factory=protocol_factory, loop=self.loop) + protocol_factory=protocol_factory, loop=self.loop, login=self.login, password=self.password) self.amqps.append(protocol) self.transports.append(transport) return transport, protocol