Allow re-scheduling of processing (unacked) items. #27

Open · sam0delkin opened this issue Jan 28, 2019 · 3 comments

@sam0delkin
Hi! First of all, I want to say thank you for such a great plugin :)

My question is related to duplicated message processing. Currently, the plugin does not allow adding a duplicate message to the queue at all. I want to be able to add a duplicate message to the queue when a message with the same x-deduplication-header already exists, but only in the case where some consumer is already processing that message.

So, assume we have messages with x-deduplication-header = A, B, C, D, and messages C and D are already being processed by some consumers.

Let's try to add a new message with x-deduplication-header = C.

Current behavior: the message will be ignored.

Desired behavior: the message will be queued (even though a message with the same x-deduplication-header is already in the queue).

Is this possible?

Thanks in advance!

@noxdafox
Owner

Hello,

there are a few issues with your request which would bring more problems than benefits.

The AMQP protocol provides two ways of consuming messages: with automatic acknowledgement or with the consumer's acknowledgement. In the latter approach, the message is considered part of the queue until it is acknowledged. Therefore, allowing a second message in while the first is being processed would basically violate the uniqueness guarantee the plugin provides.
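
To make the acknowledgement mode concrete, here is a minimal consumer sketch in Python using pika; the queue name and the process() helper are hypothetical placeholders, not part of the plugin:

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()

    def handle(ch, method, properties, body):
        # process() stands in for the (possibly slow) work done per message.
        process(body)
        # Until this ack reaches the broker, the message is still considered
        # part of the queue, so its x-deduplication-header keeps blocking
        # duplicates from being enqueued.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # auto_ack=False selects the "consumer's acknowledgement" mode described above.
    channel.basic_consume(queue="product", on_message_callback=handle, auto_ack=False)
    channel.start_consuming()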

The second issue is more practical and would be very hard to tackle. Let's say that while message C is being processed, we allow a second one in. What happens if the consumer processing the first C message crashes? The protocol states that in this case the message should be put back in the queue, as it has not been acknowledged. This means we would end up with two C messages in the queue. This would be fairly tricky to detect in a distributed system and could cause lots of headaches.

I would rather suggest a different approach. You could rely on publisher confirms in order to see, on the publisher side, whether your message made it to the queue. If not, your publisher could try again a few times (possibly using some exponential backoff mechanism). The assumption is that if the message is being processed, it will soon be acknowledged and gone from the queue. Unfortunately, for this approach you will need RabbitMQ 3.8.0, as previous versions do not support publisher confirmation for duplicate messages (see issue #21).
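
As a rough illustration of that approach, here is a publisher sketch in Python using pika. The exchange and routing key names are hypothetical, and whether a dropped duplicate surfaces as a nack or as a returned message depends on the RabbitMQ/plugin versions and on exchange- vs queue-level deduplication, so treat the exception handling as a sketch rather than the plugin's exact contract:

    import time
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.confirm_delivery()  # enable publisher confirms on this channel

    def publish_with_retry(body, dedup_key, attempts=5, delay=0.5):
        """Retry with exponential backoff until the broker confirms the publish."""
        for _ in range(attempts):
            try:
                channel.basic_publish(
                    exchange="deduplicate-product-indexing",  # hypothetical exchange name
                    routing_key="product",                    # hypothetical routing key
                    body=body,
                    properties=pika.BasicProperties(
                        headers={"x-deduplication-header": dedup_key}),
                    mandatory=True,
                )
                return True  # broker confirmed: the message made it to the queue
            except (pika.exceptions.UnroutableError, pika.exceptions.NackError):
                # Duplicate still pending: wait for it to be acked, then retry.
                time.sleep(delay)
                delay *= 2
        return False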

@sam0delkin
Author

@noxdafox hey. I think I will try the approach with confirmation, but it would be nice to have a config option to support this. In my case, I have a lot of statistics calculated via a queue for many DB rows. These calculations are not so easy and are slow, which is why I'm using a queue for this operation. The problem is that the calculations are triggered by any change related to a row in the DB. So, if a user makes some changes during the current calculation, I need to add a new message to the queue even though it will no longer be unique. But if further changes are made or a consumer somehow breaks, I still expect that the queue will not have more than 2 instances of the same message. So, if you plan to support this, and maybe other people like me need it, you could support this feature, though not by default but behind some config value for the plugin. If not, you can close this issue :) Thanks in advance!

@Ocramius

Ocramius commented Sep 29, 2020

EDIT3: My mistake - I declared a de-duplication **exchange** instead of a **queue**.

I just got bitten by this behavior as well.

My use-case is very similar to @sam0delkin's: I'm trying to use the plugin as a way to deduplicate/debounce (not the precise term, but it gives the right idea) messages that would lead to the same result on the consumer side, but which are computationally intensive.

Let's assume I have the following messages (with headers):

  • x-deduplication-header = A
  • x-deduplication-header = B
  • x-deduplication-header = C
  • x-deduplication-header = D

Given an empty queue attached to an x-message-deduplication exchange
When the input sequence "AAABBBCCCAAA" is pushed to the queue as different messages
Then the queue should contain the sequence "CBA"

This works as expected: great!

Now, let's assume the queue was filled and emptied:

Given an empty queue attached to an x-message-deduplication exchange
And the queue was filled with sequence "ABC"
And the queue was fully consumed
When the input sequence "ABCD" is pushed to the queue as different messages
Then the queue should contain the sequence "DCBA"

The above (last step) doesn't match expectations, as per discussion above: only "D" made it to the queue.

I did not fully understand #27 (comment), but I read through the code, and it seems like there is some message deletion during fetch:

    def fetch(needs_ack, state = dqstate(queue: queue, queue_state: qs)) do
      case passthrough2(state, do: fetch(needs_ack, qs)) do
        {:empty, state} -> {:empty, state}
        {{message, delivery, ack_tag}, state} ->
          if duplicate?(queue) do
            if needs_ack do
              head = Common.message_header(message, "x-deduplication-header")
              {{message, delivery, dqack(tag: ack_tag, header: head)}, state}
            else
              maybe_delete_cache_entry(queue, message)
              {{message, delivery, ack_tag}, state}
            end
          else
            {{message, delivery, ack_tag}, state}
          end
      end
    end

    # TODO: this is a bit of a hack.
    # As the drop callback returns only the message id, we can't retrieve
    # the message deduplication header. As a workaround `fetch` is used.
    # This assumes the backing queue drop and fetch behaviours are the same.
    # A better solution would be to store the message IDs in a dedicated index.
    def drop(need_ack, state = dqstate(queue: queue, queue_state: qs)) do
      if duplicate?(queue) do
        case fetch(need_ack, state) do
          {:empty, state} -> {:empty, state}
          {{message = basic_message(id: id), _, ack_tag}, state} ->
            maybe_delete_cache_entry(queue, message)
            {{id, ack_tag}, state}
        end
      else
        passthrough2(state, do: drop(need_ack, qs))
      end
    end

My Elixir-fu is too low to fully understand what is going on; specifically, what the conditional if needs_ack do means.

My hope is that when a consumer of the queue fetches items, those items also get deleted from the cache/map of this plugin, which doesn't seem to be the case right now. Am I misunderstanding the underlying design of the plugin, or is my AMQP consumer doing something wrong? As far as I know, it does ACK/NACK correctly...

EDIT: the ack() of my client is done in PHP, and seems to be relatively simple:

https://github.com/symfony/symfony/blob/af8ad344d38bdbf3ea563d33c6afd9950f8d6569/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php#L442-L445

    public function ack(\AMQPEnvelope $message, string $queueName): bool
    {
        return $this->queue($queueName)->ack($message->getDeliveryTag());
    }

The delivery tag seems to only be an integer-ish string here - not much else going on.

EDIT2: for more context on what I'm doing, here's how I configured my queues during my experiments.

        transports:
            product-queue:
                dsn: 'amqp://rabbitmq:5672/%2f/product'
                # Requires https://github.com/noxdafox/rabbitmq-message-deduplication/releases/tag/0.4.5 to function!
                # This setting allows us to add an "x-deduplication-header" to AMQP messages sent
                # to the product indexing queue, so that we can avoid repeated operations
                # if one is already scheduled in our message queue.
                #
                # For usage, see https://gist.github.com/noxdafox/ad1fb4c3769e06a888c3a542fc08c544
                options:
                    exchange:
                        name: 'deduplicate-product-indexing'
                        type: 'x-message-deduplication'
                        arguments:
                            # Size of de-duplication cache - 100M should be plenty
                            x-cache-size: 100000000

I'm still getting bitten by other failures, but I'll report them in a separate issue after investigation :D
