I have found what seem to be similar issues about this error (#312, #379), but since they haven't been addressed, I'll leave my take here.
Basically, if the connection is lost while a message is being consumed, the message can no longer be acknowledged or rejected, even though a reconnect occurs. The `ack()` call gets stuck, and the message stays unacked indefinitely while the consumer is running.
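A toy model (not aio-pika's code) of the AMQP 0-9-1 semantics that seem to be involved here: delivery tags are scoped to the channel that issued them, and the broker requeues a closed channel's unacked messages. Even though channel number 1 is reopened after the reconnect, the message object obtained before the drop still refers to the old, closed channel, so its ack can never reach the broker, while the requeued copy arrives as a new delivery (with tag 1 again) on the new channel. `ToyChannel`, `ToyBroker`, and `ChannelClosed` are hypothetical names, and unlike the behaviour reported here, the model fails fast instead of hanging.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


class ChannelClosed(Exception):
    """Hypothetical error for acking on a channel that no longer exists."""


@dataclass
class ToyChannel:
    number: int
    generation: int  # bumped every time the channel is reopened
    is_closed: bool = False
    next_tag: int = 1
    unacked: Dict[int, bytes] = field(default_factory=dict)

    def deliver(self, body: bytes) -> int:
        # Delivery tags are counted per channel, starting at 1.
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = body
        return tag

    def ack(self, tag: int) -> None:
        if self.is_closed:
            raise ChannelClosed(f"channel generation {self.generation} is closed")
        del self.unacked[tag]


class ToyBroker:
    def __init__(self, messages: List[bytes]) -> None:
        self.ready: List[bytes] = list(messages)

    def consume_one(self, channel: ToyChannel) -> int:
        return channel.deliver(self.ready.pop(0))

    def close_channel(self, channel: ToyChannel) -> None:
        # Unacked messages of a closed channel go back to the queue
        # (simplified here: put back at the head).
        self.ready[:0] = list(channel.unacked.values())
        channel.unacked.clear()
        channel.is_closed = True


broker = ToyBroker([b"Hello queue-testing"] * 5)
old = ToyChannel(number=1, generation=1)
tag = broker.consume_one(old)             # message handed to the handler
broker.close_channel(old)                 # heartbeat timeout drops the connection
new = ToyChannel(number=1, generation=2)  # robust reconnect reopens channel #1
redelivered = broker.consume_one(new)     # same body, new delivery, tag 1 again

ack_error: Optional[ChannelClosed] = None
try:
    old.ack(tag)  # the message object from before the drop still uses `old`
except ChannelClosed as exc:
    ack_error = exc
print("ack failed:", ack_error)
```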
Script to reproduce issue:
```python
import os
import asyncio
import logging
from time import sleep

import aio_pika
from aio_pika.queue import QueueIterator

AMQP_CONNECTION_STRING = os.environ.get("AMQP_CONNECTION_STRING", "<add default here>")
QUEUE_WAIT_MESSAGE_TIMEOUT = 0.05  # seconds
ROUTING_KEY = "queue-testing"
TOTAL_MESSAGES_TO_PUBLISH = 5


async def process_message(
    message: aio_pika.abc.AbstractIncomingMessage,
) -> None:
    print(message.body)

    seconds = 250
    print(f"Sync sleep {seconds} seconds to force losing connection...")
    sleep(seconds)

    seconds = 10
    print(f"Async sleep {seconds} to allow connection to fail in heartbeat...")
    await asyncio.sleep(seconds)

    print("Trying to ack message... (it will stay stuck here)")
    await message.ack()


class QueueIteratorWithHighConsumeTimeout(QueueIterator):
    """Uses a large consume timeout, longer than the sleeps performed
    in the `process_message` function."""

    async def consume(self) -> None:
        """Consumes from a queue."""
        self._consumer_tag = await self._amqp_queue.consume(
            self.on_message,
            timeout=3600,  # 1 hour
        )


async def main() -> None:
    # To see aio-pika logging (e.g. heartbeat logs)
    logging.basicConfig(level=logging.DEBUG)

    # Create connection
    connection = await aio_pika.connect_robust(AMQP_CONNECTION_STRING)

    async with connection:
        # Create channel
        channel = await connection.channel(publisher_confirms=True)

        # Set prefetch to 1
        await channel.set_qos(prefetch_count=1)

        # Declare/get queue
        queue = await channel.declare_queue(ROUTING_KEY, durable=True)

        # Publish some messages to the queue
        for __ in range(TOTAL_MESSAGES_TO_PUBLISH):
            await channel.default_exchange.publish(
                aio_pika.Message(body=f"Hello {ROUTING_KEY}".encode()),
                routing_key=ROUTING_KEY,
            )

        # Start consuming messages from the queue
        async with QueueIteratorWithHighConsumeTimeout(
            queue, timeout=QUEUE_WAIT_MESSAGE_TIMEOUT
        ) as queue_iter:
            while True:
                try:
                    async for message in queue_iter:
                        await process_message(message)
                except asyncio.TimeoutError:
                    # Catches the timeout error raised when
                    # QUEUE_WAIT_MESSAGE_TIMEOUT is exceeded.
                    pass


if __name__ == "__main__":
    asyncio.run(main())
```
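The script above forces the disconnect by calling the blocking `time.sleep` inside a coroutine: while it runs, the event loop cannot service anything else, so heartbeats are not processed, which is presumably why the connection is closed with `cause: TimeoutError()` in the logs. A stdlib-only sketch of that effect, with a hypothetical `heartbeat` task standing in for the client's heartbeat handling (not aiormq's actual code):

```python
import asyncio
import time
from typing import List


async def heartbeat(beats: List[float], interval: float) -> None:
    # Hypothetical stand-in for a heartbeat sender: records each tick.
    while True:
        beats.append(time.monotonic())
        await asyncio.sleep(interval)


async def handler(blocking: bool, seconds: float) -> None:
    if blocking:
        time.sleep(seconds)           # blocks the whole event loop
    else:
        await asyncio.sleep(seconds)  # yields so other tasks keep running


async def largest_gap(blocking: bool) -> float:
    beats: List[float] = []
    task = asyncio.create_task(heartbeat(beats, interval=0.05))
    await asyncio.sleep(0)    # let the heartbeat record its first tick
    await handler(blocking, seconds=0.5)
    await asyncio.sleep(0.1)  # give the heartbeat a chance to tick again
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # Longest stretch without a heartbeat tick.
    return max(b - a for a, b in zip(beats, beats[1:]))


blocked_gap = asyncio.run(largest_gap(blocking=True))
async_gap = asyncio.run(largest_gap(blocking=False))
print(f"longest gap with time.sleep:    {blocked_gap:.2f}s")
print(f"longest gap with asyncio.sleep: {async_gap:.2f}s")
```

In the reproduction the stall lasts 250 seconds, far longer than a typical heartbeat timeout, so the connection is dropped while `process_message` still holds the message.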
Logs generated:
DEBUG:aio_pika.queue:Start to consuming queue: <RobustQueue(queue-testing): auto_delete=False, durable=True, exclusive=False, arguments=None
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Consume object at 0x1036afba0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Basic.ConsumeOk object at 0x1036b2ac0> in channel #1 weight=51 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Received frame <Basic.Deliver object at 0x103620dc0> in channel #1 weight=75 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Received frame <pamqp.header.ContentHeader object at 0x1035fb760> in channel #1 weight=61 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Received frame <pamqp.body.ContentBody object at 0x103648bb0> in channel #1 weight=27 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
b'Hello queue-testing'
Sync Sleep 250 seconds to force loosing connection...
Async sleep 10 to allow connection to fail in heartbeat...
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=0, frames=[<pamqp.heartbeat.Heartbeat object at 0x10361c250>], drain_future=None)
DEBUG:aiormq.connection:Reader exited for <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Cancelling cause reader exited abnormally
DEBUG:aiormq.connection:Sending <Connection.Close object at 0x1036aae50> to <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Writer on connection amqp://guest:******@localhost:5672/vhost closed
DEBUG:aiormq.connection:Writer exited for <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Closing connection <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270> cause: TimeoutError()
INFO:aio_pika.robust_connection:Connection to amqp://guest:******@localhost:5672/vhost closed. Reconnecting after 5 seconds.
DEBUG:aio_pika.queue:Cancelling queue iterator <QueueIteratorWithHighConsumeTimeout: queue='queue-testing' ctag='ctag1.5d7a6c279db64bb88d093e154c73222a'>
DEBUG:aio_pika.queue:Queue iterator <QueueIteratorWithHighConsumeTimeout: queue='queue-testing' ctag='ctag1.5d7a6c279db64bb88d093e154c73222a'> channel closed
DEBUG:aiormq.connection:Connecting to: amqp://guest:******@localhost:5672/vhost
DEBUG:aio_pika.channel:Start reopening channel <aio_pika.robust_channel.RobustChannel object at 0x1035fb670>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Open object at 0x1036c9900>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Channel.OpenOk object at 0x103600740> in channel #1 weight=16 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Confirm.Select object at 0x1036bd180>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Confirm.SelectOk object at 0x103648370> in channel #1 weight=12 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Qos object at 0x1036cc5e0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Basic.QosOk object at 0x1036cd670> in channel #1 weight=12 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aio_pika.queue:Declaring queue: <RobustQueue(queue-testing): auto_delete=False, durable=True, exclusive=False, arguments=None
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Queue.Declare object at 0x10361deb0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Queue.DeclareOk object at 0x1036cc4f0> in channel #1 weight=34 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aio_pika.queue:Start to consuming queue: <RobustQueue(queue-testing): auto_delete=False, durable=True, exclusive=False, arguments=None
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Consume object at 0x10361dcf0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Basic.ConsumeOk object at 0x103629cc0> in channel #1 weight=51 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Received frame <Basic.Deliver object at 0x1035e5e20> in channel #1 weight=75 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Received frame <pamqp.header.ContentHeader object at 0x10361cf70> in channel #1 weight=61 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Received frame <pamqp.body.ContentBody object at 0x1035fb3a0> in channel #1 weight=27 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
Trying to ack message... (it will stay stuck here)
DEBUG:aiormq.connection:Received frame <pamqp.heartbeat.Heartbeat object at 0x103648df0> in channel #0 weight=8 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=0, frames=[<pamqp.heartbeat.Heartbeat object at 0x103648e20>], drain_future=None)
DEBUG:aiormq.connection:Received frame <pamqp.heartbeat.Heartbeat object at 0x1035fb310> in channel #0 weight=8 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=0, frames=[<pamqp.heartbeat.Heartbeat object at 0x103648e20>], drain_future=None)
Any ideas?
If you also print the channel status, e.g. `message.channel.is_closed`, you will notice that it returns `True`:

```python
print(f"Trying to ack message... (it will stay stuck here), is_closed: {message.channel.is_closed}")
await message.ack()
```
Compared with other clients (in other languages), there should be something like an `AlreadyClosedError` when trying to write to a closed channel, not a deadlock (hang).
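Until the library raises something like that, one possible workaround is to check the channel state before acking and to bound the ack with a timeout, so a dead channel surfaces as an exception instead of a hang. A minimal sketch, assuming `message.channel.is_closed` behaves as described above; `ack_or_fail` and `AckNotPossible` are hypothetical names, and the demo uses fake acks rather than aio-pika:

```python
import asyncio
from typing import Awaitable, Callable, List


class AckNotPossible(Exception):
    """Hypothetical error raised instead of hanging on a dead channel."""


async def ack_or_fail(
    is_closed: Callable[[], bool],
    ack: Callable[[], Awaitable[None]],
    timeout: float,
) -> None:
    """Fail fast if the channel is closed; otherwise bound the ack with a timeout."""
    if is_closed():
        raise AckNotPossible("channel is closed; message cannot be acked")
    try:
        await asyncio.wait_for(ack(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise AckNotPossible(f"ack did not complete within {timeout}s") from exc


async def hanging_ack() -> None:
    # Fake ack that never completes, like the stuck `message.ack()` above.
    await asyncio.Event().wait()


async def ok_ack() -> None:
    # Fake ack that completes immediately.
    return None


async def demo() -> List[str]:
    outcomes: List[str] = []
    cases = [(True, ok_ack), (False, hanging_ack), (False, ok_ack)]
    for closed, ack in cases:
        try:
            await ack_or_fail(lambda: closed, ack, timeout=0.1)
            outcomes.append("acked")
        except AckNotPossible as exc:
            outcomes.append(str(exc))
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)
```

With aio-pika this would presumably be called as `await ack_or_fail(lambda: message.channel.is_closed, message.ack, timeout=5)`. Since the broker requeues unacked messages when their channel closes, the handler could drop the stale message and let the redelivered copy be processed.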
There are a bunch of issues in the underlying aiormq layer that might be the root cause of this problem: