Skip to content

Commit

Permalink
Added the ability to specify whether a queue should be created if it …
Browse files Browse the repository at this point in the history
…does not exist.
  • Loading branch information
dvazar committed Apr 5, 2024
1 parent efdb82d commit 8b2d145
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions dramatiq_azure/asq.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from azure.core.exceptions import (
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
)
from azure.storage.queue import (
BinaryBase64DecodePolicy,
Expand Down Expand Up @@ -201,10 +202,12 @@ def __init__(
*,
dead_letter: bool = False,
middleware=None,
create_queue=True,
) -> None:
super().__init__(middleware=middleware)
self.queues: set = set()
self.dead_letter = dead_letter
self.create_queue = create_queue

@property
def consumer_class(self):
Expand All @@ -230,17 +233,9 @@ def consume(
def declare_queue(self, queue_name: str) -> None:
if queue_name not in self.queues:
self.emit_before("declare_queue", queue_name)
try:
q_client = _get_client(queue_name)
q_client.create_queue()
except ResourceExistsError:
logger.warning(f"Queue already exists: {queue_name}")
self._create_queue_if_required(_get_client(queue_name))
if self.dead_letter:
try:
dlq_client = _get_dlq_client(queue_name)
dlq_client.create_queue()
except ResourceExistsError:
logger.warning(f"DL Queue already exists: {queue_name}")
self._create_queue_if_required(_get_dlq_client(queue_name))
self.queues.add(queue_name)
self.emit_after("declare_queue", queue_name)

Expand Down Expand Up @@ -280,3 +275,15 @@ def get_declared_queues(self) -> Iterable[str]:

def get_declared_delay_queues(self) -> Iterable[str]:
return set()

def _create_queue_if_required(self, queue_client: QueueClient) -> None:
try:
queue_client.get_queue_properties()
except ResourceNotFoundError:
if self.create_queue:
try:
queue_client.create_queue()
except ResourceExistsError:
pass # someone did it first
else:
raise

0 comments on commit 8b2d145

Please sign in to comment.