Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: AMQP module #68

Merged
merged 8 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions async_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from async_processor.function_service import FunctionAsyncExecutor
from async_processor.processor import AsyncProcessor, Processor
from async_processor.types import (
AMQPInputConfig,
AMQPOutputConfig,
AWSAccessKeyAuth,
CoreNATSOutputConfig,
Input,
Expand Down Expand Up @@ -31,6 +33,8 @@
"NATSOutputConfig",
"OutputMessage",
"WorkerConfig",
"AMQPInputConfig",
"AMQPOutputConfig",
"ProcessStatus",
"SQSInputConfig",
"SQSOutputConfig",
Expand Down
161 changes: 161 additions & 0 deletions async_processor/amqp_pub_sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional

from aio_pika import Message, connect_robust
from aio_pika.abc import AbstractChannel, AbstractConnection, AbstractQueue
from aio_pika.exceptions import ChannelNotFoundEntity, QueueEmpty

from async_processor.logger import logger
from async_processor.types import (
AMQPInputConfig,
AMQPOutputConfig,
Input,
InputFetchAckFailure,
InputMessageFetchFailure,
Output,
)


class AMQPInput(Input):
def __init__(self, config: AMQPInputConfig):
self._url = config.url
self._queue_name = config.queue_name
self._wait_time_seconds = config.wait_time_seconds
self._nc = None
self._ch = None
self._queue = None

async def _validate_queue_exists(self):
channel = await self._get_channel()
try:
self._queue = await channel.declare_queue(self._queue_name, passive=True)
except ChannelNotFoundEntity as ex:
raise Exception(
f"Queue {self._queue_name!r} does not exist."
" Please create the queue before running the async processor."
) from ex

async def __aenter__(self):
await self._validate_queue_exists()
return self

async def _get_connect(self) -> AbstractConnection:
if self._nc:
return self._nc
self._nc = await connect_robust(self._url)
return self._nc

async def _get_channel(self) -> AbstractChannel:
if self._ch:
return self._ch
connection = await self._get_connect()
self._ch = await connection.channel()
return self._ch

async def _get_queue(self) -> AbstractQueue:
if self._queue:
return self._queue
channel = await self._get_channel()
self._queue = await channel.declare_queue(self._queue_name, passive=True)
return self._queue

async def __aexit__(self, exc_type, exc_value, traceback):
if not self._nc:
return
try:
await self._nc.close()
except Exception:
logger.exception("Failed to drain and close nats connection")

@asynccontextmanager
async def get_input_message(
self,
) -> AsyncIterator[Optional[bytes]]:
message = None
queue = await self._get_queue()
try:
message = await queue.get(fail=False, timeout=self._wait_time_seconds)
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
except QueueEmpty:
logger.debug("No message in queue")
except Exception as ex:
raise InputMessageFetchFailure(f"Error fetch input message: {ex}") from ex
if not message:
yield None
return
try:
yield message.body
finally:
try:
await message.ack()
except Exception as ex:
raise InputFetchAckFailure(
f"Error publishing input message: {ex}"
) from ex

async def publish_input_message(
self, serialized_input_message: bytes, request_id: str
):
channel = await self._get_channel()
await channel.default_exchange.publish(
Message(body=serialized_input_message), routing_key=self._queue_name
)
Comment on lines +99 to +101
Copy link
Member

@chiragjn chiragjn Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exchange name and routing key should be configurable
Maybe not today but some day in future?

Same reason as this: #68 (comment)
We don't need the queue name directly to publish

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another question - Is it always guaranteed that a routing key = X always delivers to a queue named X?



class AMQPOutput(Output):
def __init__(self, config: AMQPOutputConfig):
self._url = config.url
self._queue_name = config.queue_name
self._queue = None
self._nc = None
self._ch = None

async def _validate_queue_exists(self):
channel = await self._get_channel()
try:
self._queue = await channel.declare_queue(self._queue_name, passive=True)
except ChannelNotFoundEntity as ex:
raise Exception(
f"Queue {self._queue_name!r} does not exist."
" Please create the queue before running the async processor."
) from ex

async def __aenter__(self):
await self._validate_queue_exists()
return self

async def _get_connect(self) -> AbstractConnection:
if self._nc:
return self._nc
self._nc = await connect_robust(self._url)
return self._nc

async def _get_channel(self) -> AbstractChannel:
if self._ch:
return self._ch
connection = await self._get_connect()
self._ch = await connection.channel()
return self._ch

async def _get_queue(self) -> AbstractQueue:
if self._queue:
return self._queue
channel = await self._get_channel()
self._queue = await channel.declare_queue(self._queue_name, passive=True)
return self._queue

async def __aexit__(self, exc_type, exc_value, traceback):
if not self._nc:
return
try:
await self._nc.close()
except Exception:
logger.exception("Failed to drain and close nats connection")

async def publish_output_message(
self, serialized_output_message: bytes, request_id: Optional[str]
):
channel = await self._get_channel()
await self._get_queue()
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed?

await channel.default_exchange.publish(
Message(body=serialized_output_message), routing_key=self._queue_name
Comment on lines +159 to +160
Copy link
Member

@chiragjn chiragjn Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we have restricted ourselves to default exchange and re-using queue name as routing key
Shouldn't the exchange name and routing key be distinct inputs?
Maybe not today but some day in future?


afaik, for publishing we don't need queue name, just the exchange + routing key
What queue is bound to what exchange and routing key is not needed by us when publishing
See: https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/4-routing.html#bindings

)
29 changes: 29 additions & 0 deletions async_processor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,20 @@ def to_input(self) -> Input:
return SQSInput(self)


class AMQPInputConfig(InputConfig):
type: constr(regex=r"^amqp$") = "amqp"
url: constr(
regex=r"^(?:amqp|amqps):\/\/(?:([^:/?#\s]+)(?::([^@/?#\s]+))?@)?([^/?#\s]+)(?::(\d+))?\/?([^?#\s]*)?(?:\?(.*))?$"
)
queue_name: str
wait_time_seconds: conint(ge=1, le=20) = 5

def to_input(self) -> Input:
from async_processor.amqp_pub_sub import AMQPInput

return AMQPInput(self)


class NATSInputConfig(InputConfig):
type: constr(regex=r"^nats$") = "nats"

Expand Down Expand Up @@ -288,6 +302,21 @@ def to_output(self) -> Output:
return SQSOutput(self)


class AMQPOutputConfig(OutputConfig):
type: constr(regex=r"^amqp$") = "amqp"

url: constr(
regex=r"^(?:amqp|amqps):\/\/(?:([^:/?#\s]+)(?::([^@/?#\s]+))?@)?([^/?#\s]+)(?::(\d+))?\/?([^?#\s]*)?(?:\?(.*))?$"
)
queue_name: str
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this should be

Suggested change
queue_name: str
exchange_name: str
routing_key: str

wait_time_seconds: confloat(ge=1) = 5

def to_output(self) -> Output:
from async_processor.amqp_pub_sub import AMQPOutput

return AMQPOutput(self)


class NATSOutputConfig(OutputConfig):
type: constr(regex=r"^nats$") = "nats"

Expand Down
63 changes: 63 additions & 0 deletions examples/amqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import asyncio
import json
import random
import uuid

import aio_pika

from async_processor import (
AMQPInputConfig,
AMQPOutputConfig,
InputMessage,
Processor,
WorkerConfig,
)

# change this config
input_url = "amqp://guest:guest@localhost:5672/"
input_queue_name = "home1"
output_url = "amqp://guest:guest@localhost:5672/"
output_queue_name = "home2"


class MultiplicationProcessor(Processor):
def process(self, input_message: InputMessage) -> int:
body = input_message.body
return body["x"] * body["y"]


app = MultiplicationProcessor().build_app(
worker_config=WorkerConfig(
input_config=AMQPInputConfig(url=input_url, queue_name=input_queue_name),
output_config=AMQPOutputConfig(url=output_url, queue_name=output_queue_name),
),
)


async def send_request(url: str, routing_key: str):
connection = await aio_pika.connect_robust(url)

async with connection:
request_id = str(uuid.uuid4())

channel = await connection.channel()

payload = json.dumps(
{
"request_id": request_id,
"body": {"x": random.randint(1, 100), "y": random.randint(1, 100)},
}
)
print(payload)
await channel.default_exchange.publish(
aio_pika.Message(payload.encode()), routing_key=routing_key
)


async def test():
for _ in range(100):
await send_request(url=input_url, routing_key=input_queue_name)


if __name__ == "__main__":
asyncio.run(test())
17 changes: 17 additions & 0 deletions examples/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

module_name="$1"

if [ -z "$module_name" ]; then
echo "Usage: $0 <module_name>"
echo "Example: $0 [amqp]"
exit 1
fi

if [ -f "${module_name}" ]; then
module_name="${module_name%.py}"
else
module_name="$module_name"
fi

gunicorn "$module_name:app" --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
17 changes: 17 additions & 0 deletions examples/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

module_name="$1"

if [ -z "$module_name" ]; then
echo "Usage: $0 <module_name>"
echo "Example: $0 [amqp]"
exit 1
fi

if [ -f "${module_name}" ]; then
module_name="${module_name}"
else
module_name="$module_name.py"
fi

python $module_name
Loading
Loading