Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: AMQP pub sub #67

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions async_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from async_processor.function_service import FunctionAsyncExecutor
from async_processor.processor import AsyncProcessor, Processor
from async_processor.types import (
AMQPInputConfig,
AMQPOutputConfig,
AWSAccessKeyAuth,
CoreNATSOutputConfig,
Input,
Expand Down Expand Up @@ -31,6 +33,8 @@
"NATSOutputConfig",
"OutputMessage",
"WorkerConfig",
"AMQPInputConfig",
"AMQPOutputConfig",
"ProcessStatus",
"SQSInputConfig",
"SQSOutputConfig",
Expand Down
101 changes: 101 additions & 0 deletions async_processor/amqp_pub_sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional

from aio_pika import Message, connect_robust

from async_processor.types import (
AMQPInputConfig,
AMQPOutputConfig,
Input,
InputFetchAckFailure,
InputMessageFetchFailure,
Output,
)


class AMQPInput(Input):
def __init__(self, config: AMQPInputConfig):
self._queue_url = config.queue_url
self._queue_name = config.queue_name
self._wait_time_seconds = config.wait_time_seconds
self._connection = None
self._channel = None
self._queue = None

async def _get_connect(self):
if self._connection:
return self._connection
self._connection = await connect_robust(self._queue_url)
return self._connection

async def _get_channel(self):
if self._channel:
return self._channel
await self._get_connect()
self._channel = await self._connection.channel()
return self._channel

async def _get_queue(self):
if self._queue:
return self._queue
await self._get_channel()
self._queue = await self._channel.declare_queue(self._queue_name)
return self._queue

@asynccontextmanager
async def get_input_message(
self,
) -> AsyncIterator[Optional[str]]:
message = None
queue = await self._get_queue()
try:
message = await queue.get(fail=False, timeout=self._wait_time_seconds)
if not message:
yield None
return
yield message.body.decode()
except Exception as ex:
raise InputMessageFetchFailure(f"Error fetch input message: {ex}") from ex
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
finally:
try:
await message.ack()
except Exception as ex:
raise InputFetchAckFailure(
f"Error publishing input message: {ex}"
) from ex

async def publish_input_message(
self, serialized_input_message: bytes, request_id: str
):
channel = await self._get_channel()
await channel.default_exchange.publish(
Message(body=serialized_input_message), routing_key=self._queue_name
)


class AMQPOutput(Output):
def __init__(self, config: AMQPOutputConfig):
self._queue_url = config.queue_url
self._queue_name = config.queue_name
self._connection = None
self._channel = None
self._queue = None

async def _get_connect(self):
if not self._connection:
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
self._connection = await connect_robust(self._queue_url)
return self._connection

async def _get_channel(self):
if not self._channel:
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
await self._get_connect()
self._channel = await self._connection.channel()
return self._channel

async def publish_output_message(
self, serialized_output_message: bytes, request_id: Optional[str]
):
queue = await self._get_channel()
await queue.default_exchange.publish(
Message(body=serialized_output_message), routing_key=self._queue_name
)
24 changes: 24 additions & 0 deletions async_processor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,18 @@ def to_input(self) -> Input:
return SQSInput(self)


class AMQPInputConfig(InputConfig):
type: constr(regex=r"^amqp$") = "amqp"
queue_url: str
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
queue_name: str
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
wait_time_seconds: conint(ge=1, le=20) = 5

def to_input(self) -> Input:
from async_processor.amqp_pub_sub import AMQPInput

return AMQPInput(self)


class NATSInputConfig(InputConfig):
type: constr(regex=r"^nats$") = "nats"

Expand Down Expand Up @@ -288,6 +300,18 @@ def to_output(self) -> Output:
return SQSOutput(self)


class AMQPOutputConfig(OutputConfig):
type: constr(regex=r"^amqp$") = "amqp"

queue_url: str
queue_name: str

def to_output(self) -> Output:
from async_processor.amqp_pub_sub import AMQPOutput

return AMQPOutput(self)


class NATSOutputConfig(OutputConfig):
type: constr(regex=r"^nats$") = "nats"

Expand Down
25 changes: 25 additions & 0 deletions examples/amqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from async_processor import (
AMQPInputConfig,
AMQPOutputConfig,
InputMessage,
Processor,
WorkerConfig,
)


class MultiplicationProcessor(Processor):
def process(self, input_message: InputMessage) -> int:
body = input_message.body
return body["x"] * body["y"]


app = MultiplicationProcessor().build_app(
worker_config=WorkerConfig(
input_config=AMQPInputConfig(
queue_url="amqp://guest:guest@localhost:5672/", queue_name="home1"
),
output_config=AMQPOutputConfig(
queue_url="amqp://guest:guest@localhost:5672/", queue_name="home2"
),
),
)
11 changes: 11 additions & 0 deletions examples/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

module_name="$1"

if [ -z "$module_name" ]; then
echo "Usage: $0 <module_name>"
echo "Example: $0 [amqp]"
exit 1
fi

gunicorn "$module_name:app" --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
Loading
Loading