Skip to content

Commit

Permalink
Fix: add test
Browse files Browse the repository at this point in the history
  • Loading branch information
bradleytrf committed Mar 14, 2024
1 parent d1e3060 commit b45d5f6
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
14 changes: 11 additions & 3 deletions async_processor/amqp_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def _get_queue(self):
if self._queue:
return self._queue
await self._get_channel()
self._queue = await self._channel.declare_queue(self._queue_name)
self._queue = await self._channel.declare_queue(self._queue_name, durable=True)
return self._queue

@asynccontextmanager
Expand Down Expand Up @@ -98,11 +98,19 @@ async def _get_channel(self):
await self._get_connect()
self._channel = await self._connection.channel()
return self._channel

async def _get_queue(self):
if self._queue:
return self._queue
await self._get_channel()
self._queue = await self._channel.declare_queue(self._queue_name, durable=True)
return self._queue

async def publish_output_message(
self, serialized_output_message: bytes, request_id: Optional[str]
):
queue = await self._get_channel()
await queue.default_exchange.publish(
channel = await self._get_channel()
await self._get_queue()
await channel.default_exchange.publish(
Message(body=serialized_output_message), routing_key=self._queue_name
)
46 changes: 44 additions & 2 deletions examples/amqp.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import asyncio
import json
import random
import uuid

import aio_pika

from async_processor import (
AMQPInputConfig,
AMQPOutputConfig,
Expand All @@ -6,6 +13,12 @@
WorkerConfig,
)

#change this config
input_queue_url = "amqp://guest:guest@localhost:5672/"
input_queue_name = "home1"
output_queue_url = "amqp://guest:guest@localhost:5672/"
output_queue_name = "home2"


class MultiplicationProcessor(Processor):
def process(self, input_message: InputMessage) -> int:
Expand All @@ -16,10 +29,39 @@ def process(self, input_message: InputMessage) -> int:
app = MultiplicationProcessor().build_app(
worker_config=WorkerConfig(
input_config=AMQPInputConfig(
queue_url="amqp://guest:guest@localhost:5672/", queue_name="home1"
queue_url=input_queue_url, queue_name=input_queue_name
),
output_config=AMQPOutputConfig(
queue_url="amqp://guest:guest@localhost:5672/", queue_name="home2"
queue_url=output_queue_url, queue_name=output_queue_name
),
),
)


async def send_request(queue_url: str, routing_key: str):
connection = await aio_pika.connect_robust(queue_url)

async with connection:
request_id = str(uuid.uuid4())

channel = await connection.channel()

payload = json.dumps(
{
"request_id": request_id,
"body": {"x": random.randint(1, 100), "y": random.randint(1, 100)},
}
)
print(payload)
await channel.default_exchange.publish(
aio_pika.Message(payload.encode()), routing_key=routing_key
)


async def test():
for _ in range(100):
await send_request(queue_url=input_queue_url, routing_key=input_queue_name)


if __name__ == "__main__":
asyncio.run(test())
17 changes: 17 additions & 0 deletions examples/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

module_name="$1"

if [ -z "$module_name" ]; then
echo "Usage: $0 <module_name>"
echo "Example: $0 [amqp]"
exit 1
fi

if [ -f "${module_name}" ]; then
module_name="${module_name}"
else
module_name="$module_name.py"
fi

python $module_name

0 comments on commit b45d5f6

Please sign in to comment.