Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: AMQP pub sub #67

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions async_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from async_processor.function_service import FunctionAsyncExecutor
from async_processor.processor import AsyncProcessor, Processor
from async_processor.types import (
AMQPInputConfig,
AMQPOutputConfig,
AWSAccessKeyAuth,
CoreNATSOutputConfig,
Input,
Expand Down Expand Up @@ -31,6 +33,8 @@
"NATSOutputConfig",
"OutputMessage",
"WorkerConfig",
"AMQPInputConfig",
"AMQPOutputConfig",
"ProcessStatus",
"SQSInputConfig",
"SQSOutputConfig",
Expand Down
105 changes: 105 additions & 0 deletions async_processor/amqp_pub_sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional

from aio_pika import Message, connect_robust
from aio_pika.exceptions import AMQPError

from async_processor.types import (
AMQPInputConfig,
AMQPOutputConfig,
Input,
InputFetchAckFailure,
InputMessageFetchFailure,
Output,
)


class AMQPInput(Input):
def __init__(self, config: AMQPInputConfig):
self._queue_url = config.queue_url
self._queue_name = config.queue_name
self._connection = None
self._channel = None
self._queue = None

async def _get_connect(self):
if not self._connection:
self._connection = await connect_robust(self._queue_url)
return self._connection

async def _get_channel(self):
if not self._channel:
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
await self._get_connect()
self._channel = await self._connection.channel()
return self._channel

async def _get_queue(self):
if not self._queue:
await self._get_channel()
self._queue = await self._channel.declare_queue(self._queue_name)
return self._queue
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved

@asynccontextmanager
async def get_input_message(
self,
) -> AsyncIterator[Optional[str]]:
message = None
queue = await self._get_queue()
try:
message = await queue.get(fail=False, timeout=5)
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
if not message:
yield None
return
yield message.body.decode()
except Exception as ex:
raise InputMessageFetchFailure(f"Error fetch input message: {ex}") from ex
finally:
if message:
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
try:
await message.ack()
except Exception as ex:
raise InputFetchAckFailure(
f"Error publishing input message: {ex}"
) from ex

async def publish_input_message(
self, serialized_input_message: bytes, request_id: str
):
try:
channel = await self._get_channel()
await channel.default_exchange.publish(
Message(body=serialized_input_message), routing_key=self._queue_name
)
except AMQPError as ex:
raise InputFetchAckFailure(f"Error publishing input message: {ex}") from ex
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved


class AMQPOutput(Output):
def __init__(self, config: AMQPOutputConfig):
self._queue_url = config.queue_url
self._queue_name = config.queue_name
self._connection = None
self._channel = None
self._queue = None

async def _get_connect(self):
if not self._connection:
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
self._connection = await connect_robust(self._queue_url)
return self._connection

async def _get_channel(self):
if not self._channel:
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
await self._get_connect()
self._channel = await self._connection.channel()
return self._channel

async def publish_output_message(
self, serialized_output_message: bytes, request_id: Optional[str]
):
try:
queue = await self._get_channel()
await queue.default_exchange.publish(
Message(body=serialized_output_message), routing_key=self._queue_name
)
except AMQPError as ex:
raise InputFetchAckFailure(f"Error publishing output message: {ex}") from ex
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
23 changes: 23 additions & 0 deletions async_processor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ def to_input(self) -> Input:
return SQSInput(self)


class AMQPInputConfig(InputConfig):
type: constr(regex=r"^amqp$") = "amqp"
queue_url: str
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
queue_name: str
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved

def to_input(self) -> Input:
from async_processor.amqp_pub_sub import AMQPInput

return AMQPInput(self)


class NATSInputConfig(InputConfig):
type: constr(regex=r"^nats$") = "nats"

Expand Down Expand Up @@ -288,6 +299,18 @@ def to_output(self) -> Output:
return SQSOutput(self)


class AMQPOutputConfig(OutputConfig):
type: constr(regex=r"^amqp$") = "amqp"

queue_url: str
queue_name: str

def to_output(self) -> Output:
from async_processor.amqp_pub_sub import AMQPOutput

return AMQPOutput(self)


class NATSOutputConfig(OutputConfig):
type: constr(regex=r"^nats$") = "nats"

Expand Down
25 changes: 25 additions & 0 deletions examples/amqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from async_processor import (
AMQPInputConfig,
AMQPOutputConfig,
InputMessage,
Processor,
WorkerConfig,
)


class MultiplicationProcessor(Processor):
def process(self, input_message: InputMessage) -> int:
body = input_message.body
return body["x"] * body["y"]


app = MultiplicationProcessor().build_app(
worker_config=WorkerConfig(
input_config=AMQPInputConfig(
queue_url="amqp://guest:guest@localhost:5672/", queue_name="home1"
),
output_config=AMQPOutputConfig(
queue_url="amqp://guest:guest@localhost:5672/", queue_name="home2"
),
),
)
11 changes: 11 additions & 0 deletions examples/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

module_name="$1"

if [ -z "$module_name" ]; then
echo "Usage: $0 <module_name>"
echo "Example: $0 [amqp]"
exit 1
fi

gunicorn "$module_name:app" --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
Loading
Loading