Skip to content

Commit

Permalink
Rpc: revert to protected vars/functions, ignore pylint warnings for now
Browse files Browse the repository at this point in the history
  • Loading branch information
mackenzie-grimes-noaa committed Feb 20, 2025
1 parent d647b50 commit 9def360
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
16 changes: 8 additions & 8 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,16 +332,16 @@ def __init__(self, conn_params: Conn, exch: Exch, timeout: float | None = None):
self.pending_requests: dict[str, Future] = {}

# Start long-running thread to consume any messages from response queue
self.consumer = Consumer(
self._consumer = Consumer(
conn_params,
RabbitMqParamsAndCallback(RabbitMqParams(Exch('', 'direct'), self._queue),
self.on_response)
self._on_response)
)

@property
def is_open(self) -> bool:
"""Returns True if RabbitMQ connection (Publisher) is open and ready to send messages"""
return self.consumer.is_alive() and self.consumer.channel.is_open
return self._consumer.is_alive() and self._consumer.channel.is_open

def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None:
"""Send message to remote RabbitMQ service using thread-safe RPC. Will block until response
Expand All @@ -368,7 +368,7 @@ def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None:
self.pending_requests[request_id] = request_future

logger.debug('Publishing request message to external service with body: %s', request_body)
_blocking_publish(self.consumer.channel,
_blocking_publish(self._consumer.channel,
self._exch,
RabbitMqMessage(request_body, properties, self._exch.route_key),
self._queue)
Expand All @@ -392,7 +392,7 @@ def start(self):
not required to use the client. It will automatically call this internally as needed."""
if not self.is_open:
logger.debug('Starting RPC thread to send and consume messages')
self.consumer.start()
self._consumer.start()

def stop(self):
"""Unsubscribe to Direct Reply-To queue and cleanup thread"""
Expand All @@ -402,10 +402,10 @@ def stop(self):
return

# tell Consumer cleanup RabbitMQ resources and wait for thread to terminate
self.consumer.stop()
self.consumer.join()
self._consumer.stop()
self._consumer.join()

def on_response(
def _on_response(
self,
channel: Channel,
method: Basic.Deliver,
Expand Down
6 changes: 4 additions & 2 deletions python/idsse_common/test/test_rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ def mock_blocking_publish(*_args, **_kwargs):
method = Method('', 123)
props = BasicProperties(content_type='application/json', headers={'rpc': EXAMPLE_UUID})
body = bytes(json.dumps(example_message), encoding='utf-8')
rpc_thread.on_response(mock_channel, method, props, body)
# pylint: disable=protected-access
rpc_thread._on_response(mock_channel, method, props, body)

monkeypatch.setattr('idsse.common.rabbitmq_utils._blocking_publish',
Mock(side_effect=mock_blocking_publish))
Expand Down Expand Up @@ -350,7 +351,8 @@ def test_nacks_unrecognized_response(rpc_thread: Rpc,
props = BasicProperties(content_type='application/json', headers={'rpc': 'unknown_id'})
body = bytes(json.dumps({'data': 123}), encoding='utf-8')

rpc_thread.on_response(mock_channel, Method(delivery_tag=delivery_tag), props, body)
# pylint: disable=protected-access
rpc_thread._on_response(mock_channel, Method(delivery_tag=delivery_tag), props, body)

# unregistered message was nacked
mock_channel.basic_nack.assert_called_with(delivery_tag=delivery_tag, requeue=False)
Expand Down

0 comments on commit 9def360

Please sign in to comment.