@@ -101,26 +101,26 @@ async def _publish(self, data):
101101 raise asyncio .CancelledError ()
102102
103103 async def _listen (self ):
104- async with ( await self . _connection ()) as connection :
105- channel = await self . _channel ( connection )
106- await channel . set_qos ( prefetch_count = 1 )
107- exchange = await self ._exchange ( channel )
108- queue = await self ._queue ( channel , exchange )
109-
110- retry_sleep = 1
111- while True :
112- try :
104+ retry_sleep = 1
105+ while True :
106+ try :
107+ async with ( await self ._connection ()) as connection :
108+ channel = await self ._channel ( connection )
109+ await channel . set_qos ( prefetch_count = 1 )
110+ exchange = await self . _exchange ( channel )
111+ queue = await self . _queue ( channel , exchange )
112+
113113 async with queue .iterator () as queue_iter :
114114 async for message in queue_iter :
115115 async with message .process ():
116116 yield message .body
117117 retry_sleep = 1
118- except aio_pika .AMQPException :
119- self ._get_logger ().error (
120- 'Cannot receive from rabbitmq... '
121- 'retrying in {} secs' .format (retry_sleep ))
122- await asyncio .sleep (retry_sleep )
123- retry_sleep = min (retry_sleep * 2 , 60 )
124- except aio_pika .exceptions .ChannelInvalidStateError :
125- # aio_pika raises this exception when the task is cancelled
126- raise asyncio .CancelledError ()
118+ except aio_pika .AMQPException :
119+ self ._get_logger ().error (
120+ 'Cannot receive from rabbitmq... '
121+ 'retrying in {} secs' .format (retry_sleep ))
122+ await asyncio .sleep (retry_sleep )
123+ retry_sleep = min (retry_sleep * 2 , 60 )
124+ except aio_pika .exceptions .ChannelInvalidStateError :
125+ # aio_pika raises this exception when the task is cancelled
126+ raise asyncio .CancelledError ()
0 commit comments