
Add best consume practice #875

Closed

Conversation

**liangyepianzhou** (Contributor):

✅ Contribution Checklist


> The messages are consumed in order for a single partition in Exclusive and Failover modes and out of order for Shared and
> Key-shared mode. The main difference between Exclusive and Failover is that in Exclusive mode, the consumer is exclusive
> to the entire topic, while in Failover mode, the consumer is exclusive to only one partition. Failover mode allows backup
**Contributor:**

Even if the topic has partitions across brokers? How?

**Contributor Author:**

Yes, it is. For example, for topic 1 with five partitions, five consumers can subscribe to it in Failover mode, but only one consumer can subscribe to it in Exclusive mode. The add-consumer logic looks like this:

```java
case Exclusive:
    if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
        previousDispatcher = dispatcher;
        dispatcher = new PersistentDispatcherSingleActiveConsumer(
                cursor, SubType.Exclusive, 0, topic, this);
    }
    break;
// ...
case Failover:
    int partitionIndex = TopicName.getPartitionIndex(topicName);
    if (partitionIndex < 0) {
        // For non-partitioned topics, use a negative index so the
        // dispatcher won't sort consumers before picking
        // an active consumer for the topic.
        partitionIndex = -1;
    }

    if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
        previousDispatcher = dispatcher;
        dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
                partitionIndex, topic, this);
    }
    break;
```
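The Failover selection can be sketched without a broker. The following is a simplified, self-contained model (illustrative only, not Pulsar's actual code; the class and method names are hypothetical): consumers are sorted by name and the active consumer for a partition is derived from the partition index, so each partition gets exactly one active consumer while the others stand by.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of Failover active-consumer selection: sort consumers by name,
// then pick the active one by partitionIndex modulo the consumer count.
public class FailoverSelectionSketch {
    public static String pickActiveConsumer(List<String> consumerNames, int partitionIndex) {
        List<String> sorted = new ArrayList<>(consumerNames);
        Collections.sort(sorted);
        if (partitionIndex < 0) {
            partitionIndex = 0; // non-partitioned topic: first sorted consumer is active
        }
        return sorted.get(partitionIndex % sorted.size());
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("consumer-b", "consumer-a", "consumer-c");
        for (int p = 0; p < 5; p++) {
            System.out.println("partition-" + p + " active: "
                    + pickActiveConsumer(consumers, p));
        }
    }
}
```

With three consumers and five partitions, each partition is served by one active consumer and the remaining consumers stay connected in stand-by for that partition.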


> The messages are consumed in order for a single partition in Exclusive and Failover modes and out of order for Shared and
> Key-shared mode. The main difference between Exclusive and Failover is that in Exclusive mode, the consumer is exclusive
> to the entire topic, while in Failover mode, the consumer is exclusive to only one partition. Failover mode allows backup
**Contributor:**

Backup --> Stand-by. The other consumers, which are not active, are also connected to the partition in stand-by.

**Contributor Author:**

Great! Thanks for the correction.

> The messages are consumed in order for a single partition in Exclusive and Failover modes and out of order for Shared and
> Key-shared mode. The main difference between Exclusive and Failover is that in Exclusive mode, the consumer is exclusive
> to the entire topic, while in Failover mode, the consumer is exclusive to only one partition. Failover mode allows backup
> consumer connections that are not consumed. The main difference between Shared and Key-shared is whether their dispatch
**Contributor:**

Too confusing.
Key-shared allows messages with the same key (e.g. customer-id) to be sent to the same consumer, whereas in Shared mode the messages are sent in a round-robin manner to all consumers.

**Contributor Author:**

> Key-shared allows to have messages of the same key (e.g. customer-id) sent to the same consumer

Key-shared sends messages with the same key to the same set of consumers rather than to a single fixed consumer.

**Contributor Author:**

And there are several selector implementations for the Key-shared mode.
For `ConsistentHashingStickyKeyConsumerSelector`, the selection looks like the following:

```java
public Consumer select(int hash) {
    rwLock.readLock().lock();
    try {
        if (hashRing.isEmpty()) {
            return null;
        }

        List<Consumer> consumerList;
        Map.Entry<Integer, List<Consumer>> ceilingEntry = hashRing.ceilingEntry(hash);
        if (ceilingEntry != null) {
            consumerList = ceilingEntry.getValue();
        } else {
            consumerList = hashRing.firstEntry().getValue();
        }

        return consumerList.get(hash % consumerList.size());
    } finally {
        rwLock.readLock().unlock();
    }
}
```
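The selector's behavior can be simulated without a broker. The toy model below (a hypothetical `HashRingSketch` class; consumers represented by strings, no locking) shows the same mechanics: a hash resolves to its ceiling entry on the ring and wraps to the first entry when it falls past the end, so a given hash always lands on the same consumer until the ring changes.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of consistent-hashing sticky-key selection.
public class HashRingSketch {
    private final TreeMap<Integer, List<String>> hashRing = new TreeMap<>();

    public void addEntry(int position, List<String> consumers) {
        hashRing.put(position, consumers);
    }

    public String select(int hash) {
        if (hashRing.isEmpty()) {
            return null;
        }
        Map.Entry<Integer, List<String>> entry = hashRing.ceilingEntry(hash);
        if (entry == null) {
            entry = hashRing.firstEntry(); // wrap around the ring
        }
        List<String> consumers = entry.getValue();
        return consumers.get(hash % consumers.size());
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch();
        ring.addEntry(100, List.of("consumer-a"));
        ring.addEntry(200, List.of("consumer-b", "consumer-c"));
        System.out.println(ring.select(150)); // resolves via ceiling entry 200
        System.out.println(ring.select(250)); // past the end: wraps to entry 100
    }
}
```

Because the lookup depends only on the key's hash and the ring contents, a key keeps hitting the same consumer until the set of consumers changes.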

> * **Individual acknowledgment**
>
> Cumulative acknowledgment receives a message or a message id as a parameter and marks the messages before the message as
> consumed for this subscription. For multiple-partition topics, the cumulative acknowledgment will work for the single
**Contributor:**

"subscription - in effect: all messages up to the selected message will be marked as processed (acknowledged)"

> * **Individual acknowledgment**
>
> Cumulative acknowledgment receives a message or a message id as a parameter and marks the messages before the message as
> consumed for this subscription. For multiple-partition topics, the cumulative acknowledgment will work for the single
**Contributor:**

"For multiple partitions..." --> When consuming, you might get messages from more than one partition. You need to make sure you call cumulative-acknowledge per partition. @BewareMyPower I need help here

**Contributor Author:**

Cumulative ack is recommended for the Exclusive and Failover modes.
In Exclusive mode, the messages from all partitions are sent to the same consumer, but cumulative ack only marks messages as processed within a single partition. For example, if a consumer receives 100 messages from a topic with 5 partitions and cumulatively acks the last message, which belongs to partition 4, that does not mean all 100 messages are processed; it only means the messages in partition 4 before that message are marked as processed. So I added a notice here.
You can see more details in the following test.

```java
@Test
public void test() throws Exception {
    String subName = "test";
    String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
    admin.topics().createPartitionedTopic(topicName, 3);
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
    Consumer<byte[]> consumer1 = pulsarClient.newConsumer().receiverQueueSize(100).topic(topicName)
            .subscriptionName(subName)
            .subscriptionType(SubscriptionType.Exclusive).subscribe();

    for (int i = 0; i < 100; i++) {
        producer.newMessage().send();
    }
    Message<byte[]> lastMessage = null;
    for (int i = 0; i < 100; i++) {
        lastMessage = consumer1.receive();
    }
    log.info("ack message {}", lastMessage.getMessageId());
    consumer1.acknowledgeCumulative(lastMessage);

    consumer1.redeliverUnacknowledgedMessages();
    for (int i = 0; i < 30; i++) {
        lastMessage = consumer1.receive();
        log.info("receive message {}", lastMessage.getMessageId());
    }
}
```
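The per-partition scope of cumulative ack can also be modeled without a broker. This toy model (not Pulsar code; class and method names are hypothetical) shows that acknowledging cumulatively only advances the acknowledged position of the partition the acked message belongs to, leaving messages on other partitions unacknowledged.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of cumulative-ack scope on a partitioned topic.
public class CumulativeAckSketch {
    // highest acknowledged entry id per partition (absent = nothing acked)
    private final Map<Integer, Long> ackedUpTo = new HashMap<>();

    public void acknowledgeCumulative(int partition, long entryId) {
        // only this partition's acknowledged position advances
        ackedUpTo.merge(partition, entryId, Math::max);
    }

    public boolean isAcknowledged(int partition, long entryId) {
        return ackedUpTo.getOrDefault(partition, -1L) >= entryId;
    }

    public static void main(String[] args) {
        CumulativeAckSketch sub = new CumulativeAckSketch();
        sub.acknowledgeCumulative(4, 19); // ack the last message, which sits on partition 4
        System.out.println(sub.isAcknowledged(4, 10)); // true: earlier message on partition 4
        System.out.println(sub.isAcknowledged(0, 0));  // false: partition 0 untouched
    }
}
```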

`Long-term solution`: Modify the ack-timeout mechanism to wait for the acknowledgment response.

> #### Exactly-once
> If the users cannot tolerate message repetition, they can acknowledge messages with a transaction. Transaction can
**Contributor:**

I don't understand why. @codelipenghui ?

**Contributor Author:**

When users use transactions to acknowledge messages, Pulsar must ensure that a message cannot be acknowledged twice. A second acknowledgment of the same message with a transaction results in a `TransactionConflictException` and aborts the transaction.
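A toy model of this rule (illustrative only, not Pulsar's implementation; Pulsar throws `TransactionConflictException`, modeled here with `IllegalStateException`):

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of transactional-acknowledgment conflict detection.
public class TxnAckSketch {
    private final Set<String> acked = new HashSet<>();

    // Acknowledge a message within a transaction; a second ack of the same
    // message id is rejected so the transaction can be aborted.
    public void ackWithTxn(String messageId) {
        if (!acked.add(messageId)) {
            throw new IllegalStateException(
                    "TransactionConflict: " + messageId + " is already acknowledged");
        }
    }

    public static void main(String[] args) {
        TxnAckSketch sub = new TxnAckSketch();
        sub.ackWithTxn("msg-1"); // first ack succeeds
        try {
            sub.ackWithTxn("msg-1"); // second ack of the same message conflicts
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```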

> prevent repeated consumption. If a message has been acknowledged, it will wait for a response and throw
> `TransactionConflictException` when the client acknowledges the message with a transaction.
>
> **Notices:** When using transactions, do not configure DeadLetterPolicy, but instead use negativeAcknowledge to resend messages.
**Contributor:**

Why?

**Contributor Author:**

The comment you left could be one reason. The other is that one transaction could include many messages, and sending them all to a retry topic or DLQ may not be appropriate when the transaction is aborted.

> message.release();
> }
> ```
> ### Exclusive && Failover
**Contributor:**

We have to stop here IMO. Something in the structure makes it way too long.


> #### Exactly-once (Beta)
> In the `Exclusive` and `Failover` mode, the most troublesome issue for users is the problem of duplicate messages caused
**Contributor:**

Didn't understand anything here.

**Contributor Author:**

Maybe the expression here is not very precise. Pulsar has not yet implemented exactly-once semantics in the Exclusive and Failover modes, so repeated acks have always been possible.

**Contributor:**

Any chance to use mermaid for diagrams?

**Member:**

+1

Mermaid is suitable for most cases, but for some complex diagrams (if any), draw.io (diagrams.net) could also be used, with the diagrams stored in the repository as .xml files.

**Member:**

should be Shared not Shaved
