Skip to content

Commit

Permalink
Add flush_queues
Browse files Browse the repository at this point in the history
  • Loading branch information
stchris committed Jun 25, 2024
1 parent c7ce7dc commit 394fda5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
15 changes: 15 additions & 0 deletions servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,3 +822,18 @@ def queue_task(
channel.close()
except pika.exceptions.ChannelWrongStateError:
log.exception("Failed to explicitly close RabbitMQ channel.")


def flush_queues(rmq_conn, redis_conn, queues):
try:
channel = rmq_conn.channel()
for queue in queues:
try:
channel.queue_purge(queue)
except ValueError:
logging.exception(f"Error while flushing the {queue} queue")
channel.close()
except pika.exceptions.AMQPError:
logging.exception("Error while flushing task queue")
for key in redis_conn.scan_iter(PREFIX + "*"):
redis_conn.delete(key)
3 changes: 2 additions & 1 deletion tests/test_taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
get_rabbitmq_connection,
dataset_from_collection_id,
declare_rabbitmq_queue,
flush_queues,
)
from servicelayer.util import unpack_datetime

Expand Down Expand Up @@ -190,9 +191,9 @@ def did_nack():


def test_get_priority_bucket():
# flush_queue()
redis = get_fakeredis()
rmq = get_rabbitmq_connection()
flush_queues(rmq, redis, ["index"])
collection = Mock(id=1)

assert get_task_count(collection, redis) == 0
Expand Down

0 comments on commit 394fda5

Please sign in to comment.