diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 589723c6..a53ea79e 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -452,7 +452,8 @@ def __init__( def run(self): logger.info('Starting publisher') while self._is_running: - self.connection.process_data_events(time_limit=1) + if self.connection and self.connection.is_open: + self.connection.process_data_events(time_limit=1) def publish(self, message: bytes, properties: BasicProperties = None, route_key: str = None): """ @@ -500,8 +501,8 @@ def stop(self): logger.info("Stopping publisher") self._is_running = False # Wait until all the data events have been processed - self.connection.process_data_events(time_limit=1) - if self.connection.is_open: + if self.connection and self.connection.is_open: + self.connection.process_data_events(time_limit=1) threadsafe_call(self.channel, self.channel.close, self.connection.close) @@ -533,7 +534,7 @@ def _publish( route_key = self._exch.route_key self.channel.basic_publish(self._exch.name, - routing_key=route_key, + route_key if route_key else self._exch.route_key, body=message, properties=properties, mandatory=self._exch.mandatory)