From 1f7b38d6252a55364cbf92fdc6574fbf405cae82 Mon Sep 17 00:00:00 2001 From: Geary-Layne Date: Thu, 5 Sep 2024 09:14:46 -0600 Subject: [PATCH 1/3] added thread safe callback --- .../idsse/common/publish_confirm.py | 10 ++-- .../idsse/common/rabbitmq_utils.py | 49 ++++++++++++++++++- 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/python/idsse_common/idsse/common/publish_confirm.py b/python/idsse_common/idsse/common/publish_confirm.py index 85645f2..215b694 100644 --- a/python/idsse_common/idsse/common/publish_confirm.py +++ b/python/idsse_common/idsse/common/publish_confirm.py @@ -121,7 +121,7 @@ def publish_message(self, is_channel_ready = self._wait_for_channel_to_be_ready() if not is_channel_ready: - logger.error('RabbitMQ channel not established for some reason. Cannnot publish') + logger.error('RabbitMQ channel not established for some reason. Cannot publish') return False logger.debug('DEBUG: channel is ready to publish message') @@ -133,7 +133,7 @@ def publish_message(self, except (AMQPChannelError, AMQPConnectionError) as exc: # something wrong with RabbitMQ connection; destroy and recreate the daemon Thread logger.warning('Publish message problem, restarting thread to re-attempt: (%s) %s', - type(exc), str(exc)) + type(exc), str(exc)) # create new Thread, abandoning old one (it will shut itself down) self._create_thread() @@ -344,7 +344,7 @@ def _on_channel_open(self, channel: Channel): self._channel.add_on_close_callback(self._on_channel_closed) # Declare exchange on our new channel - exch_name, exch_type, exch_durable = self._rmq_params.exchange # pylint: disable=unused-variable + exch_name, exch_type, _ = self._rmq_params.exchange # pylint: disable=unused-variable logger.debug('Declaring exchange %s', exch_name) # Note: using functools.partial is not required, it is demonstrating @@ -352,8 +352,8 @@ def _on_channel_open(self, channel: Channel): cb = functools.partial(self._on_exchange_declareok, userdata=exch_name) try: self._channel.exchange_declare(exchange=exch_name, - exchange_type=exch_type, - callback=cb) + exchange_type=exch_type, + callback=cb) except ValueError as exc: logger.warning('RabbitMQ failed to declare exchange: (%s) %s', type(exc), str(exc)) if self._is_ready_future: diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index bf3cb08..125d555 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -21,7 +21,7 @@ from pika.adapters import BlockingConnection from pika.adapters.blocking_connection import BlockingChannel from pika.channel import Channel -from pika.exceptions import AMQPConnectionError +from pika.exceptions import AMQPConnectionError, ChannelClosed from pika.frame import Method from pika.spec import Basic @@ -198,6 +198,53 @@ def subscribe_to_queue( return _connection, _channel +def threadsafe_call(connection, channel, *partial_functions): + """This function provides a thread safe way to call pika functions (or functions that call + pika functions) from a thread other than the main. The need for this utility is practice of + executing function/method and separate thread to avoid blocking the rabbitMQ heartbeat + messages send by pika from the main thread. + + Note: that `channel` must be the same pika channel instance via which + the message being ACKed was retrieved (AMQP protocol constraint). + + Examples: + # Simple ack a message + threadsafe_call(self.connection, self.channel, + partial(self.channel.basic_ack, + delivery_tag=delivery_tag)) + + # RPC response followed and nack without requeueing + response = {'Error': 'Invalid request'} + threadsafe_call(self.connection, self.channel, + partial(self.channel.basic_publish, + exchange='', + routing_key=response_props.reply_to, + properties=response_props, + body=json.dumps(response)), + partial(channel.basic_nack, + delivery_tag=delivery_tag, + requeue=False)) + + # Publishing message via the PublishConfirm utility + threadsafe_call(self.connection, self.pub_conf.channel, + partial(self.pub_conf.publish_message, + message=message)) + Args: + connection (BlockingConnection): RabbitMQ connection. + channel (BlockingChannel): RabbitMQ channel. + partial_functions (Callable): One or more callable function (typically created via + functools.partial) + """ + def call_if_channel_is_open(): + if channel.is_open: + for func in partial_functions: + func() + else: + logger.error('Channel closed before callback could be run') + raise ChannelClosed('Channel closed') + connection.add_callback_threadsafe(call_if_channel_is_open) + + class PublisherSync: """ Uses a synchronous, blocking RabbitMQ connection to publish messages (no thread safety From 9d73dcb5f694bfa43fee4d7cf0f9bf9de0b375b5 Mon Sep 17 00:00:00 2001 From: Geary Layne <77741618+Geary-Layne@users.noreply.github.com> Date: Thu, 5 Sep 2024 09:34:16 -0600 Subject: [PATCH 2/3] Update publish_confirm.py --- python/idsse_common/idsse/common/publish_confirm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/idsse_common/idsse/common/publish_confirm.py b/python/idsse_common/idsse/common/publish_confirm.py index 215b694..fc28dd1 100644 --- a/python/idsse_common/idsse/common/publish_confirm.py +++ b/python/idsse_common/idsse/common/publish_confirm.py @@ -344,7 +344,7 @@ def _on_channel_open(self, channel: Channel): self._channel.add_on_close_callback(self._on_channel_closed) # Declare exchange on our new channel - exch_name, exch_type, _ = self._rmq_params.exchange # pylint: disable=unused-variable + exch_name, exch_type, _ = self._rmq_params.exchange logger.debug('Declaring exchange %s', exch_name) # Note: using functools.partial is not required, it is demonstrating From ab850a8240e223b13fbb4670eea66a17f2bac0db Mon Sep 17 00:00:00 2001 From: Geary-Layne Date: Thu, 5 Sep 2024 09:57:08 -0600 Subject: [PATCH 3/3] switch to ConnectionError exception --- python/idsse_common/idsse/common/rabbitmq_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 125d555..7f6cd10 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -21,7 +21,7 @@ from pika.adapters import BlockingConnection from pika.adapters.blocking_connection import BlockingChannel from pika.channel import Channel -from pika.exceptions import AMQPConnectionError, ChannelClosed +from pika.exceptions import AMQPConnectionError from pika.frame import Method from pika.spec import Basic @@ -241,7 +241,7 @@ def call_if_channel_is_open(): func() else: logger.error('Channel closed before callback could be run') - raise ChannelClosed('Channel closed') + raise ConnectionError('RabbitMQ Channel is closed') connection.add_callback_threadsafe(call_if_channel_is_open)