Skip to content

Commit

Permalink
Merge pull request #80 from NOAA-GSL/bug/rabbitmq-utils-conn-params
Browse files Browse the repository at this point in the history
bug: pass Conn.connection_parameters to pika.BlockingConnection
  • Loading branch information
Geary-Layne authored Oct 24, 2024
2 parents bf65805 + dfeb46b commit 890bd90
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ def _initialize_connection_and_channel(
# connection of unsupported type passed
raise ValueError(
(f'Cannot use or create new RabbitMQ connection using type {type(connection)}. '
'Should a Conn (a dict with connection parameters)')
'Should be type Conn (a dict with connection parameters)')
)

_connection = BlockingConnection(parameters=connection)
_connection = BlockingConnection(connection.connection_parameters)
logger.info('Established new RabbitMQ connection to %s on port %i',
connection.host, connection.port)

Expand Down Expand Up @@ -352,7 +352,7 @@ class Consumer(Thread):
"""
def __init__(
self,
conn_params: ConnectionParameters,
conn_params: Conn,
rmq_params_and_callbacks: RabbitMqParamsAndCallback | list[RabbitMqParamsAndCallback],
num_message_handlers: int,
*args,
Expand All @@ -366,7 +366,7 @@ def __init__(
self._rmq_params_and_callbacks = rmq_params_and_callbacks
else:
self._rmq_params_and_callbacks = [rmq_params_and_callbacks]
self.connection = BlockingConnection(parameters=self._conn_params)
self.connection = BlockingConnection(self._conn_params.connection_parameters)
self.channel = self.connection.channel()

self._consumer_tags = []
Expand Down Expand Up @@ -420,7 +420,7 @@ class Publisher(Thread):
"""
def __init__(
self,
conn_params: ConnectionParameters,
conn_params: Conn,
exch_params: Exch,
*args,
**kwargs,
Expand All @@ -431,7 +431,7 @@ def __init__(
self._exch = exch_params
self._queue = None

self.connection = BlockingConnection(conn_params)
self.connection = BlockingConnection(conn_params.connection_parameters)
self.channel = self.connection.channel()

# if delivery is mandatory there must be a queue attach to the exchange
Expand Down Expand Up @@ -506,6 +506,7 @@ def stop(self):
self.channel.close,
self.connection.close)

# pylint: disable=too-many-arguments,unused-argument
def _publish(
self,
message: bytes,
Expand Down Expand Up @@ -535,7 +536,6 @@ def _publish(
mandatory=self._exch.mandatory)
if success_flag:
success_flag[0] = True
print('\n message published\n')
if self._queue and self._queue.name.startswith('_'):
try:
self.channel.queue_purge(queue=self._queue.name)
Expand Down

0 comments on commit 890bd90

Please sign in to comment.