Tenacity is an Apache 2.0 licensed general-purpose retrying library, written in Python, to simplify the task of adding retry behavior to just about anything. It originates from a fork of retrying, which is sadly no longer maintained. Tenacity isn't API-compatible with retrying, but it adds significant new functionality and fixes a number of longstanding bugs.
The simplest use case is retrying a flaky function whenever an Exception occurs until a value is returned.
.. testcode::

    import random
    from tenacity import retry

    @retry
    def do_something_unreliable():
        if random.randint(0, 10) > 1:
            raise IOError("Broken sauce, everything is hosed!!!111one")
        else:
            return "Awesome sauce!"

    print(do_something_unreliable())

.. testoutput::
   :hide:

   Awesome sauce!

.. toctree::
    :hidden:
    :maxdepth: 2

    changelog
    api
- Generic Decorator API
- Specify stop condition (e.g. limit by number of attempts)
- Specify wait condition (e.g. exponential backoff sleeping between attempts)
- Customize retrying on Exceptions
- Customize retrying on expected returned result
- Retry on coroutines
- Retry code block with context manager
To install *tenacity*, simply:

.. code-block:: bash

    $ pip install tenacity
.. testsetup:: *

    import logging
    from tenacity import *

    class MyException(Exception):
        pass
As you saw above, the default behavior is to retry forever without waiting when an exception is raised.
.. testcode::

    @retry
    def never_give_up_never_surrender():
        print("Retry forever ignoring Exceptions, don't wait between retries")
        raise Exception
Let's be a little less persistent and set some boundaries, such as the number of attempts before giving up.
.. testcode::

    @retry(stop=stop_after_attempt(7))
    def stop_after_7_attempts():
        print("Stopping after 7 attempts")
        raise Exception
We don't have all day, so let's set a boundary for how long we should be retrying stuff.
.. testcode::

    @retry(stop=stop_after_delay(10))
    def stop_after_10_s():
        print("Stopping after 10 seconds")
        raise Exception
You can combine several stop conditions by using the | operator:
.. testcode::

    @retry(stop=(stop_after_delay(10) | stop_after_attempt(5)))
    def stop_after_10_s_or_5_retries():
        print("Stopping after 10 seconds or 5 retries")
        raise Exception
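As a rough illustration of how ``|`` can combine two stop conditions, here is a minimal sketch in plain Python. The ``Stop`` class and the ``after_attempt`` and ``after_delay`` helpers are hypothetical names invented for this example, not tenacity's own implementation; the sketch only assumes that a combined condition stops as soon as either side says so.

```python
class Stop:
    """Hypothetical stand-in for a stop condition (not tenacity's class)."""

    def __init__(self, predicate):
        self.predicate = predicate

    def __call__(self, attempt_number, elapsed_seconds):
        return self.predicate(attempt_number, elapsed_seconds)

    def __or__(self, other):
        # The combined condition stops as soon as either side says stop.
        return Stop(lambda n, t: self(n, t) or other(n, t))


def after_attempt(limit):
    # Stop once the given number of attempts has been made.
    return Stop(lambda n, t: n >= limit)


def after_delay(seconds):
    # Stop once the given number of seconds has elapsed.
    return Stop(lambda n, t: t >= seconds)


stop = after_delay(10) | after_attempt(5)

print(stop(2, 3.0))   # False: neither limit reached
print(stop(5, 3.0))   # True: attempt limit reached
print(stop(2, 12.0))  # True: time limit reached
```

Overloading ``__or__`` keeps each condition small and lets callers compose them declaratively at decoration time.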
Most things don't like to be polled as fast as possible, so let's just wait 2 seconds between retries.
.. testcode::

    @retry(wait=wait_fixed(2))
    def wait_2_s():
        print("Wait 2 seconds between retries")
        raise Exception
Some things perform best with a bit of randomness injected.
.. testcode::

    @retry(wait=wait_random(min=1, max=2))
    def wait_random_1_to_2_s():
        print("Randomly wait 1 to 2 seconds between retries")
        raise Exception
Then again, it's hard to beat exponential backoff when retrying distributed services and other remote endpoints.
.. testcode::

    @retry(wait=wait_exponential(multiplier=1, min=4, max=10))
    def wait_exponential_1():
        print("Wait 2^x * 1 second between each retry starting with 4 seconds, then up to 10 seconds, then 10 seconds afterwards")
        raise Exception
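To make the clamping concrete, the following sketch computes such a schedule in plain Python. It is not tenacity's code: ``exponential_wait`` is a hypothetical helper, and the exact exponent offset (``attempt_number - 1`` here) is an assumption that may differ between tenacity versions.

```python
def exponential_wait(attempt_number, multiplier=1, min_wait=0, max_wait=float("inf")):
    """Hypothetical helper: exponential delay clamped to [min_wait, max_wait]."""
    # Assumption: the raw delay doubles with each attempt, starting at `multiplier`.
    raw = multiplier * 2 ** (attempt_number - 1)
    return max(min_wait, min(raw, max_wait))


schedule = [exponential_wait(n, multiplier=1, min_wait=4, max_wait=10)
            for n in range(1, 7)]
print(schedule)  # [4, 4, 4, 8, 10, 10]
```

The lower bound dominates the first few attempts, the exponential term takes over briefly, and the upper bound caps every delay after that.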
Then again, it's also hard to beat combining fixed waits and jitter (to help avoid thundering herds) when retrying distributed services and other remote endpoints.
.. testcode::

    @retry(wait=wait_fixed(3) + wait_random(0, 2))
    def wait_fixed_jitter():
        print("Wait at least 3 seconds, and add up to 2 seconds of random delay")
        raise Exception
When multiple processes are in contention for a shared resource, exponentially increasing jitter helps minimise collisions.
.. testcode::

    @retry(wait=wait_random_exponential(multiplier=1, max=60))
    def wait_exponential_jitter():
        print("Randomly wait up to 2^x * 1 seconds between each retry until the range reaches 60 seconds, then randomly up to 60 seconds afterwards")
        raise Exception
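The idea behind exponentially increasing jitter can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: ``random_exponential_wait`` is a hypothetical helper, and it assumes the delay is drawn uniformly between zero and an exponentially growing ceiling that is capped at ``max_wait``.

```python
import random


def random_exponential_wait(attempt_number, multiplier=1, max_wait=60, rng=random):
    """Hypothetical helper: uniform delay below a capped exponential ceiling."""
    # Assumption: the ceiling doubles with each attempt until it hits max_wait.
    ceiling = min(max_wait, multiplier * 2 ** (attempt_number - 1))
    return rng.uniform(0, ceiling)


rng = random.Random(0)  # seeded only to make the example repeatable
waits = [random_exponential_wait(n, rng=rng) for n in range(1, 10)]
for n, wait in enumerate(waits, start=1):
    print(f"attempt {n}: wait {wait:.2f}s")
```

Because each process draws its own delay from a widening range, competing clients tend to spread out rather than retry in lockstep.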
Sometimes it's necessary to build a chain of backoffs.
.. testcode::

    @retry(wait=wait_chain(*[wait_fixed(3) for i in range(3)] +
                           [wait_fixed(7) for i in range(2)] +
                           [wait_fixed(9)]))
    def wait_fixed_chained():
        print("Wait 3s for 3 attempts, 7s for the next 2 attempts and 9s for all attempts thereafter")
        raise Exception
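The resulting schedule can be worked out by hand. The sketch below does so in plain Python with a hypothetical ``chained_wait`` helper; it assumes that each attempt uses the next entry in the chain and that the last entry is reused once the chain is exhausted.

```python
def chained_wait(attempt_number, waits):
    """Hypothetical helper: pick the wait for an attempt from a chain of waits."""
    # Clamp the index so attempts past the end of the chain reuse the last wait.
    index = min(attempt_number, len(waits)) - 1
    return waits[index]


waits = [3] * 3 + [7] * 2 + [9]
schedule = [chained_wait(n, waits) for n in range(1, 9)]
print(schedule)  # [3, 3, 3, 7, 7, 9, 9, 9]
```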
We have a few options for dealing with retries that raise specific or general exceptions, as in the cases here.
.. testcode::

    @retry(retry=retry_if_exception_type(IOError))
    def might_io_error():
        print("Retry forever with no wait if an IOError occurs, raise any other errors")
        raise Exception
We can also use the result of the function to alter the behavior of retrying.
.. testcode::

    def is_none_p(value):
        """Return True if value is None"""
        return value is None

    @retry(retry=retry_if_result(is_none_p))
    def might_return_none():
        print("Retry with no wait if return value is None")
We can also combine several conditions:
.. testcode::

    def is_none_p(value):
        """Return True if value is None"""
        return value is None

    @retry(retry=(retry_if_result(is_none_p) | retry_if_exception_type()))
    def might_return_none():
        print("Retry forever ignoring Exceptions with no wait if return value is None")
Any combination of stop, wait, etc. is also supported to give you the freedom to mix and match.
It's also possible to retry explicitly at any time by raising the TryAgain exception:
.. testcode::

    @retry
    def do_something():
        result = something_else()
        if result == 23:
            raise TryAgain
While callables that "time out" retrying raise a RetryError by default, we can reraise the last attempt's exception if needed:
.. testcode::

    @retry(reraise=True, stop=stop_after_attempt(3))
    def raise_my_exception():
        raise MyException("Fail")

    try:
        raise_my_exception()
    except MyException:
        # timed out retrying
        pass
It's possible to execute an action before each attempt to call the function by using the before callback function:
.. testcode::

    import logging
    import sys

    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

    logger = logging.getLogger(__name__)

    @retry(stop=stop_after_attempt(3), before=before_log(logger, logging.DEBUG))
    def raise_my_exception():
        raise MyException("Fail")
In the same spirit, it's possible to execute an action after a call that failed:
.. testcode::

    import logging
    import sys

    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

    logger = logging.getLogger(__name__)

    @retry(stop=stop_after_attempt(3), after=after_log(logger, logging.DEBUG))
    def raise_my_exception():
        raise MyException("Fail")
It's also possible to only log failures that are going to be retried. Normally
retries happen after a wait interval, so the keyword argument is called
``before_sleep``:
.. testcode::

    import logging
    import sys

    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

    logger = logging.getLogger(__name__)

    @retry(stop=stop_after_attempt(3),
           before_sleep=before_sleep_log(logger, logging.DEBUG))
    def raise_my_exception():
        raise MyException("Fail")
You can access the statistics about the retries made for a function by using the retry attribute attached to the function and its statistics attribute:
.. testcode::

    @retry(stop=stop_after_attempt(3))
    def raise_my_exception():
        raise MyException("Fail")

    try:
        raise_my_exception()
    except Exception:
        pass

    print(raise_my_exception.retry.statistics)

.. testoutput::
   :hide:

   ...
You can also define your own callbacks. The callback should accept one
parameter called ``retry_state`` that contains all information about the
current retry invocation.
For example, you can call a custom callback function after all retries have failed, without raising an exception (or you can re-raise, or do anything else, really):
.. testcode::

    def return_last_value(retry_state):
        """return the result of the last call attempt"""
        return retry_state.outcome.result()

    def is_false(value):
        """Return True if value is False"""
        return value is False

    # will return False after trying 3 times to get a different result
    @retry(stop=stop_after_attempt(3),
           retry_error_callback=return_last_value,
           retry=retry_if_result(is_false))
    def eventually_return_false():
        return False
.. note::

    Calling the parameter ``retry_state`` is important, because this is how
    *tenacity* internally distinguishes callbacks from their :ref:`deprecated
    counterparts <deprecated-callbacks>`.
The ``retry_state`` argument is an object of the ``RetryCallState`` class:
.. autoclass:: tenacity.RetryCallState

   Constant attributes:

   .. autoinstanceattribute:: start_time(float)
      :annotation:

   .. autoinstanceattribute:: retry_object(BaseRetrying)
      :annotation:

   .. autoinstanceattribute:: fn(callable)
      :annotation:

   .. autoinstanceattribute:: args(tuple)
      :annotation:

   .. autoinstanceattribute:: kwargs(dict)
      :annotation:

   Variable attributes:

   .. autoinstanceattribute:: attempt_number(int)
      :annotation:

   .. autoinstanceattribute:: outcome(tenacity.Future or None)
      :annotation:

   .. autoinstanceattribute:: outcome_timestamp(float or None)
      :annotation:

   .. autoinstanceattribute:: idle_for(float)
      :annotation:

   .. autoinstanceattribute:: next_action(tenacity.RetryAction or None)
      :annotation:
It's also possible to define custom callbacks for other keyword arguments.
.. function:: my_stop(retry_state)

   :param RetryCallState retry_state: info about current retry invocation
   :return: whether or not retrying should stop
   :rtype: bool

.. function:: my_wait(retry_state)

   :param RetryCallState retry_state: info about current retry invocation
   :return: number of seconds to wait before next retry
   :rtype: float

.. function:: my_retry(retry_state)

   :param RetryCallState retry_state: info about current retry invocation
   :return: whether or not retrying should continue
   :rtype: bool

.. function:: my_before(retry_state)

   :param RetryCallState retry_state: info about current retry invocation

.. function:: my_after(retry_state)

   :param RetryCallState retry_state: info about current retry invocation

.. function:: my_before_sleep(retry_state)

   :param RetryCallState retry_state: info about current retry invocation
Here's an example with a custom ``before_sleep`` function:
.. testcode::

    import logging
    import sys

    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

    logger = logging.getLogger(__name__)

    def my_before_sleep(retry_state):
        if retry_state.attempt_number < 1:
            loglevel = logging.INFO
        else:
            loglevel = logging.WARNING
        logger.log(
            loglevel, 'Retrying %s: attempt %s ended with: %s',
            retry_state.fn, retry_state.attempt_number, retry_state.outcome)

    @retry(stop=stop_after_attempt(3), before_sleep=my_before_sleep)
    def raise_my_exception():
        raise MyException("Fail")

    try:
        raise_my_exception()
    except RetryError:
        pass
.. note::

    It was also possible to define custom callbacks before, but they accepted
    varying parameter sets and none of those provided full state. The old way
    is deprecated, but kept for backward compatibility.
.. function:: my_deprecated_stop(previous_attempt_number, delay_since_first_attempt)

   *deprecated*

   :param previous_attempt_number: the number of current attempt
   :type previous_attempt_number: int
   :param delay_since_first_attempt: interval in seconds between the
       beginning of first attempt and current time
   :type delay_since_first_attempt: float
   :rtype: bool

.. function:: my_deprecated_wait(previous_attempt_number, delay_since_first_attempt [, last_result])

   *deprecated*

   :param previous_attempt_number: the number of current attempt
   :type previous_attempt_number: int
   :param delay_since_first_attempt: interval in seconds between the
       beginning of first attempt and current time
   :type delay_since_first_attempt: float
   :param tenacity.Future last_result: current outcome
   :return: number of seconds to wait before next retry
   :rtype: float

.. function:: my_deprecated_retry(attempt)

   *deprecated*

   :param tenacity.Future attempt: current outcome
   :return: whether or not retrying should continue
   :rtype: bool

.. function:: my_deprecated_before(func, trial_number)

   *deprecated*

   :param callable func: function whose outcome is to be retried
   :param int trial_number: the number of current attempt

.. function:: my_deprecated_after(func, trial_number, trial_time_taken)

   *deprecated*

   :param callable func: function whose outcome is to be retried
   :param int trial_number: the number of current attempt
   :param float trial_time_taken: interval in seconds between the beginning
       of first attempt and current time

.. function:: my_deprecated_before_sleep(func, sleep, last_result)

   *deprecated*

   :param callable func: function whose outcome is to be retried
   :param float sleep: number of seconds to wait until next retry
   :param tenacity.Future last_result: current outcome
.. testcode::

    import logging
    import sys

    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

    logger = logging.getLogger(__name__)

    def my_before_sleep(retry_object, sleep, last_result):
        logger.warning(
            'Retrying %s: last_result=%s, retrying in %s seconds...',
            retry_object.fn, last_result, sleep)

    @retry(stop=stop_after_attempt(3), before_sleep=my_before_sleep)
    def raise_my_exception():
        raise MyException("Fail")

    try:
        raise_my_exception()
    except RetryError:
        pass
You can change the arguments of a retry decorator as needed when calling it by using the retry_with function attached to the wrapped function:
.. testcode::

    @retry(stop=stop_after_attempt(3))
    def raise_my_exception():
        raise MyException("Fail")

    try:
        raise_my_exception.retry_with(stop=stop_after_attempt(4))()
    except Exception:
        pass

    print(raise_my_exception.retry.statistics)

.. testoutput::
   :hide:

   ...
If you want to use variables to set up the retry parameters, you don't have to use the retry decorator - you can instead use Retrying directly:
.. testcode::

    def never_good_enough(arg1):
        raise Exception('Invalid argument: {}'.format(arg1))

    def try_never_good_enough(max_attempts=3):
        retryer = Retrying(stop=stop_after_attempt(max_attempts), reraise=True)
        retryer(never_good_enough, 'I really do try')
Tenacity allows you to retry a code block without the need to wrap it in an isolated function. This makes it easy to isolate a failing block while sharing context. The trick is to combine a for loop and a context manager.
.. testcode::

    from tenacity import Retrying, RetryError, stop_after_attempt

    try:
        for attempt in Retrying(stop=stop_after_attempt(3)):
            with attempt:
                raise Exception('My code is failing!')
    except RetryError:
        pass
You can configure every detail of the retry policy by configuring the Retrying object.
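To see why a for loop plus a context manager is enough to retry a block, here is a minimal sketch of the pattern in plain Python. It is not how tenacity is implemented: ``attempts``, ``Attempt`` and ``GaveUp`` are hypothetical names for this illustration, and the sketch handles only an attempt limit, with no waiting.

```python
class GaveUp(Exception):
    """Hypothetical error raised when every attempt has failed."""


class Attempt:
    """Context manager wrapping one execution of the retried block."""

    def __init__(self, state):
        self.state = state

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.state["done"] = True  # block succeeded, stop iterating
            return False
        if not issubclass(exc_type, Exception):
            return False  # let KeyboardInterrupt and friends propagate
        self.state["last_error"] = exc
        return True  # swallow the error so the for loop can try again


def attempts(max_attempts):
    """Yield one Attempt per try until the block succeeds or tries run out."""
    state = {"done": False, "last_error": None}
    for _ in range(max_attempts):
        yield Attempt(state)
        if state["done"]:
            return
    raise GaveUp("all attempts failed") from state["last_error"]


tries = 0
for attempt in attempts(3):
    with attempt:
        tries += 1
        if tries < 3:
            raise ValueError("not yet")

print(tries)  # 3
```

The generator decides whether to hand out another attempt, while the context manager records how the block ended; the block itself keeps full access to the surrounding local variables.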
With async code you can use AsyncRetrying.
.. testcode::

    from tenacity import AsyncRetrying, RetryError, stop_after_attempt

    async def function():
        try:
            async for attempt in AsyncRetrying(stop=stop_after_attempt(3)):
                with attempt:
                    raise Exception('My code is failing!')
        except RetryError:
            pass
Finally, ``retry`` also works on asyncio and Tornado (>= 4.5) coroutines.
Sleeps are done asynchronously too.
.. code-block:: python

    @retry
    async def my_async_function(loop):
        await loop.getaddrinfo('8.8.8.8', 53)

.. code-block:: python

    @retry
    @tornado.gen.coroutine
    def my_async_function(http_client, url):
        yield http_client.fetch(url)
You can even use alternative event loops such as curio or Trio by passing the correct sleep function:
.. code-block:: python

    @retry(sleep=trio.sleep)
    async def my_async_function(loop):
        await asks.get('https://example.org')
- Check for open issues or open a fresh issue to start a discussion around a feature idea or a bug.
- Fork the repository on GitHub to start making your changes to the master branch (or branch off of it).
- Write a test which shows that the bug was fixed or that the feature works as expected.
- Add a changelog
- Make the docs better (or more detailed, or easier to read, or ...)
reno is used for managing changelogs. Take a look at their usage docs.
The doc generation will automatically compile the changelogs. You just need to add them.
.. code-block:: sh

    # Opens a template file in an editor
    tox -e reno -- new some-slug-for-my-change --edit