Add best consume practice #875

Closed · wants to merge 2 commits
236 changes: 236 additions & 0 deletions docs/tutorials-redeliver-messages.md
@@ -0,0 +1,236 @@
---
id: tutorials-redeliver-messages
title: Consume best practice
sidebar_label: "Consume best practice"
description: Learn how to consume messages and redeliver unacknowledged messages in Pulsar.
---

# Consume Best Practice

## Background Knowledge

### Subscription Types

Pulsar is a distributed messaging system in which producers send messages to topics and consumers consume them.
Consumers can subscribe to a topic in four ways (subscription types):

* **Exclusive**
* **Failover**
* **Shared**
* **Key-shared**

Messages are consumed in order within a single partition in the Exclusive and Failover modes, and out of order in the
Shared and Key-shared modes. The main difference between Exclusive and Failover is that in Exclusive mode the consumer
is exclusive to the entire topic, while in Failover mode the consumer is exclusive to only one partition. Failover mode allows backup
**Contributor:** Even if the topic has partitions across brokers? How?

**Author:** Yes, it is. For example, for a topic with five partitions, five consumers can subscribe to it in Failover mode, while only one consumer can subscribe to it in Exclusive mode. The add-consumer logic looks like this:

    case Exclusive:
        if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
            previousDispatcher = dispatcher;
            dispatcher = new PersistentDispatcherSingleActiveConsumer(
                    cursor, SubType.Exclusive, 0, topic, this);
        }
        break;
    // .....
    case Failover:
        int partitionIndex = TopicName.getPartitionIndex(topicName);
        if (partitionIndex < 0) {
            // For non-partitioned topics, use a negative index so the
            // dispatcher won't sort consumers before picking
            // an active consumer for the topic.
            partitionIndex = -1;
        }

        if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
            previousDispatcher = dispatcher;
            dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
                    partitionIndex, topic, this);
        }
        break;

**Contributor:** Backup --> stand-by. The other consumers, which are not active, are also connected to the partition in stand-by.

**Author:** Great! Thanks for the correction.

consumer connections that stand by without consuming. The main difference between Shared and Key-shared is whether the dispatch
**Contributor:** Too confusing. Key-shared allows messages with the same key (e.g. customer-id) to be sent to the same consumer, whereas in Shared the messages are sent in a round-robin manner to all consumers.

**Author:**

> Key-shared allows messages with the same key (e.g. customer-id) to be sent to the same consumer

Key-shared sends messages with the same key to the same set of consumers instead of a single consumer.

**Author:** There are several selector implementations for the Key-shared mode. For `ConsistentHashingStickyKeyConsumerSelector`, it looks like the following:

    public Consumer select(int hash) {
        rwLock.readLock().lock();
        try {
            if (hashRing.isEmpty()) {
                return null;
            }

            List<Consumer> consumerList;
            Map.Entry<Integer, List<Consumer>> ceilingEntry = hashRing.ceilingEntry(hash);
            if (ceilingEntry != null) {
                consumerList = ceilingEntry.getValue();
            } else {
                consumerList = hashRing.firstEntry().getValue();
            }

            return consumerList.get(hash % consumerList.size());
        } finally {
            rwLock.readLock().unlock();
        }
    }
strategy routes messages by key. For more information about subscription types, refer to the [Pulsar website](https://pulsar.apache.org/docs/3.2.x/concepts-messaging/).
![img.png](../static/img/blog-consume-best-practice/subscription-types.png)
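
As a minimal sketch (assuming a broker at `pulsar://localhost:6650` and placeholder topic/subscription names), the subscription type is simply a property chosen on the consumer builder:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class SubscriptionTypeExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")            // assumed local broker
                .build();

        // Ordered consumption: only one active consumer per partition.
        Consumer<byte[]> ordered = client.newConsumer()
                .topic("persistent://public/default/my-topic")    // placeholder topic
                .subscriptionName("ordered-sub")
                .subscriptionType(SubscriptionType.Failover)      // or Exclusive
                .subscribe();

        // Parallel consumption: messages are spread across all consumers of the subscription.
        Consumer<byte[]> parallel = client.newConsumer()
                .topic("persistent://public/default/my-topic")
                .subscriptionName("parallel-sub")
                .subscriptionType(SubscriptionType.Shared)        // or Key_Shared for per-key ordering
                .subscribe();

        ordered.close();
        parallel.close();
        client.close();
    }
}
```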

### Acknowledgment

Messages should be acknowledged after they are fully consumed and processed; acknowledged messages will not be delivered
again for the same subscription. Pulsar provides two ways to acknowledge messages:

* **Cumulative acknowledgment**
* **Individual acknowledgment**

Cumulative acknowledgment takes a message or a message ID as a parameter and marks that message and all messages before
it as consumed for this subscription. For multi-partition topics, a cumulative acknowledgment only applies to a single
**Contributor:** "subscription - in effect: all messages up to the selected message will be marked as processed (acknowledged)"

**Contributor:** "For multiple partitions..." --> When consuming, you might get messages from more than one partition. You need to make sure you call cumulative acknowledge per partition. @BewareMyPower I need help here.

**Author:** Cumulative ack is recommended for the Exclusive and Failover modes. In Exclusive mode, the messages from all partitions are sent to the same consumer, but a cumulative ack only marks the messages of a single partition as processed. For example, if a consumer in Exclusive mode consumes 100 messages from a topic with 5 partitions and cumulatively acks the last message, which belongs to partition 4, that does not mean the 100 messages before it are processed; it only means the messages in partition 4 before that message are marked as processed. So I added a notice here. You can see more details in the following test:

    @Test
    public void test() throws Exception {
        String subName = "test";
        String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
        admin.topics().createPartitionedTopic(topicName, 3);
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().receiverQueueSize(100).topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Exclusive).subscribe();

        for (int i = 0; i < 100; i++) {
            producer.newMessage().send();
        }
        Message<byte[]> lastMessage = null;
        for (int i = 0; i < 100; i++) {
            lastMessage = consumer1.receive();
        }
        log.info("ack message {}", lastMessage.getMessageId());
        consumer1.acknowledgeCumulative(lastMessage);

        consumer1.redeliverUnacknowledgedMessages();
        for (int i = 0; i < 30; i++) {
            lastMessage = consumer1.receive();
            log.info("receive message {}", lastMessage.getMessageId());
        }
    }

partition without impacting other partitions. Individual acknowledgment takes a message or a message ID as a parameter
and marks only that message as consumed for this subscription.
![img_1.png](../static/img/blog-consume-best-practice/acknowledgement-types.png)
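
A small sketch of the two acknowledgment styles on the Java consumer API (the `consumer` passed in is assumed to already exist; the loop bound is arbitrary):

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;

class AckExamples {
    // Individual ack: only the given message is marked as consumed.
    static void ackIndividually(Consumer<byte[]> consumer) throws Exception {
        Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
        if (msg != null) {
            // process the message ...
            consumer.acknowledge(msg);
        }
    }

    // Cumulative ack: the given message and everything before it on the same
    // partition is marked as consumed (not allowed in Shared/Key_Shared mode).
    static void ackCumulatively(Consumer<byte[]> consumer) throws Exception {
        Message<byte[]> last = null;
        for (int i = 0; i < 100; i++) {
            last = consumer.receive();
            // process the message ...
        }
        consumer.acknowledgeCumulative(last);
    }
}
```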

**Contributor:** Let's add:
NOTE: Please keep in mind that Pulsar is an at-least-once system. When calling acknowledge() or acknowledgeCumulative(), it goes a long way before the acknowledgment is persisted to disk by the broker. Acknowledgments are grouped and sent to the broker in a batch, so an abrupt restart of the client loses the ones still in memory. On the broker, the subscription state is persisted to disk at a configured interval, so if the broker is abruptly restarted, acks that were not yet persisted are lost. In all of those cases, the messages will be redelivered.
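
A minimal sketch of shrinking that grouping window with the `acknowledgmentGroupTime` builder option (assuming an existing `PulsarClient` named `client`, a placeholder topic and subscription, and the Java client defaults, where the window is 100 ms):

```java
// Assumes: import java.util.concurrent.TimeUnit; and an existing PulsarClient `client`.
Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/my-topic")    // placeholder topic
        .subscriptionName("my-sub")
        // Acks are grouped on the client before being sent to the broker; a smaller
        // window means fewer in-memory acks can be lost on an abrupt client restart,
        // at the cost of more ack requests.
        .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS)
        .subscribe();
```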

### Message Redelivery Mechanism

**Contributor:** Redelivery


There might be instances where received messages cannot be processed at the moment, or errors happen during processing.
The client then needs to redeliver the unacknowledged messages, or a particular message, either after a delay or immediately.
**Contributor:** "Immediately" practically makes no sense; you have the message in your hand. Also, let's start by saying that this is mostly relevant to the Shared subscription type.

Pulsar provides at-least-once semantics when the client does not enable transactions, because the client may cache some
**Contributor:** I provided a better explanation above. What is the relationship to transactions? You mean that when TX are enabled and you commit, you know for sure it was persisted to disk? Even after a subscription snapshot?

**Author (liangyepianzhou, Apr 7, 2024):** When the transaction is enabled and used, we can ensure that all the messages and acknowledgments are persisted to disk after the transaction is committed. Sorry, I do not know what you mean by the subscription snapshot; is it related to geo-replication?

messages outside of Pulsar when redelivering messages.

The Pulsar Consumer API provides the following ways to reconsume unacknowledged messages later:

* **ackTimeout**
* **deadLetterPolicy**
* **reconsumeLaterCumulative**
* **reconsumeLater**
* **negativeAcknowledge**
* **redeliverUnacknowledgedMessages**

**ackTimeout** is an automatic protection mechanism. If a consumer is configured with an ackTimeout, messages will be
**Contributor:** I would write it a bit differently. After the first sentence I would write:
A consumer can configure an ackTimeout. Once that is done, any message returned to the user via consume() has an ackTimeout period to be acknowledged. If it is not, the client instructs the broker to redeliver the message to another consumer. It's great when your code consumed the message but never acknowledged it because it got stuck; some other consumer might be able to process it successfully.

QUESTION: Can we instruct max redeliver?

**Author (liangyepianzhou, Apr 7, 2024):** Thanks for the suggestion!

> If not, the client will instruct the broker to redeliver this message to another consumer.

We cannot promise that the message will be delivered to another consumer on the next redelivery, but if a set of consumers is subscribed to the topic, the message may be resent to another consumer after some redeliveries.

> QUESTION: Can we instruct max redeliver?

We cannot configure a max redelivery count for the ack timeout; it is used as an automatic protection mechanism. Messages can be redelivered by calling reconsumeLater, which does support a max redelivery count.

automatically redelivered when they are not acknowledged within that time. It works on the client side, ensuring
**Contributor:** When a redeliver command is issued, is it guaranteed to be dispatched to a different consumer?

**Author:** The messages will be dispatched using the same dispatch policy as the last dispatch. In Shared mode they are dispatched to the consumers round-robin. In Key-shared mode a set of consumers is selected and the message is sent to one of them; it may or may not be a different consumer. The key point is to redeliver unacknowledged messages to the client for re-consumption and avoid ack holes.

that unacknowledged messages will be redelivered to another consumer when a consumer's connection remains active
but the business system gets stuck or misses acknowledging some messages. If the consumer gets disconnected because the client crashes,
the messages will be automatically redelivered by the broker as well. However, this mechanism does not wait for the acknowledgment response,
**Contributor:** Can you please explain that? I can't understand.

**Author:** The ackTimeout mechanism works on the client side. It redelivers unacknowledged messages when the connection is active, and the broker redelivers unacknowledged messages when the connection is inactive. Because the client-side ack timeout mechanism does not wait for the ack response, an ack hole can occur if the ack request is lost.

so if an acknowledgment fails on the broker side or proxy side, an ack hole may occur.

**deadLetterPolicy** is a message-queue policy for handling messages that cannot be processed properly; many message
queue systems (such as RabbitMQ, Google Pub/Sub, and Apache Pulsar) implement it. In Pulsar, the deadLetterPolicy is
implemented on the client side: it creates a retry letter topic and a dead letter topic when a consumer is built with a
`deadLetterPolicy` configuration. When a consumer calls `reconsumeLaterCumulative`
or `reconsumeLater`, the message (method parameter) is produced to the retry letter topic until the retry count reaches
**Contributor:** Let's add:
The message will be acknowledged, and a new message, which is a copy of the message read, will be produced to the retry letter topic, bearing a retry-number header. When the retry number exceeds maxRedeliverCount, the message will be produced to the dead-letter topic instead. When calling reconsumeLater(msg, delay), the message written to the retry letter topic is produced with a delivery delay: it is persisted immediately by the broker but only delivered to the consumer after the delay has passed. It's perfect for spacing out retries.

the `maxRedeliverCount`. The message is produced to the dead letter topic when the retry count reaches
`maxRedeliverCount`. The main difference between them is that `reconsumeLaterCumulative` cumulatively acks the
**Contributor:** I cannot understand why you would use cumulative ack with retry.

**Author (liangyepianzhou, Apr 7, 2024):** I just want to note the difference between reconsumeLaterCumulative and reconsumeLater, and point out that reconsumeLaterCumulative only resends a copy of the last message to the retry topic.

message (method parameter) after it is produced, while `reconsumeLater` individually acks the message (method parameter)
after it is produced.
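
For illustration, a hedged sketch of the two calls (assuming a `consumer` built with `enableRetry(true)` and a `DeadLetterPolicy`, and a `msg` that has already been received):

```java
// reconsumeLater: individually acks this message and produces a copy of it
// to the retry letter topic, to be redelivered after the delay.
consumer.reconsumeLater(msg, 1, TimeUnit.MINUTES);

// reconsumeLaterCumulative: cumulatively acks everything up to this message on
// its partition and produces a copy of only this message to the retry letter topic.
consumer.reconsumeLaterCumulative(msg, 1, TimeUnit.MINUTES);
```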
**Contributor:** Let's clarify:

1. The consumers automatically subscribe to the retry letter topic.
2. It's important that the retry letter topic name includes both the topic name and the subscription name, since we want to avoid two subscriptions reading from the topic and writing their retries into the same retry topic. The retries are state only in the context of your subscription.

**Contributor:** Add note:
NOTE: TX are not enabled/supported for this. This means the two operations of acknowledging the original message and publishing the new message to the retry topic do not happen atomically. It can happen that the acknowledgment of the original message fails but the new message is produced, so the consumer will consume the same message twice.

**Author:** Yes, this is true. This is one reason we do not recommend redelivering unacknowledged messages when using transactions.

`negativeAcknowledge` is used to redeliver a particular unacknowledged message, while `redeliverUnacknowledgedMessages`
redelivers all the unacknowledged messages received by this consumer. The main difference between them and
deadLetterPolicy is that no new topic is created and the number of redeliveries is unlimited.
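
A sketch of how the two calls are typically used (assuming an existing `PulsarClient` named `client` and placeholder topic/subscription names; `negativeAckRedeliveryDelay` defaults to one minute in the Java client):

```java
Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/my-topic")     // placeholder topic
        .subscriptionName("my-sub")
        .subscriptionType(SubscriptionType.Shared)
        .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)   // delay before redelivery
        .subscribe();

Message<byte[]> msg = consumer.receive();
try {
    // process the message ...
    consumer.acknowledge(msg);
} catch (Exception e) {
    // Redeliver only this message after the configured delay; no retry/DLQ topic is
    // involved, and there is no limit on how many times it can be negatively acknowledged.
    consumer.negativeAcknowledge(msg);
}

// After a batch-level failure, push back everything received but not yet acknowledged:
consumer.redeliverUnacknowledgedMessages();
```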

## Best Practice Suggestion

Different scenarios require different best practices. Users who value the order of messages within a partition and wish
to batch-process data should choose or implement an appropriate message router to send a batch of ordered messages to the same
**Contributor:** You're talking about producer best practices here? If so, I think it's out of scope for redelivery content, no?

**Author:** This is related to the choice of subscription type. If users wish to consume a batch of messages in order and choose Exclusive or Failover mode but do not send that batch of messages to the same partition, the messages will be consumed out of order, which is something users do not want to see. To avoid confusing users, it is necessary to explain that, before choosing Exclusive or Failover mode for ordered consumption, they should ensure the messages that need to be consumed in order have been sent to the same partition.

We have not elaborated on how to implement the router. This is still a lead-up to the best practices for consumption.

partition. They should also select either the Exclusive or the Failover subscription mode. Users who do not care about
message order, including those in stream processing scenarios, can opt to use the Shared and Key-shared subscription modes.
**Contributor:** "Do not care about message order and those in stream processing scenarios" (which usually do care about order) - this is a contradiction.

**Author:** You're correct: stream processing scenarios often do care about message order, and batch processing scenarios also pay attention to order. However, stream processing scenarios use individual acks to acknowledge single messages, while batch processing scenarios use cumulative acks to acknowledge messages in bulk. The Shared and Key-shared subscription types recommend individual acks and do not guarantee message order, so we recommend Shared and Key-shared for stream processing scenarios where the order of messages is not a concern.

This is not to say that stream processing scenarios do not care about the order of data, but rather that the Shared and Key-shared types should be used in stream processing scenarios where the order of data is not a concern.
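
To make the "route ordered messages to the same partition" recommendation concrete, a producer-side sketch (the topic name and key are placeholders; with the default routing, keyed messages are routed by the hash of their key, so messages sharing a key land in the same partition):

```java
// Assumes an existing PulsarClient `client` and java.nio.charset.StandardCharsets.
Producer<byte[]> producer = client.newProducer()
        .topic("persistent://public/default/orders")       // placeholder partitioned topic
        .create();

// Messages that must be consumed in order share the same key, so they are routed
// to the same partition and an Exclusive/Failover consumer sees them in order.
producer.newMessage().key("order-42").value("created".getBytes(StandardCharsets.UTF_8)).send();
producer.newMessage().key("order-42").value("paid".getBytes(StandardCharsets.UTF_8)).send();
```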


### Shared && Key-shared
#### At-least-once
The term `At least once` means that each message is processed at least once, with a risk of duplicate messages,
**Contributor:** This sentence was explained better in the one I gave above, IMO.

but the performance is better than with `Exactly once` semantics. For `At least once` semantics in Shared or
**Contributor:** You can't ever achieve exactly-once for consumption by using only Pulsar. It relies on your target system providing idempotency.

**Author:** You're correct, we can't rely solely on Pulsar to achieve at-least-once consumption. However, when we discuss at-least-once consumption in Pulsar, it often refers to the fact that no message will be acknowledged repeatedly. A second ack request for the same message should fail, which is also something that Pulsar transactions need to ensure.

Key-shared mode, the most important matter is avoiding ack holes as much as possible. An ack hole refers to instances
**Contributor:** I think this whole paragraph and the numbers below do not add any new essential information.

**Author:** The above provides the background knowledge, and the following is organized into categories and gives best practices and code examples.

where individual messages are missed for acknowledgment. This can occur in the following cases (in all of them the
connection is not interrupted; on reconnection the messages are automatically resent):

1. The consumer receives messages but fails to process them due to a business-system error.
2. The consumer receives messages but misses processing them, or the business system gets stuck.
3. The consumer acknowledges the message, but the acknowledge request is lost on its way to the broker. The possibility
   of such loss exists even over long-lived TCP connections, although the probability is extremely low.
![img_8.png](../static/img/blog-consume-best-practice/Ack-hole.png)

For case 1, configuring `deadLetterPolicy` is a good solution. When the business system receives a signal of message
**Contributor:** This explanation is redundant IMO. Let's expand the section above.

**Author:** OK, I think you are correcting the structure of this article. The original article is like the following:

    # Background knowledge
      ## Subscription types
      ## Acknowledgment
      ## Redelivery methods
    # Best practice
      ## Shared && Key-shared
        ### At-least-once
        ### Exactly-once
      ## Exclusive && Failover
        ### At-least-once
        ### Exactly-once

And you want to delete the best-practice part and expand it in the redelivery-method part, right?

processing failure, it can immediately check and decide whether to retry after a period of time. After the
`reconsumeLater` API is called, the message is acknowledged and resent to the retry letter topic, which is automatically
subscribed by the consumer. When the retry count reaches `maxRedeliverCount`, the message is sent to the dead letter topic.
Messages sent to the dead letter topic should be considered non-retryable messages, and we recommend setting an
**Contributor:** This should be specified above.

`initialSubscriptionName` so that they are not deleted by the retention policy, and then letting maintenance personnel
regularly handle the non-retryable messages in the dead letter topic.
![img_2.png](../static/img/blog-consume-best-practice/DLQ.png)
```java
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .maxRedeliverCount(maxRedeliverCount)
                .deadLetterTopic(deadLetterTopic)
                .retryLetterTopic(retryLetterTopic)
                .initialSubscriptionName(initialSubscriptionName)
                .build())
        .subscriptionType(SubscriptionType.Shared)
        .enableRetry(true)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .topic(topic)
        .subscriptionName(sub)
        .subscribe();

Message<Integer> message = consumer.receive();
try {
    // Process the message.
    consumer.acknowledge(message);
} catch (Exception e) {
    // Decide whether the failure is retryable; only then send the message
    // to the retry letter topic after the given delay.
    if (e instanceof RetryException) {
        consumer.reconsumeLater(message, delay, timeunit);
    }
}
```

For case 2, where the business system gets stuck or an error occurs that causes a received message to be missed for
processing, configuring an ack timeout is a good solution. The consumer records every message received on the client side.
If these messages have not been acknowledged after the specified time, the consumer requests the broker to redeliver
them (possibly to another consumer).
**Suggestion:** The ack timeout should be set slightly longer than the message processing time of the business system.
If the business system is still processing messages but processing takes too long, or the timeout is set too small,
it may result in duplicate message consumption.
![img_3.png](../static/img/blog-consume-best-practice/AckTimeout.png)
```java
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
        // Redeliver messages that are not acknowledged within the timeout.
        .ackTimeout(timeout, timeunit)
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .topic(topic)
        .subscriptionName(sub)
        .subscribe();
```

For case 3, there are no effective preventive measures. This is because all methods of redelivery are triggered by the
client when the connection is not disconnected, and the client does not wait for an ack response by default.

`Short-term solution`: This should be an extreme case; users can unload the topic to resolve it after observing
an abnormal ack hole (one that has existed for more than 3 * ackTimeout).
`Long-term solution`: Modify the ack-timeout mechanism to wait for the acknowledgment response.
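
For reference, the short-term workaround can be triggered through the admin API or CLI; a minimal sketch (the service URL and topic name are placeholders):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;

// Equivalent CLI: bin/pulsar-admin topics unload persistent://public/default/my-topic
PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080")    // placeholder admin endpoint
        .build();
admin.topics().unload("persistent://public/default/my-topic");
admin.close();
```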

#### Exactly-once
If users cannot tolerate message repetition, they can acknowledge messages with a transaction. Transactions can
**Contributor:** I don't understand why. @codelipenghui ?

**Author:** When users use transactions to acknowledge messages, Pulsar must ensure that a message cannot be acknowledged twice. A second acknowledgment of the same message with a transaction results in a TransactionConflictException and aborts the transaction.

prevent repeated consumption: when the client acknowledges a message with a transaction, it waits for the response, and
if the message has already been acknowledged, a `TransactionConflictException` is thrown.

**Notice:** When using transactions, do not configure a DeadLetterPolicy; instead, use negativeAcknowledge to resend messages.
**Contributor:** Why?

**Author:** The comment you left above could be one reason. The other is that one transaction can include many messages, and sending them all to a retry topic or DLQ may not be appropriate when the transaction is aborted.


```java
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
        .ackTimeout(timeout, timeunit)
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .topic(topic)
        .subscriptionName(sub)
        .subscribe();
Transaction transaction = pulsarClient.newTransaction()
        .withTransactionTimeout(timeout, timeunit).build().get();

Message<Integer> message = consumer.receive();
try {
    // Process the message, then acknowledge it within the transaction.
    consumer.acknowledgeAsync(message.getMessageId(), transaction);
    transaction.commit().get();
} catch (Exception e) {
    // A TransactionConflictException means the message was already acknowledged,
    // so it must not be negatively acknowledged again.
    if (!(e.getCause() instanceof PulsarClientException.TransactionConflictException)) {
        consumer.negativeAcknowledge(message);
    }
    transaction.abort().get();
} finally {
    message.release();
}
```
### Exclusive && Failover
**Contributor:** We have to stop here IMO. Something in the structure makes it way too long, IMO.

#### At-least-once
For the `Exclusive` and `Failover` modes, which follow `At least once` semantics, it's crucial to focus on maintaining
the order of messages while ensuring none are lost. Users are recommended to use cumulative acknowledgment in
`Exclusive` or `Failover` mode. Pulsar guarantees that the user has received all messages prior to the message being
cumulatively acknowledged. In this mode, there will be no ack hole and no need to redeliver a specific message.
Redelivering a specific message is also not recommended, as it can cause messages to be consumed out of order. When
there is a problem with the processing of a batch of messages, it is recommended to use `redeliverUnacknowledgedMessages`
to redeliver all unprocessed messages and preserve message ordering.
```java
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
        .subscriptionType(SubscriptionType.Exclusive)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .topic(topic)
        .subscriptionName(sub)
        .subscribe();
Message<Integer> message = consumer.receive();
try {
    // Process the message.
    message = consumer.receive();
    // Process the message.
    // ......
    // Cumulatively acknowledge the last received message.
    consumer.acknowledgeCumulative(message);
} catch (Exception e) {
    // Redeliver all unacknowledged messages to preserve ordering.
    consumer.redeliverUnacknowledgedMessages();
} finally {
    message.release();
}
```

#### Exactly-once (Beta)
In the `Exclusive` and `Failover` modes, the most troublesome issue for users is duplicate messages caused
**Contributor:** Didn't understand anything here.

**Author:** Maybe the expression here is not very precise. Pulsar has not been able to implement exactly-once semantics in the Exclusive and Failover modes, and there have always been repeated acks.

by disconnection. When a connection is reset, the broker resends all the unacknowledged messages to the consumer.
The consumer may have processed many messages without acknowledging them, causing it to unknowingly re-consume
these messages. Unfortunately, there is currently no effective way to prevent this situation.
Pulsar transactions are not yet sufficient to solve this problem.
![img_5.png](../static/img/blog-consume-best-practice/cumulative-ack-problem.png)

`Long-term solution`: Apply an epoch to avoid receiving repeated messages, and abort the transaction in the extreme case
(consumers changing frequently under the Failover subscription type).

In conclusion, ensuring the reliable transmission of messages in Pulsar involves understanding the different subscription
types, acknowledgment methods, and message redelivery mechanisms. Different scenarios require different best practices,
and users should consider factors such as the order of partition messages, the possibility of duplicate messages,
and the speed of the business system. The solutions provided here, including configuring `deadLetterPolicy` and
`ackTimeout`, using transactions, and modifying the ack-timeout mechanism, can help users address common issues and optimize their use of Pulsar.
3 changes: 2 additions & 1 deletion sidebars.json
@@ -494,7 +494,8 @@
"cookbooks-non-persistent",
"cookbooks-retention-expiry",
"cookbooks-message-queue",
"cookbooks-bookkeepermetadata"
"cookbooks-bookkeepermetadata",
"tutorials-redeliver-messages"
]
},
{
**Contributor:** Any chance to use Mermaid for the diagrams?

**Member:** +1

Mermaid is suitable for most cases, but some complex diagrams (if any) could also be drawn in draw.io (diagrams.net) and stored in the repository as .xml files.

Binary file added static/img/blog-consume-best-practice/DLQ.png
**Member:** Should be "Shared", not "Shaved".
