Queue level deduplication seems broken #96

Open
aygalinc opened this issue Dec 2, 2022 · 8 comments
Labels
bug Something isn't working


aygalinc commented Dec 2, 2022

Hello,

On RabbitMQ 3.9.26, we use a straightforward setup to test message deduplication with pika:

import pika
from time import sleep

credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost',
    5672,
    '/',
    credentials,
    channel_max=3,
    client_properties={
        'connection_name': 'producer_dedup'
    },
    ))
channel = connection.channel()

# enable queue-level deduplication on the queue
args = {
    'x-message-deduplication': True
}
channel.queue_declare(queue='refresh_all', arguments=args)

for i in range(10000):
    # every message carries the same deduplication key
    channel.basic_publish(exchange='',
                          routing_key='refresh_all',
                          body='Hello World!',
                          properties=pika.BasicProperties(headers={
                              "x-deduplication-header": "hello"
                          }))
    print(f" [x] Sent 'Hello World {i} !'")
    sleep(3)
connection.close()

And we have a basic message consumer.
We observe that the consumer receives only one message.
We expect that, once the consumer has finished consuming a message, a new message with the same deduplication header can be enqueued.

@noxdafox
Owner

Hello,

this does not give the full picture. How is the consumer reading the messages? Is the consumer, for example, acknowledging each message once consumed?

Please provide a working example which reproduces the issue.


aygalinc commented Dec 12, 2022

The consumer side:

import pika, sys, os

def main():
    credentials = pika.PlainCredentials('guest', 'guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost',
        5672,
        '/',
        credentials,
        client_properties={
            'connection_name': 'sub_dedup',
        },
        ))    
    channel = connection.channel()

    args = {
        'x-message-deduplication': True
    }
    channel.queue_declare(queue='refresh_all', arguments=args)
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue='refresh_all', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)


noxdafox commented Mar 5, 2023

There is indeed something odd with the above code.

When I read the message using the Elixir/Erlang client, I get the expected behaviour. The same applies if I fetch the message via the RabbitMQ Management WebUI. But with the Python pika client, the message seems to be deduplicated forever, even after it is gone from the queue.

I will investigate this further.

noxdafox added the bug label Mar 5, 2023

noxdafox commented Mar 5, 2023

I now have a better understanding of the problem.

The issue occurs when a consumer is attached to an empty queue with auto_ack enabled. In that scenario, the internal queue discard callback is invoked instead of the expected publish_delivered [1].

The problem is that the discard callback does not receive the message itself as a parameter, only its ID. That is not enough to retrieve the x-deduplication-header and remove it from the cache.

Hence the observed behaviour: the first message goes through, and all subsequent ones are deemed duplicates because the original header is never removed from the cache.

The issue does not occur if auto_ack is disabled.

[1] https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbit/src/rabbit_amqqueue_process.erl#L684
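The cache behaviour described above can be sketched as a toy model in plain Python. This is an illustrative simulation of the plugin's logic, not its actual Erlang implementation; the class and method names are invented for the example:

```python
class DedupQueue:
    """Toy model of queue-level deduplication (illustration only)."""

    def __init__(self):
        self.cache = set()  # deduplication headers of messages currently queued
        self.queue = []

    def publish(self, msg_id, header):
        if header in self.cache:
            return False  # duplicate header: message is dropped
        self.cache.add(header)
        self.queue.append((msg_id, header))
        return True

    def consume_with_ack(self):
        # manual-ack path: the full message is available on acknowledgement,
        # so its header can be removed from the cache
        msg_id, header = self.queue.pop(0)
        self.cache.discard(header)
        return msg_id

    def consume_auto_ack(self):
        # auto_ack path on an empty queue: the discard callback only
        # receives the message id, so the header stays cached forever
        msg_id, _ = self.queue.pop(0)
        return msg_id

q = DedupQueue()
assert q.publish(1, "hello")      # first message accepted
assert not q.publish(2, "hello")  # same header while queued: dropped
q.consume_with_ack()
assert q.publish(3, "hello")      # accepted again once the ack cleared the cache

q.consume_auto_ack()
assert not q.publish(4, "hello")  # still deemed duplicate: the bug described above
```

The final assertion mirrors the report: after an auto_ack delivery the header is never evicted, so every later publish with the same x-deduplication-header is rejected.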

@michaelklishin

The above internal API change contributed by @noxdafox will ship starting with RabbitMQ 3.12.0.


noxdafox commented Apr 4, 2023

Thanks @michaelklishin, I will make a release of the plugin once RMQ 3.12.0 is out.

@michaelklishin

rabbitmq/rabbitmq-server#7802 was reverted because it is not rolling upgrade-safe without a feature flag.

@michaelklishin

Using a feature flag on the hot path has risks. Perhaps the PR should be adjusted so that all modules support both message IDs and message records, and backported to 3.11.x.

Then, at some point, we can pass the entire message record along. For now, the plugin would have to require manual acknowledgements (which is almost always a good idea).
