Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: AMQP module #68

Merged
merged 8 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions async_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from async_processor.function_service import FunctionAsyncExecutor
from async_processor.processor import AsyncProcessor, Processor
from async_processor.types import (
AMQPInputConfig,
AMQPOutputConfig,
AWSAccessKeyAuth,
CoreNATSOutputConfig,
Input,
Expand Down Expand Up @@ -31,6 +33,8 @@
"NATSOutputConfig",
"OutputMessage",
"WorkerConfig",
"AMQPInputConfig",
"AMQPOutputConfig",
"ProcessStatus",
"SQSInputConfig",
"SQSOutputConfig",
Expand Down
116 changes: 116 additions & 0 deletions async_processor/amqp_pub_sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional

from aio_pika import Message, connect_robust

from async_processor.types import (
AMQPInputConfig,
AMQPOutputConfig,
Input,
InputFetchAckFailure,
InputMessageFetchFailure,
Output,
)


class AMQPInput(Input):
def __init__(self, config: AMQPInputConfig):
self._queue_url = config.queue_url
self._queue_name = config.queue_name
self._wait_time_seconds = config.wait_time_seconds
self._connection = None
self._channel = None
self._queue = None

async def _get_connect(self):
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
if self._connection:
return self._connection
self._connection = await connect_robust(self._queue_url)
return self._connection

async def _get_channel(self):
if self._channel:
return self._channel
await self._get_connect()
self._channel = await self._connection.channel()
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
return self._channel

async def _get_queue(self):
if self._queue:
return self._queue
await self._get_channel()
self._queue = await self._channel.declare_queue(self._queue_name, durable=True)
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
return self._queue

@asynccontextmanager
async def get_input_message(
self,
) -> AsyncIterator[Optional[str]]:
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
message = None
queue = await self._get_queue()
try:
message = await queue.get(fail=False, timeout=self._wait_time_seconds)
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
except Exception as ex:
raise InputMessageFetchFailure(f"Error fetch input message: {ex}") from ex
if not message:
yield None
return
try:
yield message.body.decode()
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
except Exception as ex:
raise InputMessageFetchFailure(
f"Error decoding input message body: {ex}"
) from ex
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
finally:
try:
await message.ack()
except Exception as ex:
raise InputFetchAckFailure(
f"Error publishing input message: {ex}"
) from ex

async def publish_input_message(
self, serialized_input_message: bytes, request_id: str
):
channel = await self._get_channel()
await channel.default_exchange.publish(
Message(body=serialized_input_message), routing_key=self._queue_name
)
Comment on lines +99 to +101
Copy link
Member

@chiragjn chiragjn Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exchange name and routing key should be configurable
Maybe not today but some day in future?

Same reason as this: #68 (comment)
We don't need the queue name directly to publish

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another question - Is it always guaranteed that a routing key = X always delivers to a queue named X?



class AMQPOutput(Output):
def __init__(self, config: AMQPOutputConfig):
self._queue_url = config.queue_url
self._queue_name = config.queue_name
self._connection = None
self._channel = None
self._queue = None

async def _get_connect(self):
if self._connection:
return self._connection
self._connection = await connect_robust(self._queue_url)
return self._connection

async def _get_channel(self):
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
if self._channel:
return self._channel
await self._get_connect()
self._channel = await self._connection.channel()
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
return self._channel

async def _get_queue(self):
if self._queue:
return self._queue
await self._get_channel()
self._queue = await self._channel.declare_queue(self._queue_name, durable=True)
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
return self._queue

async def publish_output_message(
self, serialized_output_message: bytes, request_id: Optional[str]
):
channel = await self._get_channel()
await self._get_queue()
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed?

await channel.default_exchange.publish(
Message(body=serialized_output_message), routing_key=self._queue_name
Comment on lines +159 to +160
Copy link
Member

@chiragjn chiragjn Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we have restricted ourselves to default exchange and re-using queue name as routing key
Shouldn't the exchange name and routing key be distinct inputs?
Maybe not today but some day in future?


afaik, for publishing we don't need queue name, just the exchange + routing key
What queue is bound to what exchange and routing key is not needed by us when publishing
See: https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/4-routing.html#bindings

)
29 changes: 29 additions & 0 deletions async_processor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,20 @@ def to_input(self) -> Input:
return SQSInput(self)


class AMQPInputConfig(InputConfig):
type: constr(regex=r"^amqp$") = "amqp"
queue_url: constr(
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
regex=r"^amqp:\/\/(?:([^:/?#\s]+)(?::([^@/?#\s]+))?@)?([^/?#\s]+)(?::(\d+))?\/?([^?#\s]*)?(?:\?(.*))?$"
)
queue_name: str
wait_time_seconds: conint(ge=1, le=20) = 5

def to_input(self) -> Input:
from async_processor.amqp_pub_sub import AMQPInput

return AMQPInput(self)


class NATSInputConfig(InputConfig):
type: constr(regex=r"^nats$") = "nats"

Expand Down Expand Up @@ -288,6 +302,21 @@ def to_output(self) -> Output:
return SQSOutput(self)


class AMQPOutputConfig(OutputConfig):
type: constr(regex=r"^amqp$") = "amqp"

queue_url: constr(
regex=r"^amqp:\/\/(?:([^:/?#\s]+)(?::([^@/?#\s]+))?@)?([^/?#\s]+)(?::(\d+))?\/?([^?#\s]*)?(?:\?(.*))?$"
)
queue_name: str
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this should be

Suggested change
queue_name: str
exchange_name: str
routing_key: str

wait_time_seconds: confloat(ge=1) = 5

def to_output(self) -> Output:
from async_processor.amqp_pub_sub import AMQPOutput

return AMQPOutput(self)


class NATSOutputConfig(OutputConfig):
type: constr(regex=r"^nats$") = "nats"

Expand Down
67 changes: 67 additions & 0 deletions examples/amqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import asyncio
import json
import random
import uuid

import aio_pika

from async_processor import (
AMQPInputConfig,
AMQPOutputConfig,
InputMessage,
Processor,
WorkerConfig,
)

# change this config
input_queue_url = "amqp://guest:guest@localhost:5672/"
input_queue_name = "home1"
output_queue_url = "amqp://guest:guest@localhost:5672/"
output_queue_name = "home2"


class MultiplicationProcessor(Processor):
def process(self, input_message: InputMessage) -> int:
body = input_message.body
return body["x"] * body["y"]


app = MultiplicationProcessor().build_app(
worker_config=WorkerConfig(
input_config=AMQPInputConfig(
queue_url=input_queue_url, queue_name=input_queue_name
),
output_config=AMQPOutputConfig(
queue_url=output_queue_url, queue_name=output_queue_name
),
),
)


async def send_request(queue_url: str, routing_key: str):
connection = await aio_pika.connect_robust(queue_url)

async with connection:
request_id = str(uuid.uuid4())

channel = await connection.channel()

payload = json.dumps(
{
"request_id": request_id,
"body": {"x": random.randint(1, 100), "y": random.randint(1, 100)},
}
)
print(payload)
await channel.default_exchange.publish(
aio_pika.Message(payload.encode()), routing_key=routing_key
)


async def test():
for _ in range(100):
await send_request(queue_url=input_queue_url, routing_key=input_queue_name)


if __name__ == "__main__":
asyncio.run(test())
17 changes: 17 additions & 0 deletions examples/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

module_name="$1"

if [ -z "$module_name" ]; then
echo "Usage: $0 <module_name>"
echo "Example: $0 [amqp]"
exit 1
fi

if [ -f "${module_name}" ]; then
module_name="${module_name%.py}"
else
module_name="$module_name"
fi

gunicorn "$module_name:app" --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
17 changes: 17 additions & 0 deletions examples/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

module_name="$1"

if [ -z "$module_name" ]; then
echo "Usage: $0 <module_name>"
echo "Example: $0 [amqp]"
exit 1
fi

if [ -f "${module_name}" ]; then
module_name="${module_name}"
else
module_name="$module_name.py"
fi

python $module_name
Loading
Loading