Skip to content

Commit

Permalink
update ampq output config (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashg3627 authored Mar 19, 2024
1 parent 36d9521 commit 35b639e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 32 deletions.
71 changes: 43 additions & 28 deletions async_processor/amqp_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
from typing import AsyncIterator, Optional

from aio_pika import Message, connect_robust
from aio_pika.abc import AbstractChannel, AbstractConnection, AbstractQueue
from aio_pika.abc import (
AbstractChannel,
AbstractConnection,
AbstractExchange,
AbstractQueue,
)
from aio_pika.exceptions import ChannelNotFoundEntity, QueueEmpty

from async_processor.logger import logger
Expand Down Expand Up @@ -60,12 +65,17 @@ async def _get_queue(self) -> AbstractQueue:
return self._queue

async def __aexit__(self, exc_type, exc_value, traceback):
if self._ch:
try:
await self._ch.close()
except Exception:
logger.exception("Failed to drain and close AMQP channel")
if not self._nc:
return
try:
await self._nc.close()
except Exception:
logger.exception("Failed to drain and close nats connection")
logger.exception("Failed to drain and close AMQP connection")

@asynccontextmanager
async def get_input_message(
Expand Down Expand Up @@ -104,25 +114,12 @@ async def publish_input_message(
class AMQPOutput(Output):
def __init__(self, config: AMQPOutputConfig):
self._url = config.url
self._queue_name = config.queue_name
self._queue = None
self._exchange_name = config.exchange_name
self._routing_key = config.routing_key
self._exchange = None
self._nc = None
self._ch = None

async def _validate_queue_exists(self):
channel = await self._get_channel()
try:
self._queue = await channel.declare_queue(self._queue_name, passive=True)
except ChannelNotFoundEntity as ex:
raise Exception(
f"Queue {self._queue_name!r} does not exist."
" Please create the queue before running the async processor."
) from ex

async def __aenter__(self):
await self._validate_queue_exists()
return self

async def _get_connect(self) -> AbstractConnection:
if self._nc:
return self._nc
Expand All @@ -136,26 +133,44 @@ async def _get_channel(self) -> AbstractChannel:
self._ch = await connection.channel()
return self._ch

async def _get_queue(self) -> AbstractQueue:
if self._queue:
return self._queue
async def __aenter__(self):
await self._get_exchange()
return self

async def _get_exchange(self) -> AbstractExchange:
if self._exchange:
return self._exchange
channel = await self._get_channel()
self._queue = await channel.declare_queue(self._queue_name, passive=True)
return self._queue
try:
# https://aio-pika.readthedocs.io/en/latest/apidoc.html#aio_pika.Channel.get_exchange
# Keep ensure=True only if exchange_name is provided
self._exchange = await channel.get_exchange(
self._exchange_name, ensure=True if self._exchange_name else False
)
except ChannelNotFoundEntity as ex:
raise Exception(
f"Exchange {self._exchange_name!r} does not exist."
" Please create the exchange before running the async processor."
) from ex
return self._exchange

async def __aexit__(self, exc_type, exc_value, traceback):
if self._ch:
try:
await self._ch.close()
except Exception:
logger.exception("Failed to drain and close AMQP channel")
if not self._nc:
return
try:
await self._nc.close()
except Exception:
logger.exception("Failed to drain and close nats connection")
logger.exception("Failed to drain and close AMQP connection")

async def publish_output_message(
self, serialized_output_message: bytes, request_id: Optional[str]
):
channel = await self._get_channel()
await self._get_queue()
await channel.default_exchange.publish(
Message(body=serialized_output_message), routing_key=self._queue_name
exchange = await self._get_exchange()
await exchange.publish(
Message(body=serialized_output_message), routing_key=self._routing_key
)
4 changes: 2 additions & 2 deletions async_processor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ class AMQPOutputConfig(OutputConfig):
url: constr(
regex=r"^(?:amqp|amqps):\/\/(?:([^:/?#\s]+)(?::([^@/?#\s]+))?@)?([^/?#\s]+)(?::(\d+))?\/?([^?#\s]*)?(?:\?(.*))?$"
)
queue_name: str
wait_time_seconds: confloat(ge=1) = 5
routing_key: str
exchange_name: str = ""

def to_output(self) -> Output:
from async_processor.amqp_pub_sub import AMQPOutput
Expand Down
7 changes: 5 additions & 2 deletions examples/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
input_url = "amqp://guest:guest@localhost:5672/"
input_queue_name = "home1"
output_url = "amqp://guest:guest@localhost:5672/"
output_queue_name = "home2"
exchange_name = ""
routing_key = "home2"


class MultiplicationProcessor(Processor):
Expand All @@ -29,7 +30,9 @@ def process(self, input_message: InputMessage) -> int:
app = MultiplicationProcessor().build_app(
worker_config=WorkerConfig(
input_config=AMQPInputConfig(url=input_url, queue_name=input_queue_name),
output_config=AMQPOutputConfig(url=output_url, queue_name=output_queue_name),
output_config=AMQPOutputConfig(
url=output_url, exchange_name=exchange_name, routing_key=routing_key
),
),
)

Expand Down

0 comments on commit 35b639e

Please sign in to comment.