How should I handle TimeoutError? #186

Open
cenkalti opened this issue Mar 8, 2018 · 7 comments

Comments

cenkalti commented Mar 8, 2018

TimeoutError: [Errno 110] Connection timed out
  ...
    ch.connection.drain_events(timeout=1)
  File "amqp/connection.py", line 471, in drain_events
    while not self.blocking_read(timeout):
  File "amqp/connection.py", line 476, in blocking_read
    frame = self.transport.read_frame()
  File "amqp/transport.py", line 226, in read_frame
    frame_header = read(7, True)
  File "amqp/transport.py", line 401, in _read
    s = recv(n - len(rbuf))

cenkalti changed the title from "How should I handle" to "How should I handle TimeoutError?" on Mar 8, 2018

cenkalti commented Mar 8, 2018

The drain_events function normally raises socket.timeout when no event arrives within the timeout period, but sometimes I see the traceback above in the logs. AFAIK the Python socket module also raises socket.timeout when there is no data to receive, and I can reproduce that with a simple script.

However, I can't reproduce the case where drain_events raises TimeoutError.

Does this exception need to be handled by the library or by my application code?

Using Python 3.5 on Linux.
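
Until that question is settled, application code can tell the two cases apart by inspecting errno. The following is a minimal sketch, not part of py-amqp: the helper name drain_once and its return values are hypothetical, and it assumes only that the connection object exposes drain_events(timeout=...) as in the traceback above.

```python
import errno
import socket


def drain_once(connection, timeout=1):
    """Call drain_events() once and classify the outcome.

    Returns "event" if something was processed, "idle" if the read
    simply timed out, and "dead" if the OS reported ETIMEDOUT.
    """
    try:
        connection.drain_events(timeout=timeout)
    except OSError as exc:
        # TimeoutError with errno ETIMEDOUT: the kernel gave up on the
        # connection, so it should be closed and re-established.
        if exc.errno == errno.ETIMEDOUT:
            return "dead"
        # socket.timeout carries no errno: nothing arrived in time.
        if isinstance(exc, socket.timeout):
            return "idle"
        raise
    return "event"
```

Checking errno rather than the exception class keeps the helper working on newer Python versions, where socket.timeout is an alias of TimeoutError.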

cenkalti commented Mar 8, 2018

After some investigation I found out that TimeoutError is specifically a subclass of OSError representing the case where errno is ETIMEDOUT: https://bugs.python.org/issue21376#msg217402
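
This relationship can be checked with the standard library alone. A small sketch (the variable names are only illustrative):

```python
import errno
import socket

# Building an OSError with errno ETIMEDOUT yields the TimeoutError subclass.
exc = OSError(errno.ETIMEDOUT, "Connection timed out")
print(type(exc).__name__)              # TimeoutError
print(isinstance(exc, OSError))        # True
print(exc.errno == errno.ETIMEDOUT)    # True

# A plain socket read timeout is also an OSError, but carries no errno.
idle = socket.timeout("timed out")
print(idle.errno)                      # None
```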

On Linux, ETIMEDOUT is set by the operating system if the TCP keep-alive mechanism detects that the connection is dead: http://man7.org/linux/man-pages/man7/tcp.7.html

I have found that, starting from version 2.1.4, some default TCP keep-alive options are set on the socket: https://amqp.readthedocs.io/en/latest/changelog.html#version-2-1-4

These parameters are defined here in code:

DEFAULT_SOCKET_SETTINGS = {
    'TCP_NODELAY': 1,
    'TCP_USER_TIMEOUT': 1000,
    'TCP_KEEPIDLE': 60,
    'TCP_KEEPINTVL': 10,
    'TCP_KEEPCNT': 9,
}

I think the TCP_USER_TIMEOUT parameter here is set to a very low value (it is given in milliseconds, so 1000 means 1 second). I propose increasing it to at least 10 seconds. What do you think?
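
To put the numbers side by side, here is a rough worked calculation. It assumes the usual Linux units (TCP_USER_TIMEOUT in milliseconds, TCP_KEEPIDLE and TCP_KEEPINTVL in seconds); the helper names are hypothetical, and the keep-alive figure is what keep-alive alone would give, ignoring any interaction with TCP_USER_TIMEOUT.

```python
DEFAULT_SOCKET_SETTINGS = {
    'TCP_NODELAY': 1,
    'TCP_USER_TIMEOUT': 1000,
    'TCP_KEEPIDLE': 60,
    'TCP_KEEPINTVL': 10,
    'TCP_KEEPCNT': 9,
}


def keepalive_detection_seconds(settings):
    """Idle time before probing starts, plus all unanswered probes."""
    return (settings['TCP_KEEPIDLE']
            + settings['TCP_KEEPINTVL'] * settings['TCP_KEEPCNT'])


def user_timeout_seconds(settings):
    """TCP_USER_TIMEOUT converted from milliseconds to seconds."""
    return settings['TCP_USER_TIMEOUT'] / 1000


print(keepalive_detection_seconds(DEFAULT_SOCKET_SETTINGS))  # 150
print(user_timeout_seconds(DEFAULT_SOCKET_SETTINGS))         # 1.0
```

So keep-alive on its own would take 60 + 10 * 9 = 150 seconds to declare an idle connection dead, while unacknowledged data is allowed only 1 second.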

jenstroeger commented Apr 28, 2018

@cenkalti, I see a similar timeout problem when using a Celery GroupResult with several tasks and then attempting to revoke() the group.

   File "/…/celery/tasks.py", line 386, in revoke_group_tasks
     group_result.revoke()
   File "/…/lib/python3.6/site-packages/celery/result.py", line 589, in revoke
     terminate=terminate, signal=signal, reply=wait)
   File "/…/lib/python3.6/site-packages/celery/app/control.py", line 210, in revoke
     }, **kwargs)
   File "/…/lib/python3.6/site-packages/celery/app/control.py", line 436, in broadcast
     limit, callback, channel=channel,
   File "/…/lib/python3.6/site-packages/kombu/pidbox.py", line 315, in _broadcast
     serializer=serializer)
   File "/…/lib/python3.6/site-packages/kombu/pidbox.py", line 290, in _publish
     serializer=serializer,
   File "/…/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
     exchange_name, declare,
   File "/…/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
     mandatory=mandatory, immediate=immediate,
   File "/…/lib/python3.6/site-packages/amqp/channel.py", line 1734, in _basic_publish
     (0, exchange, routing_key, mandatory, immediate), msg
   File "/…/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
     conn.frame_writer(1, self.channel_id, sig, args, content)
   File "/…/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
     write(view[:offset])
   File "/…/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
     self._write(s)
TimeoutError: [Errno 110] Connection timed out

Did changing the TCP timeout solve this?

A “workaround” for this problem might be to revoke all tasks of a group one-by-one in a loop, although I have not yet tried that.

cenkalti commented

@jenstroeger No, changing TCP_USER_TIMEOUT didn't solve the problem; we keep seeing TimeoutError exceptions.

jenstroeger commented

@cenkalti, hmmm… ok 🤔 I guess I’ll switch to plan B then and revoke all tasks of the group individually, while digging for the root cause of the timeout on our side. Thanks!

jenstroeger commented

@cenkalti, in my case the TimeoutError seems to be caused by revoke() on a task that has already executed: the group contains six tasks scheduled over a period of two days, so revoking the group at any point during those two days attempts to revoke a task that has already executed.

# group_results.revoke()
for result in group_results:
    try:
        _log.info("Revoking %s", result.id)
        result.revoke()
    except Exception:
        # Log the traceback and continue with the remaining tasks.
        _log.exception("Failed to revoke %s!", result.id)

It is the task that has already executed that causes the TimeoutError; all the others are revoked successfully.

@georgepsarakis, is it possible to shed some light on that?

edvm commented Jul 22, 2020

I'm seeing the same error here:

Traceback (most recent call last):
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/channel.py", line 1766, in _basic_publish
    (0, exchange, routing_key, mandatory, immediate), msg
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/abstract_channel.py", line 59, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/method_framing.py", line 172, in write_frame
    write(view[:offset])
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/transport.py", line 305, in write
    self._write(s)
TimeoutError: [Errno 110] Connection timed out

I'm using amqp==2.6.0. Here is the code that raises the exception (after publishing approx. 1 million messages):

from kombu import Connection, Exchange, Producer, Queue

# rabbit_url is defined elsewhere.
def run():
    with Connection(rabbit_url, read_timeout=30000, write_timeout=30000) as conn:
        with open("foo.csv", "r") as fp:
            exchange = Exchange("example-exchange-1", type="direct")
            channel = conn.channel()
            producer = Producer(exchange=exchange, channel=channel, routing_key="test")
            queue = Queue(name="test-1", exchange=exchange, routing_key="test")
            queue.maybe_bind(conn)
            queue.declare()
            for line in fp:
                producer.publish(line, max_retries=3, errback=lambda x: print(x))
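
One generic way for application code to cope with this is to catch the OSError, check for ETIMEDOUT, reconnect, and try again. The sketch below is plain Python rather than kombu's API: publish_with_retry is a hypothetical helper, and publish and reconnect are hypothetical callables that the caller would wire to its own producer and connection.

```python
import errno
import time


def publish_with_retry(publish, reconnect, message, max_retries=3, delay=0.0):
    """Call publish(message), reconnecting and retrying on ETIMEDOUT.

    Any other error, or an ETIMEDOUT on the final attempt, is re-raised.
    """
    for attempt in range(max_retries + 1):
        try:
            return publish(message)
        except OSError as exc:
            if exc.errno != errno.ETIMEDOUT or attempt == max_retries:
                raise
            # The old socket is dead; rebuild the connection first.
            reconnect()
            time.sleep(delay)
```

Note that a retry after a write timeout may deliver the same message twice if the broker actually received the first attempt, so consumers should tolerate duplicates.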
