Skip to content

Commit

Permalink
use --transaction-isolation-level
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed May 27, 2024
1 parent 9acf879 commit 8e82fd8
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 75 deletions.
51 changes: 27 additions & 24 deletions pip/pip-353.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ This behavior can confuse users and lead to incorrect data handling. The proposa

### In Scope

- Implement flags to selectively display `server markers`, `uncommitted messages`, and `aborted messages` in peek operations.
- Implement flags to selectively display `server markers`, `uncommitted messages(include aborted messages) for transaction` in peek operations.
- Set the default behavior to only show messages from committed transactions to ensure data integrity.

### Out of Scope
Expand All @@ -37,16 +37,17 @@ This behavior can confuse users and lead to incorrect data handling. The proposa
The proposal introduces three new flags to the `peek-messages` command:

1. `--show-server-marker`: Controls the visibility of server markers (default: `false`).
2. `--show-txn-uncommitted`: Controls the visibility of messages from uncommitted transactions (default: `false`).
3. `--show-txn-aborted`: Controls the visibility of messages from aborted transactions (default: `false`).
2. `---transaction-isolation-level`: Controls the visibility of messages for transactions. (default: `READ_COMMITTED`). Options:
- READ_COMMITTED: Can only consume all transactional messages which have been committed.
- READ_UNCOMMITTED: Can consume all messages, even transactional messages which have been aborted.

These flags will allow administrators and developers to tailor the peek functionality to their needs, improving the usability and security of message handling in transactional contexts.

## Detailed Design

### Design & Implementation Details

To support the `--show-server-marker` and `--show-txn-aborted`, `--show-txn-uncommitted` flags, needs to introduce specific tag into the `headers` of messages returned by the
To support the `--show-server-marker` and `---transaction-isolation-level` flags, needs to introduce specific tag into the `headers` of messages returned by the
[peekNthMessage REST API](https://github.com/apache/pulsar/blob/8ca01cd42edfd4efd986f752f6f8538ea5bf4f94/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java#L1892-L1905).

- `X-Pulsar-marker-type`: Already exists.
Expand All @@ -62,11 +63,10 @@ see the following code: [https://github.com/shibd/pulsar/pull/34](https://github

New command line flags added for the `bin/pulsar-admin topics peek-messages` command:

| Flag | Abbreviation | Type | Default | Description |
|--------------------------|--------------|---------|---------|----------------------------------------------------------------|
| `--show-server-marker` | `-ssm` | Boolean | `false` | Enables the display of internal server write markers. |
| `--show-txn-uncommitted` | `-stu` | Boolean | `false` | Enables the display of messages from uncommitted transactions. |
| `--show-txn-aborted` | `-sta` | Boolean | `false` | Enables the display of messages from aborted transactions. |
| Flag | Abbreviation | Type | Default | Description |
|----------------------------------|--------------|---------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `--show-server-marker` | `-ssm` | Boolean | `false` | Enables the display of internal server write markers. |
| `---transaction-isolation-level` | `-til` | Enum | `false` | Enables theSets the isolation level for consuming messages within transactions. </br> - 'READ_COMMITTED' allows consuming only committed transactional messages. </br> - 'READ_UNCOMMITTED' allows consuming all messages, even transactional messages which have been aborted. |


## Public-facing Changes
Expand All @@ -85,10 +85,11 @@ Add two methods to the admin.Topics() interface.
* Number of messages
* @param showServerMarker
* Enables the display of internal server write markers
* @param showTxnAborted
* Enables the display of messages from aborted transactions
* @param showTxnUncommitted
* Enables the display of messages from uncommitted transactions
* @param transactionIsolationLevel
* Sets the isolation level for consuming messages within transactions.
* - 'READ_COMMITTED' allows consuming only committed transactional messages.
* - 'READ_UNCOMMITTED' allows consuming all messages,
* even transactional messages which have been aborted.
* @return
* @throws NotAuthorizedException
* Don't have admin permission
Expand All @@ -98,8 +99,9 @@ Add two methods to the admin.Topics() interface.
* Unexpected error
*/
List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages,
boolean showServerMarker, boolean showTxnAborted,
boolean showTxnUncommitted) throws PulsarAdminException;
boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel)
throws PulsarAdminException;


/**
* Peek messages from a topic subscription asynchronously.
Expand All @@ -112,15 +114,16 @@ Add two methods to the admin.Topics() interface.
* Number of messages
* @param showServerMarker
* Enables the display of internal server write markers
* @param showTxnAborted
* Enables the display of messages from aborted transactions
* @param showTxnUncommitted
* Enables the display of messages from uncommitted transactions
* @return a future that can be used to track when the messages are returned
@param transactionIsolationLevel
* Sets the isolation level for consuming messages within transactions.
* - 'READ_COMMITTED' allows consuming only committed transactional messages.
* - 'READ_UNCOMMITTED' allows consuming all messages,
* even transactional messages which have been aborted.
* @return a future that can be used to track when the messages are returned
*/
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages,
boolean showServerMarker, boolean showTxnAborted,
boolean showTxnUncommitted);
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(
String topic, String subName, int numMessages,
boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel);
```

## Backward & Forward Compatibility
Expand All @@ -130,5 +133,5 @@ Reverting to a previous version of Pulsar without this feature will remove the a

### Upgrade
While upgrading to the new version of Pulsar that includes these changes, the default behavior of the `peek-messages` command will change.
Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (`--show-server-marker`, `--show-txn-uncommitted`, `--show-txn-aborted`) to `true` to maintain the old behavior.
Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (`--show-server-marker true` and `--transaction-isolation-level READ_UNCOMMITTED` to maintain the old behavior.
This change is necessary as the previous default behavior did not align with typical expectations around data visibility and integrity in transactional systems.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionIsolationLevel;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
Expand Down Expand Up @@ -937,15 +938,15 @@ public void testPeekMessageForSkipTxnMarker() throws Exception {
}

List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
false, true, true);
false, SubscriptionIsolationLevel.READ_UNCOMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
}
}

@Test
public void testPeekMessageForSkipAbortedAndUnCommittedMessages() throws Exception {
public void testPeekMessageFoReadCommittedMessages() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn");
Expand Down Expand Up @@ -975,7 +976,7 @@ public void testPeekMessageForSkipAbortedAndUnCommittedMessages() throws Excepti
// peek n message, all messages value should be "msg"
{
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
false, false, false);
false, SubscriptionIsolationLevel.READ_COMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
Expand All @@ -985,7 +986,7 @@ public void testPeekMessageForSkipAbortedAndUnCommittedMessages() throws Excepti
// peek 3 * n message, and still get n message, all messages value should be "msg"
{
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 2 * n,
false, false, false);
false, SubscriptionIsolationLevel.READ_COMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
Expand Down Expand Up @@ -1021,7 +1022,7 @@ public void testPeekMessageForShowAllMessages() throws Exception {

// peek 5 * n message, will get 5 * n msg.
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 5 * n,
true, true, true);
true, SubscriptionIsolationLevel.READ_UNCOMMITTED);
assertEquals(peekMsgs.size(), 5 * n);

for (int i = 0; i < 4 * n; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionIsolationLevel;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand Down Expand Up @@ -1638,7 +1639,6 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)

/**
* Peek messages from a topic subscription.
* This method will show server marker, uncommitted and aborted messages for transaction by default.
*
* @param topic
* topic name
Expand Down Expand Up @@ -1667,10 +1667,11 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* Number of messages
* @param showServerMarker
* Enables the display of internal server write markers
* @param showTxnAborted
* Enables the display of messages from aborted transactions
* @param showTxnUncommitted
* Enables the display of messages from uncommitted transactions
* @param transactionIsolationLevel
* Sets the isolation level for peeking messages within transactions.
* - 'READ_COMMITTED' allows peeking only committed transactional messages.
* - 'READ_UNCOMMITTED' allows peeking all messages,
* even transactional messages which have been aborted.
* @return
* @throws NotAuthorizedException
* Don't have admin permission
Expand All @@ -1680,12 +1681,11 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* Unexpected error
*/
List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages,
boolean showServerMarker, boolean showTxnAborted,
boolean showTxnUncommitted) throws PulsarAdminException;
boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel)
throws PulsarAdminException;

/**
* Peek messages from a topic subscription asynchronously.
* This method will show server marker, uncommitted and aborted messages for transaction by default.
*
* @param topic
* topic name
Expand All @@ -1708,15 +1708,16 @@ List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages
* Number of messages
* @param showServerMarker
* Enables the display of internal server write markers
* @param showTxnAborted
* Enables the display of messages from aborted transactions
* @param showTxnUncommitted
* Enables the display of messages from uncommitted transactions
@param transactionIsolationLevel
* Sets the isolation level for peeking messages within transactions.
* - 'READ_COMMITTED' allows peeking only committed transactional messages.
* - 'READ_UNCOMMITTED' allows peeking all messages,
* even transactional messages which have been aborted.
* @return a future that can be used to track when the messages are returned
*/
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages,
boolean showServerMarker, boolean showTxnAborted,
boolean showTxnUncommitted);
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(
String topic, String subName, int numMessages,
boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel);

/**
* Get a message by its messageId via a topic subscription.
Expand Down
Loading

0 comments on commit 8e82fd8

Please sign in to comment.