
Messages are NACK'ed multiple times around queue deletion #630

Open
lfse-slafleur opened this issue May 17, 2024 · 3 comments

Comments

@lfse-slafleur
Contributor

Hello!

Thank you for aio-pika, it is a great library.

I have been running into an issue when using aio-pika in the orchestrator component of another open-source project called OMOTES (https://github.com/Project-OMOTES/orchestrator/). We use aio-pika to create quite a few queues, receive one or more messages on each queue and delete the queue afterwards. Occasionally, RabbitMQ (3.12.14-management) raises a PRECONDITION_FAILED - unknown delivery tag error to the subscriber of those queues.

Using Wireshark, I have managed to analyze the issue. Attached: rabbitmq_unknown_delivery_tag.zip

Context:

  • There is 1 subscriber and 1 publisher. We are only interested in the subscriber, whose traffic can be filtered with tcp.port == 49146.
  • There is only 1 channel.
  • The subscriber uses a prefetch of 1.
  • The subscriber creates a number of queues on the channel with names test_{0,1,2,3,4,5,6,7,8,9}.
  • Once the subscriber has created all queues, it signals the publisher that it is ready by sending ready! to the queue ready.
  • After this, the publisher sends 5 messages to each of the test_{0,1,2,3,4,6,7,8,9} queues.
  • The subscriber deletes each queue after receiving 1 message on it, which also discards the other 4 messages without them being read/delivered.
  • This forces a situation where more messages are available in the queue than the subscriber will ACK (it only ACKs 1), and the subscriber may already have received additional messages which it will NACK.

This leads to the issue as analyzed in the attached Wireshark pcap:

  • Frame 267 shows the PRECONDITION_FAILED - unknown delivery tag 16 being raised by RabbitMQ.
  • This is in response to the NACK in frame 266, where delivery tag 16 is NACK'ed.
  • However, in the earlier frame 263, every delivery up to and including tag 17 was already NACK'ed (multiple=True), which includes 16.
  • This causes the unknown delivery tag error, as delivery tag 16 is now NACK'ed twice.

This is also documented here: https://www.rabbitmq.com/amqp-0-9-1-reference#basic.nack.multiple
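For illustration, the collision can be reduced to two hypothetical basic.nack calls on the same channel (the delivery tags and the channel object below are assumptions taken from the capture, not code from the project):

# Equivalent of frame 263: multiple=True settles every outstanding delivery
# on this channel up to and including tag 17 - so tag 16 is settled here too.
await channel.basic_nack(delivery_tag=17, multiple=True, requeue=True)

# Equivalent of frame 266: tag 16 was already settled by the call above, so
# RabbitMQ replies with PRECONDITION_FAILED - unknown delivery tag 16 and
# closes the channel.
await channel.basic_nack(delivery_tag=16, multiple=False, requeue=True)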

A couple more remarks regarding this issue:

  • Delivery tags 16 and 17 arrive in frame 225.
  • Delivery tag 16 belongs to consumer tag ctag1.fd3c37a647bf42acba3c4733626804c2 and is received from queue test_7.
  • Delivery tag 17 belongs to consumer tag ctag1.cee0f236c17e4ac3a8624d8431166ab6 and is received from queue test_6.
  • In other words, the deliveries belong to different subscriptions, and to different queues.

To make this issue easier to reproduce, I have prepared the following publisher and subscriber scripts:

"""Publisher"""
import asyncio

import aiormq


async def main():
    body = b"Hello World!"

    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost//")

    # Creating a channel
    channel = await connection.channel()
    await channel.queue_declare("ready")
    # Fetch the subscriber's "ready!" signal before publishing
    await channel.basic_get("ready", timeout=None)

    # Publish 5 messages to each of the test_* queues via the default exchange
    for i in range(0, 100):
        queue_name = f"test_{i}"
        for j in range(0, 5):
            await channel.basic_publish(body, routing_key=queue_name)

    await connection.close()


asyncio.run(main())

"""Subscriber"""
import asyncio

import aio_pika
from aio_pika.abc import AbstractChannel, AbstractRobustQueue


class MessageHandler:
    channel: AbstractChannel
    queue: AbstractRobustQueue

    def __init__(self, channel: AbstractChannel, queue: AbstractRobustQueue) -> None:
        self.channel = channel
        self.queue = queue

    async def run(self) -> None:
        # Consume exactly one message, then leave the iterator and delete the
        # queue together with any messages that are still in it.
        async with self.queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process(requeue=True):
                    print(f"Received {self.queue.name}: {message.body!r}")
                    break
        await self.queue.delete(if_empty=False)


async def main():
    connection = await aio_pika.connect_robust("amqp://guest:[email protected]/")
    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)

        all_tasks = []
        for i in range(0, 100):
            queue_name = f"test_{i}"
            queue = await channel.declare_queue(queue_name, auto_delete=False)
            handler = MessageHandler(channel, queue)
            all_tasks.append(asyncio.create_task(handler.run()))
        print("All handlers are waiting")

        # Signal the publisher that all queues exist and publishing may start
        await channel.declare_queue("ready")
        await channel.default_exchange.publish(
            aio_pika.Message(body="ready!".encode()), routing_key="ready"
        )
        print("Sent ready to publisher to start sending")
        for task in all_tasks:
            await task


asyncio.run(main())

To start the necessary RabbitMQ: docker run --rm -ti -p "5672:5672" -p "15672:15672" rabbitmq:3.12-management

Tested with aio-pika 9.3.1 and RabbitMQ 3.12.14-management.

@lfse-slafleur
Contributor Author

It appears that the culprit is here: https://github.com/mosquito/aio-pika/blob/master/aio_pika/queue.py#L469. The message is NACK'ed with multiple set indiscriminately, even when the delivery tags covered by it belong to other subscriptions & queues.
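In pseudo-form, the problematic pattern boils down to something like the sketch below (simplified, not the actual library code; buffered_messages and channel are placeholders):

# On iterator shutdown, the remaining deliveries are returned with a single
# NACK on the newest delivery tag with multiple=True. The multiple flag is
# channel-wide, so it also settles tags that belong to other consumers and
# queues on the same channel - which may later be NACK'ed again individually.
last = buffered_messages[-1]
await channel.basic_nack(
    delivery_tag=last.delivery_tag,
    multiple=True,
    requeue=True,
)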

@lfse-slafleur
Contributor Author

Workaround: keep each queue & subscription on a separate channel. This prevents the multiple=True part of the NACK from referencing deliveries of other queues & subscriptions.
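Applied to the subscriber script above, the workaround would look roughly like this (names match the script; the rest is a sketch):

        for i in range(0, 100):
            # One channel per subscription, so multiple=True can only ever
            # cover deliveries of this single queue.
            channel = await connection.channel()
            await channel.set_qos(prefetch_count=1)
            queue = await channel.declare_queue(f"test_{i}", auto_delete=False)
            handler = MessageHandler(channel, queue)
            all_tasks.append(asyncio.create_task(handler.run()))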

@lfse-slafleur
Contributor Author

I am working on a test case that reproduces the issue, as well as a fix. It should be relatively easy to fix by not NACK'ing multiple messages, but only the messages that have actually been received. RabbitMQ should redeliver the remaining messages the moment the subscription is removed, without them being NACK'ed or ACK'ed anyway.
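A sketch of that direction, assuming the iterator keeps its pending deliveries in an internal asyncio queue (the attribute names here are illustrative, not the actual ones in aio_pika/queue.py):

        # NACK only the messages this iterator actually holds, one by one,
        # instead of a single multiple=True NACK that may also settle other
        # consumers' deliveries on the same channel.
        while not self._queue.empty():
            message = self._queue.get_nowait()
            await message.nack(requeue=True)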
