Skip to content

Commit

Permalink
add optional channel arg to Publisher to reuse existing RMQ Channel
Browse files Browse the repository at this point in the history
  • Loading branch information
mackenzie-grimes-noaa committed Nov 18, 2024
1 parent f0ffa71 commit 429ebab
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
27 changes: 24 additions & 3 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ def __init__(
conn_params: Conn,
exch_params: Exch,
*args,
channel: Channel | None = None,
**kwargs,
):
super().__init__(*args, **kwargs)
Expand All @@ -446,8 +447,28 @@ def __init__(
self._exch = exch_params
self._queue = None

self.connection = BlockingConnection(conn_params.connection_parameters)
self.channel = self.connection.channel()
# establish RabbitMQ connection/channel and initialize exchange (or reuse provided channel)
self.setup(channel if channel is not None else conn_params)

def setup(self, channel_args: Conn | Channel):
"""
Method that initializes the RabbitMQ connection, channel, exchange, and queue to be used
to publish messages in a threadsafe way.
Can be overridden to customize how Publisher establishes these RMQ resources.
Args:
rmq_args (Conn | None)
"""
if isinstance(channel_args, Channel):
# reuse the existing RabbitMQ Connection and Channel passed to setup()
self.connection = channel_args.connection
self.channel = channel_args
elif isinstance(channel_args, Conn):
# create new RabbitMQ Connection and Channel using the provided params
self.connection = BlockingConnection(channel_args.connection_parameters)
self.channel = self.connection.channel()
else:
raise ValueError('Publisher expects RabbitMQ params (Conn) or existing Channel to run setup')

# if delivery is mandatory there must be a queue attach to the exchange
if self._exch.mandatory:
Expand All @@ -457,7 +478,7 @@ def __init__(
exclusive=True,
auto_delete=False,
arguments={'x-queue-type': 'classic',
'x-message-ttl': 10 * 1000})
'x-message-ttl': 10 * 1000})

_setup_exch_and_queue(self.channel, self._exch, self._queue)
else:
Expand Down
2 changes: 0 additions & 2 deletions python/idsse_common/idsse/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def exec_cmd(commands: Sequence[str], timeout: int | None = None) -> Sequence[st

def to_iso(date_time: datetime) -> str:
"""Format a datetime instance to an ISO string"""
logger.debug('Datetime (%s) to iso', datetime)
return (f'{date_time.strftime("%Y-%m-%dT%H:%M")}:'
f'{(date_time.second + date_time.microsecond / 1e6):06.3f}'
'Z' if date_time.tzname() in [None, str(timezone.utc)]
Expand All @@ -126,7 +125,6 @@ def to_iso(date_time: datetime) -> str:

def to_compact(date_time: datetime) -> str:
"""Format a datetime instance to a compact string"""
logger.debug('Datetime (%s) to compact -- %s', datetime, __name__)
return date_time.strftime('%Y%m%d%H%M%S')


Expand Down
19 changes: 16 additions & 3 deletions python/idsse_common/test/test_rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from pytest import fixture, raises, MonkeyPatch
from pika.adapters import blocking_connection
from pika.channel import Channel

from idsse.common.rabbitmq_utils import (
Conn, Exch, Queue, Publisher, RabbitMqParams, subscribe_to_queue
Expand Down Expand Up @@ -154,9 +155,7 @@ def test_passing_connection_does_not_create_new(mock_connection, monkeypatch):
'idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection
)

new_connection, new_channel = subscribe_to_queue(
CONN, RMQ_PARAMS, mock_callback_function
)
new_connection, new_channel = subscribe_to_queue(CONN, RMQ_PARAMS, mock_callback_function)

mock_connection.assert_not_called()
assert new_connection == mock_connection
Expand Down Expand Up @@ -237,3 +236,17 @@ def test_simple_publisher(monkeypatch: MonkeyPatch, mock_connection: Mock):

publisher.stop()
assert 'MockChannel.close' in str(mock_threadsafe.call_args[0][1])


def test_simple_publisher_existing_channel(
monkeypatch: MonkeyPatch, mock_connection: Mock, mock_channel: Mock
):
mock_blocking_connection = Mock(return_value=mock_connection)
monkeypatch.setattr('idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection)
mock_channel.__class__ = Channel # make mock look like real pika.Channel

publisher = Publisher(CONN, RMQ_PARAMS.exchange, channel=mock_channel)

mock_blocking_connection.assert_not_called() # should not have created new Connection/Channel
assert publisher.channel == mock_channel
assert publisher.connection == mock_channel.connection

0 comments on commit 429ebab

Please sign in to comment.