diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 6a75ccd..e717e03 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -437,6 +437,7 @@ def __init__( conn_params: Conn, exch_params: Exch, *args, + channel: Channel | None = None, **kwargs, ): super().__init__(*args, **kwargs) @@ -446,8 +447,28 @@ def __init__( self._exch = exch_params self._queue = None - self.connection = BlockingConnection(conn_params.connection_parameters) - self.channel = self.connection.channel() + # establish RabbitMQ connection/channel and initialize exchange (or reuse provided channel) + self.setup(channel if channel is not None else conn_params) + + def setup(self, channel_args: Conn | Channel): + """ + Method that initializes the RabbitMQ connection, channel, exchange, and queue to be used + to publish messages in a threadsafe way. + Can be overridden to customize how Publisher establishes these RMQ resources. + + Args: + rmq_args (Conn | None) + """ + if isinstance(channel_args, Channel): + # reuse the existing RabbitMQ Connection and Channel passed to setup() + self.connection = channel_args.connection + self.channel = channel_args + elif isinstance(channel_args, Conn): + # create new RabbitMQ Connection and Channel using the provided params + self.connection = BlockingConnection(channel_args.connection_parameters) + self.channel = self.connection.channel() + else: + raise ValueError('Publisher expects RabbitMQ params (Conn) or existing Channel to run setup') # if delivery is mandatory there must be a queue attach to the exchange if self._exch.mandatory: @@ -457,7 +478,7 @@ def __init__( exclusive=True, auto_delete=False, arguments={'x-queue-type': 'classic', - 'x-message-ttl': 10 * 1000}) + 'x-message-ttl': 10 * 1000}) _setup_exch_and_queue(self.channel, self._exch, self._queue) else: diff --git a/python/idsse_common/idsse/common/utils.py b/python/idsse_common/idsse/common/utils.py index ea12da6..c35e97d 100644 --- a/python/idsse_common/idsse/common/utils.py +++ b/python/idsse_common/idsse/common/utils.py @@ -117,7 +117,6 @@ def exec_cmd(commands: Sequence[str], timeout: int | None = None) -> Sequence[st def to_iso(date_time: datetime) -> str: """Format a datetime instance to an ISO string""" - logger.debug('Datetime (%s) to iso', datetime) return (f'{date_time.strftime("%Y-%m-%dT%H:%M")}:' f'{(date_time.second + date_time.microsecond / 1e6):06.3f}' 'Z' if date_time.tzname() in [None, str(timezone.utc)] @@ -126,7 +125,6 @@ def to_iso(date_time: datetime) -> str: def to_compact(date_time: datetime) -> str: """Format a datetime instance to a compact string""" - logger.debug('Datetime (%s) to compact -- %s', datetime, __name__) return date_time.strftime('%Y%m%d%H%M%S') diff --git a/python/idsse_common/test/test_rabbitmq_utils.py b/python/idsse_common/test/test_rabbitmq_utils.py index bf8ec32..1f7a2e8 100644 --- a/python/idsse_common/test/test_rabbitmq_utils.py +++ b/python/idsse_common/test/test_rabbitmq_utils.py @@ -17,6 +17,7 @@ from pytest import fixture, raises, MonkeyPatch from pika.adapters import blocking_connection +from pika.channel import Channel from idsse.common.rabbitmq_utils import ( Conn, Exch, Queue, Publisher, RabbitMqParams, subscribe_to_queue @@ -154,9 +155,7 @@ def test_passing_connection_does_not_create_new(mock_connection, monkeypatch): 'idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection ) - new_connection, new_channel = subscribe_to_queue( - CONN, RMQ_PARAMS, mock_callback_function - ) + new_connection, new_channel = subscribe_to_queue(CONN, RMQ_PARAMS, mock_callback_function) mock_connection.assert_not_called() assert new_connection == mock_connection @@ -237,3 +236,17 @@ def test_simple_publisher(monkeypatch: MonkeyPatch, mock_connection: Mock): publisher.stop() assert 'MockChannel.close' in str(mock_threadsafe.call_args[0][1]) + + +def test_simple_publisher_existing_channel( + monkeypatch: MonkeyPatch, mock_connection: Mock, mock_channel: Mock +): + mock_blocking_connection = Mock(return_value=mock_connection) + monkeypatch.setattr('idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection) + mock_channel.__class__ = Channel # make mock look like real pika.Channel + + publisher = Publisher(CONN, RMQ_PARAMS.exchange, channel=mock_channel) + + mock_blocking_connection.assert_not_called() # should not have created new Connection/Channel + assert publisher.channel == mock_channel + assert publisher.connection == mock_channel.connection