
When py-amqp tries to publish to a nonexistent exchange, it crashes with AttributeError: 'NoneType' object has no attribute 'drain_events' #218

Open
matusvalo opened this issue Nov 13, 2018 · 21 comments


@matusvalo
Member

matusvalo commented Nov 13, 2018

Steps to reproduce:

  1. Ensure that the test_exc exchange does not exist.
  2. Execute the code below.

Example code:

import amqp

conn = amqp.Connection(host='localhost')
conn.connect()
ch = conn.channel(channel_id=1)
ch.basic_publish(msg=amqp.Message(body=b'Hello world'), exchange='test_exc', routing_key='rkey')
conn.close()

Executing the script ends with the following stack trace:

  File "test.py", line 7, in <module>
    conn.close()
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 575, in close
    wait=spec.Connection.CloseOk,
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 59, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 79, in wait
    self.connection.drain_events(timeout=timeout)
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 500, in drain_events
    while not self.blocking_read(timeout):
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 506, in blocking_read
    return self.on_inbound_frame(frame)
  File "/home/matus/dev/py-amqp/amqp/method_framing.py", line 55, in on_frame
    callback(channel, method_sig, buf, None)
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 510, in on_inbound_method
    method_sig, payload, content,
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 124, in dispatch_method
    listener(*args)
  File "/home/matus/dev/py-amqp/amqp/channel.py", line 277, in _on_close
    self._do_revive()
  File "/home/matus/dev/py-amqp/amqp/channel.py", line 164, in _do_revive
    self.open()
  File "/home/matus/dev/py-amqp/amqp/channel.py", line 434, in open
    spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 59, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 79, in wait
    self.connection.drain_events(timeout=timeout)
AttributeError: 'NoneType' object has no attribute 'drain_events'

I was able to replicate this issue on RabbitMQ 3.7.8 with the master branch of py-amqp.

I found that the connection is set to None on the following line:

self._transport = self.connection = self.channels = None

After inserting a breakpoint there, I got the following tracebacks:

(Pdb) w
  /home/matus/dev/py-amqp/test.py(7)<module>()
-> conn.close()
  /home/matus/dev/py-amqp/amqp/connection.py(577)close()                       # Client sends Close method, waits for CloseOk method
-> wait=spec.Connection.CloseOk,
  /home/matus/dev/py-amqp/amqp/abstract_channel.py(59)send_method()
-> return self.wait(wait, returns_tuple=returns_tuple)
  /home/matus/dev/py-amqp/amqp/abstract_channel.py(79)wait()
-> self.connection.drain_events(timeout=timeout)
  /home/matus/dev/py-amqp/amqp/connection.py(502)drain_events()
-> while not self.blocking_read(timeout):
  /home/matus/dev/py-amqp/amqp/connection.py(508)blocking_read()
-> return self.on_inbound_frame(frame)
  /home/matus/dev/py-amqp/amqp/method_framing.py(55)on_frame()
-> callback(channel, method_sig, buf, None)
  /home/matus/dev/py-amqp/amqp/connection.py(512)on_inbound_method()
-> method_sig, payload, content,
  /home/matus/dev/py-amqp/amqp/abstract_channel.py(124)dispatch_method()
-> listener(*args)
  /home/matus/dev/py-amqp/amqp/channel.py(279)_on_close()                       # Client receives Close Method
-> self._do_revive()
  /home/matus/dev/py-amqp/amqp/channel.py(165)_do_revive()
-> self.open()
  /home/matus/dev/py-amqp/amqp/channel.py(436)open()
-> spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,                     # Client sends Open method, waits for OpenOk method
  /home/matus/dev/py-amqp/amqp/abstract_channel.py(59)send_method()
-> return self.wait(wait, returns_tuple=returns_tuple)
  /home/matus/dev/py-amqp/amqp/abstract_channel.py(79)wait()
-> self.connection.drain_events(timeout=timeout)
  /home/matus/dev/py-amqp/amqp/connection.py(502)drain_events()
-> while not self.blocking_read(timeout):
  /home/matus/dev/py-amqp/amqp/connection.py(508)blocking_read()
-> return self.on_inbound_frame(frame)
  /home/matus/dev/py-amqp/amqp/method_framing.py(55)on_frame()
-> callback(channel, method_sig, buf, None)
  /home/matus/dev/py-amqp/amqp/connection.py(512)on_inbound_method()
-> method_sig, payload, content,
  /home/matus/dev/py-amqp/amqp/abstract_channel.py(124)dispatch_method()
-> listener(*args)
  /home/matus/dev/py-amqp/amqp/connection.py(666)_on_close_ok()                 # Client receives CloseOK method
-> self.collect()
> /home/matus/dev/py-amqp/amqp/connection.py(454)collect()                      # Client is setting self.channels to None
-> try:
(Pdb) c

Traceback (most recent call last):
  File "test.py", line 7, in <module>
    conn.close()
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 577, in close
    wait=spec.Connection.CloseOk,
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 59, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 79, in wait
    self.connection.drain_events(timeout=timeout)
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 502, in drain_events
    while not self.blocking_read(timeout):
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 508, in blocking_read
    return self.on_inbound_frame(frame)
  File "/home/matus/dev/py-amqp/amqp/method_framing.py", line 55, in on_frame
    callback(channel, method_sig, buf, None)
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 512, in on_inbound_method
    method_sig, payload, content,
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 124, in dispatch_method
    listener(*args)
  File "/home/matus/dev/py-amqp/amqp/channel.py", line 279, in _on_close
    self._do_revive()
  File "/home/matus/dev/py-amqp/amqp/channel.py", line 165, in _do_revive
    self.open()
  File "/home/matus/dev/py-amqp/amqp/channel.py", line 436, in open
    spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 59, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 79, in wait
    self.connection.drain_events(timeout=timeout)
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 502, in drain_events
    while not self.blocking_read(timeout):
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 508, in blocking_read                # Client still waits for OpenOK. Receives some method but crashes, since it cleared channels
    return self.on_inbound_frame(frame)
  File "/home/matus/dev/py-amqp/amqp/method_framing.py", line 55, in on_frame
    callback(channel, method_sig, buf, None)
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 511, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
TypeError: 'NoneType' object is not subscriptable

The tracebacks show that:

  1. The client sends the Close method to the server.
  2. The client waits for CloseOk and starts a drain_events loop.
  3. The client receives a Channel.Close method instead of CloseOk.
  4. The client sends the Open method (part of the _do_revive() method) and waits for OpenOk. This starts another (!) drain_events loop.
  5. The client receives the CloseOk method and clears the connection (sets self.channels = None).
  6. The client is still waiting in the second drain_events loop for the OpenOk method. It receives some method but crashes, since Connection.channels is None.
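The six steps above can be reproduced without a broker. The sketch below is a hypothetical toy model (ToyConnection and ToyChannel are invented for illustration, not py-amqp classes): every wait() runs its own drain loop, a Channel.Close handler re-opens the channel the way _do_revive() does, and Connection.CloseOk triggers a collect() that detaches all channels. Feeding it the frame order from the tracebacks ends in the same AttributeError as the issue title.

```python
from collections import deque


class ToyChannel:
    """Hypothetical stand-in for a py-amqp channel (channel 0 is the connection)."""

    def __init__(self, connection, channel_id):
        self.connection = connection
        self.pending = []  # one [expected_method, done] entry per active wait()
        connection.channels[channel_id] = self

    def wait(self, expected):
        # Every wait() runs its own drain loop, so waits can nest.
        entry = [expected, False]
        self.pending.append(entry)
        while not entry[1]:
            self.connection.drain_events()
        self.pending.remove(entry)

    def dispatch(self, method):
        # Complete the innermost wait() expecting this method, if any...
        for entry in reversed(self.pending):
            if entry[0] == method and not entry[1]:
                entry[1] = True
                break
        # ...then run the method's listener, as dispatch_method() does.
        if method == 'Channel.Close':
            self.wait('Channel.OpenOk')  # like _on_close() -> _do_revive() -> open()
        elif method == 'Connection.CloseOk':
            self.connection.collect()  # like _on_close_ok() -> collect()


class ToyConnection:
    """Broker-free model: incoming frames are scripted (channel_id, method) pairs."""

    def __init__(self, frames):
        self.frames = deque(frames)
        self.channels = {}
        ToyChannel(self, 0)

    def drain_events(self):
        channel_id, method = self.frames.popleft()
        self.channels[channel_id].dispatch(method)

    def collect(self):
        # Mirrors Connection.collect(): detach every channel, drop the table.
        for channel in self.channels.values():
            channel.connection = None
        self.channels = None

    def close(self):
        # Connection.Close is "sent" here; wait for Close-Ok on channel 0.
        self.channels[0].wait('Connection.CloseOk')


conn = ToyConnection([
    (1, 'Channel.Close'),       # broker closes channel 1 (e.g. 404 on publish)
    (0, 'Connection.CloseOk'),  # reply to our Connection.Close
    (1, 'Channel.OpenOk'),      # any later frame, e.g. OpenOk for the revive
])
ToyChannel(conn, 1)
try:
    conn.close()
except AttributeError as exc:
    print('crashed:', exc)  # crashed: 'NoneType' object has no attribute 'drain_events'
```

The nested wait('Channel.OpenOk') is still looping when the outer CloseOk arrives, so it calls drain_events() on a channel whose connection has just been cleared.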

In general, the problem is that the client executes the Channel._do_revive() method even when the connection is closing:

py-amqp/amqp/channel.py

Lines 276 to 280 in 0e793de

self.send_method(spec.Channel.CloseOk)
self._do_revive()
raise error_for_code(
    reply_code, reply_text, (class_id, method_id), ChannelError,
)

A possible solution would roughly look like this:

  1. Mark the Connection as closing when the Connection.close() method is called:
    def close(self, reply_code=0, reply_text='', method_sig=(0, 0),
              argsig='BsBB'):
        self.closing = True
        if self._transport is None:
            # already closed
            return


        try:
            return self.send_method(
                spec.Connection.Close, argsig,
                (reply_code, reply_text, method_sig[0], method_sig[1]),
                wait=spec.Connection.CloseOk,
            )
        except (OSError, IOError, SSLError):
            # close connection
            self.collect()
            raise
  2. Test whether the connection is closing before calling Channel._do_revive():
    def _on_close(self, reply_code, reply_text, class_id, method_id):
        self.send_method(spec.Channel.CloseOk)
        if not self.connection.closing:
            self._do_revive()
        raise error_for_code(
            reply_code, reply_text, (class_id, method_id), ChannelError,
        )

After this fix, the correct exception is raised:

# python test.py
Traceback (most recent call last):
  File "test.py", line 7, in <module>
    conn.close()
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 578, in close
    wait=spec.Connection.CloseOk,
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 59, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 79, in wait
    self.connection.drain_events(timeout=timeout)
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 502, in drain_events
    while not self.blocking_read(timeout):
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 508, in blocking_read
    return self.on_inbound_frame(frame)
  File "/home/matus/dev/py-amqp/amqp/method_framing.py", line 55, in on_frame
    callback(channel, method_sig, buf, None)
  File "/home/matus/dev/py-amqp/amqp/connection.py", line 512, in on_inbound_method
    method_sig, payload, content,
  File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 124, in dispatch_method
    listener(*args)
  File "/home/matus/dev/py-amqp/amqp/channel.py", line 282, in _on_close
    reply_code, reply_text, (class_id, method_id), ChannelError,
amqp.exceptions.NotFound: Basic.publish: (404) NOT_FOUND - no exchange 'test_exc' in vhost '/'
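The effect of the proposed closing flag can be checked with a similar broker-free toy model. This is a sketch that assumes the fix works as described above; ToyConnection, ToyChannel and ToyChannelError are hypothetical names, and sending Channel.CloseOk is not modelled. With closing set, the Channel.Close handler skips the revive and the channel error propagates out of close(), much like the NotFound traceback.

```python
from collections import deque


class ToyChannelError(Exception):
    """Hypothetical stand-in for amqp.exceptions.ChannelError (e.g. NotFound)."""


class ToyChannel:
    def __init__(self, connection, channel_id):
        self.connection = connection
        self.pending = []  # one [expected_method, done] entry per active wait()
        connection.channels[channel_id] = self

    def wait(self, expected):
        entry = [expected, False]
        self.pending.append(entry)
        try:
            while not entry[1]:
                self.connection.drain_events()
        finally:
            self.pending.remove(entry)

    def dispatch(self, method):
        for entry in reversed(self.pending):
            if entry[0] == method and not entry[1]:
                entry[1] = True
                break
        if method == 'Channel.Close':
            # The proposed fix: revive only while the connection is not closing.
            if not self.connection.closing:
                self.wait('Channel.OpenOk')
            raise ToyChannelError('404 NOT_FOUND')
        if method == 'Connection.CloseOk':
            self.connection.collect()


class ToyConnection:
    def __init__(self, frames):
        self.frames = deque(frames)  # scripted (channel_id, method) pairs
        self.channels = {}
        self.closing = False
        ToyChannel(self, 0)

    def drain_events(self):
        channel_id, method = self.frames.popleft()
        self.channels[channel_id].dispatch(method)

    def collect(self):
        for channel in self.channels.values():
            channel.connection = None
        self.channels = None

    def close(self):
        self.closing = True  # set before waiting for Connection.CloseOk
        self.channels[0].wait('Connection.CloseOk')


conn = ToyConnection([(1, 'Channel.Close'), (0, 'Connection.CloseOk')])
ToyChannel(conn, 1)
try:
    conn.close()
except ToyChannelError as exc:
    print('channel error surfaced:', exc)  # channel error surfaced: 404 NOT_FOUND
```

When the connection is not closing, the same handler still re-opens the channel before raising, so ordinary channel errors keep the old revive behaviour.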

Moreover, does it make sense to revive the channel just before raising the exception?

py-amqp/amqp/channel.py

Lines 276 to 280 in 0e793de

self.send_method(spec.Channel.CloseOk)
self._do_revive()
raise error_for_code(
    reply_code, reply_text, (class_id, method_id), ChannelError,
)

@matusvalo
Member Author

Another improvement could be to ensure that once the connection is closed (i.e., after Connection.collect() is called), the drain_events loop ends immediately:

py-amqp/amqp/connection.py

Lines 498 to 501 in 0e793de

def drain_events(self, timeout=None):
    # read until message is ready
    while not self.blocking_read(timeout):
        pass
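One way to end the loop, sketched on a hypothetical toy connection rather than py-amqp's real Connection: check the transport on every iteration and raise once collect() has cleared it, so that an enclosing wait() unwinds too instead of calling drain_events() on a dead connection. ToyConnection and ToyConnectionClosed are invented names, and raising (rather than silently returning) is a design choice made here, not something proposed in the thread.

```python
class ToyConnectionClosed(Exception):
    """Hypothetical error raised when draining a collected connection."""


class ToyConnection:
    """Hypothetical, broker-free stand-in for amqp.Connection."""

    def __init__(self, frames):
        self.frames = list(frames)  # scripted frames for blocking_read()
        self._transport = object()  # placeholder for a live transport
        self.connection = self
        self.channels = {}

    def blocking_read(self, timeout=None):
        # Pretend to read and handle one frame. 'ready' completes a method;
        # 'CloseOk' tears the connection down, like _on_close_ok() -> collect().
        frame = self.frames.pop(0)
        if frame == 'CloseOk':
            self.collect()
            return False
        return frame == 'ready'

    def collect(self):
        self._transport = self.connection = self.channels = None

    def drain_events(self, timeout=None):
        # Read until a method is ready, as the original loop does, but stop
        # as soon as collect() has run instead of touching a dead connection.
        while True:
            if self._transport is None:
                raise ToyConnectionClosed('connection already collected')
            if self.blocking_read(timeout):
                return


conn = ToyConnection(['partial', 'CloseOk', 'ready'])
try:
    conn.drain_events()
except ToyConnectionClosed as exc:
    print(exc)  # connection already collected
```

A silent return would also end this loop, but a caller such as wait() that loops until its reply arrives would then keep calling drain_events() forever.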

@matusvalo
Member Author

@auvipy you can close this issue now...

@auvipy auvipy closed this as completed Jan 10, 2019
@auvipy
Member

auvipy commented Jan 10, 2019

thanks!!

@AvnerCohen

Running 2.4.0, we are still getting the same exception:
'NoneType' object has no attribute 'drain_events'
From amqp/abstract_channel.py in wait at line 80

            while not p.ready:
                self.connection.drain_events(timeout=timeout)

Is anyone else still getting it?

Any thoughts on what is causing it?

@matusvalo
Member Author

@AvnerCohen I was not able to reproduce your exception. Could you provide a simple failing example?

@AvnerCohen

:( I'm afraid not. We get it randomly in production; random workers crash.

@matusvalo What's interesting is that your suggested fix basically exposes the "correct" exception:
amqp.exceptions.NotFound: Basic.publish: (404) NOT_FOUND - no exchange 'test_exc' in vhost '/'

In our case, we seem to be getting both of these errors:

Basic.consume: (404) NOT_FOUND - no queue 
Unrecoverable error: NotFound(404, u"NOT_FOUND - no queue ' [email protected]' in vhost '/'", (60, 20), u'Basic.consume')

From amqp/channel.py in _on_close at line 282:

            raise error_for_code(
                reply_code, reply_text, (class_id, method_id), ChannelError,
            )

The two errors happen in parallel, and random workers fail on them.

Maybe this is no longer an amqp issue but a core Celery issue?

@matusvalo
Member Author

@AvnerCohen I was afraid it was a random issue. I am not sure the problem is on the kombu/celery side, because the exception comes from py-amqp. Even if there is an issue on the celery/kombu side, in the worst case py-amqp must be able to handle incorrect usage correctly. Could you post the full tracebacks for the NotFound errors and the AttributeError here? Also, how often do you get these errors?

At least I know about one problem on the py-amqp side: it does not fully follow the AMQP standard, because it keeps processing incoming messages in drain_events() even when a Close method is received from the broker... This should be fixed, but we need to be careful not to break something else :-)

@AvnerCohen

@matusvalo thanks so much for the insights and time.

We are getting this during deploys of new code changes or any maintenance that involves a large-scale stop/start of workers (by the way, celery/celery#4618 could be relevant, but it looks like there is no extra info there).
So how often it happens depends heavily on the number of deploys we make.

Here are the two raw stack traces:

AttributeError: 'NoneType' object has no attribute 'drain_events'
  File "celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "kombu/asynchronous/hub.py", line 354, in create_loop
    cb(*cbargs)
  File "kombu/transport/base.py", line 236, in on_readable
    reader(loop)
  File "kombu/transport/base.py", line 218, in _read
    drain_events(timeout=0)
  File "amqp/connection.py", line 500, in drain_events
    while not self.blocking_read(timeout):
  File "amqp/connection.py", line 506, in blocking_read
    return self.on_inbound_frame(frame)
  File "amqp/method_framing.py", line 79, in on_frame
    callback(channel, msg.frame_method, msg.frame_args, msg)
  File "amqp/connection.py", line 510, in on_inbound_method
    method_sig, payload, content,
  File "amqp/abstract_channel.py", line 126, in dispatch_method
    listener(*args)
  File "amqp/channel.py", line 1616, in _on_basic_deliver
    fun(msg)
  File "kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "kombu/messaging.py", line 590, in receive
    [callback(body, message) for callback in callbacks]
  File "celery/worker/pidbox.py", line 51, in on_message
    self.reset()
  File "celery/worker/pidbox.py", line 66, in reset
    self.stop(self.c)
  File "celery/worker/pidbox.py", line 63, in stop
    self.consumer = self._close_channel(c)
  File "celery/worker/pidbox.py", line 71, in _close_channel
    ignore_errors(c, self.node.channel.close)
  File "kombu/common.py", line 298, in ignore_errors
    return fun(*args, **kwargs)
  File "amqp/channel.py", line 226, in close
    wait=spec.Channel.CloseOk,
  File "amqp/abstract_channel.py", line 60, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "amqp/abstract_channel.py", line 80, in wait
    self.connection.drain_events(timeout=timeout)
  File "amqp/connection.py", line 500, in drain_events
    while not self.blocking_read(timeout):
  File "amqp/connection.py", line 506, in blocking_read
    return self.on_inbound_frame(frame)
  File "amqp/method_framing.py", line 79, in on_frame
    callback(channel, msg.frame_method, msg.frame_args, msg)
  File "amqp/connection.py", line 510, in on_inbound_method
    method_sig, payload, content,
  File "amqp/abstract_channel.py", line 126, in dispatch_method
    listener(*args)
  File "amqp/channel.py", line 1616, in _on_basic_deliver
    fun(msg)
  File "kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "kombu/messaging.py", line 590, in receive
    [callback(body, message) for callback in callbacks]
  File "celery/worker/pidbox.py", line 51, in on_message
    self.reset()
  File "celery/worker/pidbox.py", line 67, in reset
    self.start(self.c)
  File "celery/worker/pidbox.py", line 54, in start
    self.node.channel = c.connection.channel()
  File "kombu/connection.py", line 266, in channel
    chan = self.transport.create_channel(self.connection)
  File "kombu/transport/pyamqp.py", line 100, in create_channel
    return connection.channel()
  File "amqp/connection.py", line 491, in channel
    channel.open()
  File "amqp/channel.py", line 437, in open
    spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
  File "amqp/abstract_channel.py", line 60, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "amqp/abstract_channel.py", line 80, in wait
    self.connection.drain_events(timeout=timeout)

And:

NotFound: Basic.consume: (404) NOT_FOUND - no queue ' [email protected]' in vhost '/'
  File "celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "celery/bootsteps.py", line 370, in start
    return self.obj.start()
  File "celery/worker/consumer/consumer.py", line 316, in start
    blueprint.start(self)
  File "celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "celery/worker/pidbox.py", line 55, in start
    self.consumer = self.node.listen(callback=self.on_message)
  File "kombu/pidbox.py", line 91, in listen
    consumer.consume()
  File "kombu/messaging.py", line 477, in consume
    self._basic_consume(T, no_ack=no_ack, nowait=False)
  File "kombu/messaging.py", line 598, in _basic_consume
    no_ack=no_ack, nowait=nowait)
  File "kombu/entity.py", line 737, in consume
    arguments=self.consumer_arguments)
  File "amqp/channel.py", line 1572, in basic_consume
    returns_tuple=True
  File "amqp/abstract_channel.py", line 60, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "amqp/abstract_channel.py", line 80, in wait
    self.connection.drain_events(timeout=timeout)
  File "amqp/connection.py", line 500, in drain_events
    while not self.blocking_read(timeout):
  File "amqp/connection.py", line 506, in blocking_read
    return self.on_inbound_frame(frame)
  File "amqp/method_framing.py", line 55, in on_frame
    callback(channel, method_sig, buf, None)
  File "amqp/connection.py", line 510, in on_inbound_method
    method_sig, payload, content,
  File "amqp/abstract_channel.py", line 126, in dispatch_method
    listener(*args)
  File "amqp/channel.py", line 282, in _on_close
    reply_code, reply_text, (class_id, method_id), ChannelError,

@AvnerCohen

@matusvalo Is there anything we can help with, or any additional info we can provide?

@matusvalo
Member Author

Thank you @AvnerCohen. I need to find some spare time to have a look at that... Nothing for now.

@thedrow
Member

thedrow commented Mar 13, 2019

Anything new on this issue?

@matusvalo
Member Author

matusvalo commented Jun 10, 2019

@thedrow Not for now. I am not able to reproduce the issue. There are multiple issues that point to a closed connection/channel being used, e.g. celery/kombu#1027.

There are only two places where the connection or channel is set to None:

py-amqp/amqp/channel.py

Lines 145 to 156 in 756d60d

def collect(self):
    """Tear down this object.

    Best called after we've agreed to close with the server.
    """
    AMQP_LOGGER.debug('Closed channel #%s', self.channel_id)
    self.is_open = False
    channel_id, self.channel_id = self.channel_id, None
    connection, self.connection = self.connection, None
    if connection:
        connection.channels.pop(channel_id, None)
        connection._avail_channel_ids.append(channel_id)

py-amqp/amqp/connection.py

Lines 452 to 464 in 756d60d

def collect(self):
    try:
        if self._transport:
            self._transport.close()
        temp_list = [x for x in values(self.channels or {})
                     if x is not self]
        for ch in temp_list:
            ch.collect()
    except socket.error:
        pass  # connection already closed on the other end
    finally:
        self._transport = self.connection = self.channels = None

Connection.collect() is called when:

  1. Connection.connect() fails. This is not the case in these issues.
  2. Close-OK is received from the broker. But this should be a reply to a Close method sent by the client.
  3. Close is received from the broker. But this case raises an exception, so it should not yield tracebacks like ours.

Channel.collect() is called only when Close-OK is received, which again happens only after the client sends Close.

One possible cause could be multiple threads sharing a single connection...
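If shared connections were the cause, the usual way to rule it out is to give every thread its own connection. A minimal sketch using threading.local, assuming (not confirmed in this thread) that a py-amqp connection should not be used from several threads at once; PerThreadConnections and its factory argument are hypothetical, and a stub factory stands in for a real connection so no broker is needed.

```python
import threading


class PerThreadConnections:
    """Hand each thread its own connection object instead of sharing one.

    `factory` is a hypothetical zero-argument callable; with py-amqp it could
    build an amqp.Connection and call connect() on it.
    """

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()  # attributes here are per-thread

    def get(self):
        conn = getattr(self._local, 'conn', None)
        if conn is None:
            # First call in this thread: create a private connection.
            conn = self._factory()
            self._local.conn = conn
        return conn


# Usage with a stub factory (plain objects), so no broker is needed.
pool = PerThreadConnections(factory=object)
seen = []


def worker():
    seen.append(pool.get())


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Repeated get() calls in one thread return the same object, while each of the three worker threads ends up with a distinct one.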

@matusvalo
Member Author

One possibility came to mind.
From the stack trace:

  File "celery/worker/pidbox.py", line 63, in stop
    self.consumer = self._close_channel(c)
  File "celery/worker/pidbox.py", line 71, in _close_channel
    ignore_errors(c, self.node.channel.close)
  File "kombu/common.py", line 298, in ignore_errors
    return fun(*args, **kwargs)
  File "amqp/channel.py", line 226, in close
    wait=spec.Channel.CloseOk,

The other possibility is that when the client sends the Close method, it waits for Close-OK, but in the meantime another message arrives whose handling itself waits for a reply, which can lead to yet another nested wait, etc. In general, the AMQP spec requires that after the Close method is sent, the client must process only Close and Close-OK methods [1]:

After sending this method, any received methods except Close and Close-OK MUST be discarded. The response to receiving a Close after sending Close must be to send Close-Ok.

Unfortunately, the py-amqp library does not conform to the spec in this case.

[1] https://www.rabbitmq.com/amqp-0-9-1-reference.html
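The quoted rule could be implemented as a small state check in the method dispatcher. This is a hypothetical sketch (ClosingDispatcher is not a py-amqp class, and methods are plain strings): once Close has been sent, everything except Close and Close-Ok is dropped, and a Close from the peer is answered with Close-Ok.

```python
class ClosingDispatcher:
    """Hypothetical sketch of the spec rule for methods received while closing."""

    def __init__(self, send):
        self.send = send        # callable used to send a method (a string here)
        self.close_sent = False
        self.closed = False
        self.handled = []       # methods dispatched normally

    def close(self):
        self.send('Close')
        self.close_sent = True

    def on_method(self, method):
        if not self.close_sent:
            self.handled.append(method)  # normal processing
            return
        if method == 'Close':
            self.send('Close-Ok')        # peer closed at the same time
        elif method == 'Close-Ok':
            self.closed = True           # our Close was acknowledged
        # Any other method is discarded while closing.


sent = []
d = ClosingDispatcher(sent.append)
d.on_method('Basic.Deliver')  # processed normally
d.close()
d.on_method('Basic.Deliver')  # discarded: Close was already sent
d.on_method('Close')          # answered with Close-Ok
d.on_method('Close-Ok')       # our Close is acknowledged
print(sent, d.handled, d.closed)
# ['Close', 'Close-Ok'] ['Basic.Deliver'] True
```

Because nothing but Close and Close-Ok is acted on after close(), no listener can start a nested wait while the client is shutting down.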

@thedrow
Member

thedrow commented Jun 10, 2019

Hmm so how do we fix that?

@matusvalo
Member Author

@thedrow see #280.

@matusvalo
Member Author

matusvalo commented Jun 13, 2019

@AvnerCohen could you please check master branch if your problem still occurs?

@AvnerCohen

@matusvalo On our end, we have seen this behavior when starting some 80 Celery workers (each with a concurrency of anywhere between 2 and 8) in parallel.
To work around the issue, we split the startup up and no longer do that (we start at most 20 workers at the same time), and this seems to have solved the issue for us.

@mlodic

mlodic commented Sep 2, 2021

Hi everyone, I found this issue while looking for a solution to this problem.

Env:
Celery: 5.1.2
RabbitMQ: 3.9.4

RabbitMQ 3.8.15 introduced this new feature: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout

When the timeout is triggered, the 'NoneType' object has no attribute 'drain_events' error appears again, causing the Celery worker to stop working:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 326, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 618, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.8/site-packages/celery/worker/loops.py", line 81, in asynloop
    next(loop)
  File "/usr/local/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 361, in create_loop
    cb(*cbargs)
  File "/usr/local/lib/python3.8/site-packages/kombu/transport/base.py", line 235, in on_readable
    reader(loop)
  File "/usr/local/lib/python3.8/site-packages/kombu/transport/base.py", line 217, in _read
    drain_events(timeout=0)
  File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 523, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 529, in blocking_read
    return self.on_inbound_frame(frame)
  File "/usr/local/lib/python3.8/site-packages/amqp/method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 535, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
  File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 143, in dispatch_method
    listener(*args)
  File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 277, in _on_close
    raise error_for_code(
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/celery", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/site-packages/celery/__main__.py", line 15, in main
    sys.exit(_main())
  File "/usr/local/lib/python3.8/site-packages/celery/bin/celery.py", line 213, in main
    return celery(auto_envvar_prefix="CELERY")
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/decorators.py", line 21, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/celery/bin/base.py", line 133, in caller
    return f(ctx, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/celery/bin/worker.py", line 346, in worker
    worker.start()
  File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 208, in start
    self.stop(exitcode=EX_FAILURE)
  File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 251, in stop
    self._shutdown(warm=True)
  File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 266, in _shutdown
    self.blueprint.stop(self, terminate=not warm)
  File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 174, in stop
    self.on_stopped()
  File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 162, in on_stopped
    self.consumer.shutdown()
  File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 370, in shutdown
    self.blueprint.shutdown(self)
  File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 168, in shutdown
    self.send_all(parent, 'shutdown')
  File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 148, in send_all
    fun(parent, *args)
  File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/connection.py", line 29, in shutdown
    ignore_errors(connection, connection.close)
  File "/usr/local/lib/python3.8/site-packages/kombu/common.py", line 325, in ignore_errors
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 375, in release
    self._close()
  File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 341, in _close
    self._do_close_self()
  File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 331, in _do_close_self
    self.maybe_close_channel(self._default_channel)
  File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 323, in maybe_close_channel
    channel.close()
  File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 219, in close
    return self.send_method(
  File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 66, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 86, in wait
    self.connection.drain_events(timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 523, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 529, in blocking_read
    return self.on_inbound_frame(frame)
  File "/usr/local/lib/python3.8/site-packages/amqp/method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 535, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
  File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 143, in dispatch_method
    listener(*args)
  File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 276, in _on_close
    self._do_revive()
  File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 161, in _do_revive
    self.open()
  File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 432, in open
    return self.send_method(
  File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 66, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 86, in wait
    self.connection.drain_events(timeout=timeout)
AttributeError: 'NoneType' object has no attribute 'drain_events'

Do you know how to prevent this? Celery should not break on timeout errors.

@m-aciek

m-aciek commented Oct 18, 2021

For me, missing permissions on the exchange (or virtual host) lead to the behavior above.

@povilasb

@matusvalo I'm able to reproduce the NotFound: Basic.consume: (404) NOT_FOUND - no queue issue with https://github.com/povilasb/celery-issues/ . Hope that helps :)

@povilasb

I've also created a py-amqp only example to reproduce the issue: https://github.com/povilasb/celery-issues/#py-amqp-example

What happens, I believe, is that RabbitMQ responds with a "Channel Close" message when we try to publish to a non-existing queue. Then Channel._on_close() is called:

spec.Channel.Close: self._on_close,

After reading the docs, I tend to think that this may be expected behavior from the AMQP library:

Certain scenarios are assumed to be recoverable ("soft") errors in the protocol. They render the channel closed but applications can open another one and try to recover or retry a number of times. Most common examples are:

* Consuming from a queue that does not exist will fail with a 404 NOT_FOUND error
* Publishing to an exchange that does not exist will fail with a 404 NOT_FOUND error

Anyway, I'm going to stay away from this issue for now, since I guess it is more of a Kombu/Celery issue (not opening a new channel). My issue may not be related to the original one either.
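The recovery the RabbitMQ docs describe (open another channel, retry a number of times) can be expressed as a generic helper. A sketch under stated assumptions: open_channel and operation are hypothetical callables (with py-amqp, open_channel might wrap connection.channel() and soft_errors might be (amqp.exceptions.ChannelError,)), and fakes are used below so it runs without a broker.

```python
import time


def retry_on_new_channel(open_channel, operation, soft_errors,
                         attempts=3, delay=0.0):
    """Run operation(channel); on a soft (channel-level) error, open a fresh
    channel and try again, making at most `attempts` attempts in total.

    open_channel: hypothetical zero-argument callable returning a new channel.
    operation:    hypothetical callable doing the actual work on a channel.
    soft_errors:  tuple of exception types treated as recoverable.
    """
    if attempts < 1:
        raise ValueError('attempts must be at least 1')
    for attempt in range(1, attempts + 1):
        channel = open_channel()  # the previous channel was closed by the broker
        try:
            return operation(channel)
        except soft_errors:
            if attempt == attempts:
                raise  # out of attempts: surface the last channel error
            if delay:
                time.sleep(delay)


# Usage with fakes instead of a broker.
class FakeNotFound(Exception):
    """Stand-in for a 404 NOT_FOUND channel error."""


opened = []


def open_channel():
    opened.append(len(opened) + 1)
    return opened[-1]  # use the channel number as the "channel"


def publish(channel):
    if channel < 3:  # the first two channels hit a 404
        raise FakeNotFound('404 NOT_FOUND')
    return 'published on channel %d' % channel


print(retry_on_new_channel(open_channel, publish, (FakeNotFound,)))
# published on channel 3
```

Retrying only helps if something changes between attempts (for example, the missing queue or exchange gets declared); otherwise each new channel fails the same way. Errors outside soft_errors propagate immediately.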


7 participants