Skip to content

Commit

Permalink
Apply retry policy to maybe_declare(). (#2174)
Browse files Browse the repository at this point in the history
  • Loading branch information
Omer Katz authored Oct 24, 2024
1 parent c48be88 commit 1686c35
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 7 deletions.
4 changes: 2 additions & 2 deletions kombu/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def _ensure_channel_is_bound(entity, channel):
raise ChannelError(
f"Cannot bind channel {channel} to entity {entity}")
entity = entity.bind(channel)
return entity
return entity


def _maybe_declare(entity, channel):
Expand Down Expand Up @@ -159,7 +159,7 @@ def _maybe_declare(entity, channel):


def _imaybe_declare(entity, channel, **retry_policy):
_ensure_channel_is_bound(entity, channel)
entity = _ensure_channel_is_bound(entity, channel)

if not entity.channel.connection:
raise RecoverableConnectionError('channel disconnected')
Expand Down
9 changes: 6 additions & 3 deletions kombu/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,23 @@ def publish(self, body, routing_key=None, delivery_mode=None,
return _publish(
body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory, immediate,
exchange_name, declare, timeout
exchange_name, declare, timeout, retry, retry_policy
)

def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
immediate, exchange, declare, timeout=None):
immediate, exchange, declare, timeout=None,
retry=False, retry_policy=None):
retry_policy = {} if retry_policy is None else retry_policy
channel = self.channel
message = channel.prepare_message(
body, priority, content_type,
content_encoding, headers, properties,
)
if declare:
maybe_declare = self.maybe_declare
[maybe_declare(entity) for entity in declare]
for entity in declare:
maybe_declare(entity, retry=retry, **retry_policy)

# handle autogenerated queue names for reply_to
reply_to = properties.get('reply_to')
Expand Down
32 changes: 30 additions & 2 deletions t/unit/test_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pickle
import sys
from collections import defaultdict
from unittest.mock import Mock, patch
from unittest.mock import ANY, Mock, patch

import pytest

Expand Down Expand Up @@ -170,6 +170,34 @@ def test_publish_with_timeout_and_retry_policy(self):
timeout = p._channel.basic_publish.call_args[1]['timeout']
assert timeout == 1

@patch('kombu.messaging.maybe_declare')
def test_publish_maybe_declare_with_retry_policy(self, maybe_declare):
p = self.connection.Producer(exchange=Exchange('foo'))
p.channel = Mock()
expected_retry_policy = {
"max_retries": 20,
"interval_start": 1,
"interval_step": 2,
"interval_max": 30,
"retry_errors": (OperationalError,)
}
p.publish('test_maybe_declare', exchange=Exchange('foo'), retry=True, retry_policy=expected_retry_policy)
maybe_declare.assert_called_once_with(ANY, ANY, True, **expected_retry_policy)

@patch('kombu.common._imaybe_declare')
def test_publish_maybe_declare_with_retry_policy_ensure_connection(self, _imaybe_declare):
p = self.connection.Producer(exchange=Exchange('foo'))
p.channel = Mock()
expected_retry_policy = {
"max_retries": 20,
"interval_start": 1,
"interval_step": 2,
"interval_max": 30,
"retry_errors": (OperationalError,)
}
p.publish('test_maybe_declare', exchange=Exchange('foo'), retry=True, retry_policy=expected_retry_policy)
_imaybe_declare.assert_called_once_with(ANY, ANY, **expected_retry_policy)

def test_publish_with_reply_to(self):
p = self.connection.Producer()
p.channel = Mock()
Expand Down Expand Up @@ -200,7 +228,7 @@ def test_publish_retry_with_declare(self):
p.connection.ensure = Mock()
ex = Exchange('foo')
p._publish('hello', 0, '', '', {}, {}, 'rk', 0, 0, ex, declare=[ex])
p.maybe_declare.assert_called_with(ex)
p.maybe_declare.assert_called_with(ex, retry=False)

def test_revive_when_channel_is_connection(self):
p = self.connection.Producer()
Expand Down

0 comments on commit 1686c35

Please sign in to comment.