Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: AMQP pub sub #67

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions async_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from async_processor.function_service import FunctionAsyncExecutor
from async_processor.processor import AsyncProcessor, Processor
from async_processor.types import (
AMQPInputConfig,
AMQPOutputConfig,
AWSAccessKeyAuth,
CoreNATSOutputConfig,
Input,
Expand Down Expand Up @@ -31,6 +33,8 @@
"NATSOutputConfig",
"OutputMessage",
"WorkerConfig",
"AMQPInputConfig",
"AMQPOutputConfig",
"ProcessStatus",
"SQSInputConfig",
"SQSOutputConfig",
Expand Down
101 changes: 101 additions & 0 deletions async_processor/amqp_pub_sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional

from aio_pika import Message, connect_robust
from aio_pika.exceptions import AMQPError, QueueEmpty

from async_processor.types import (
AMQPInputConfig,
AMQPOutputConfig,
Input,
InputFetchAckFailure,
InputMessageFetchFailure,
Output,
)


class AMQPInput(Input):
def __init__(self, config: AMQPInputConfig):
self._queue_url = config.queue_url
self._queue_name = config.queue_name
self._connection = None

async def _connect(self):
self._connection = await connect_robust(self._queue_url)

async def _disconnect(self):
if self._connection is not None:
await self._connection.close()

@asynccontextmanager
async def get_input_message(
self,
) -> AsyncIterator[Optional[str]]:
try:
await self._connect()
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
async with self._connection.channel() as channel:
queue = await channel.declare_queue(self._queue_name)
try:
message = await queue.get()
if message is None:
yield None
else:
async with message.process():
yield message.body.decode()
except QueueEmpty:
yield None

except AMQPError as ex:
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
raise InputMessageFetchFailure(
f"Error fetching input message: {ex}"
) from ex
finally:
await self._disconnect()
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved

async def publish_input_message(
self, serialized_input_message: bytes, request_id: str
):
try:
await self._connect()
async with self._connection.channel() as channel:
message_body = (
serialized_input_message
if isinstance(serialized_input_message, bytes)
else serialized_input_message
)
await channel.default_exchange.publish(
Message(body=message_body), routing_key=self._queue_name
)
except AMQPError as ex:
raise InputFetchAckFailure(f"Error publishing input message: {ex}") from ex
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
finally:
await self._disconnect()


class AMQPOutput(Output):
def __init__(self, config: AMQPOutputConfig):
self._queue_url = config.queue_url
self._queue_name = config.queue_name
self._connection = None

async def _connect(self):
self._connection = await connect_robust(self._queue_url)

async def _disconnect(self):
if self._connection is not None:
await self._connection.close()

async def publish_output_message(
self, serialized_output_message: bytes, request_id: Optional[str]
):
try:
await self._connect()
async with self._connection.channel() as channel:
await channel.default_exchange.publish(
Message(body=serialized_output_message),
routing_key=self._queue_name,
)
except AMQPError as ex:
raise RuntimeError(f"Error publishing output message: {ex}") from ex
finally:
await self._disconnect()
23 changes: 23 additions & 0 deletions async_processor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ def to_input(self) -> Input:
return SQSInput(self)


class AMQPInputConfig(InputConfig):
type: constr(regex=r"^amqp$") = "amqp"
queue_url: str
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved
queue_name: str
bradleytrf marked this conversation as resolved.
Show resolved Hide resolved

def to_input(self) -> Input:
from async_processor.amqp_pub_sub import AMQPInput

return AMQPInput(self)


class NATSInputConfig(InputConfig):
type: constr(regex=r"^nats$") = "nats"

Expand Down Expand Up @@ -288,6 +299,18 @@ def to_output(self) -> Output:
return SQSOutput(self)


class AMQPOutputConfig(OutputConfig):
type: constr(regex=r"^amqp$") = "amqp"

queue_url: str
queue_name: str

def to_output(self) -> Output:
from async_processor.amqp_pub_sub import AMQPOutput

return AMQPOutput(self)


class NATSOutputConfig(OutputConfig):
type: constr(regex=r"^nats$") = "nats"

Expand Down
Loading
Loading