Skip to content

Commit

Permalink
[improve][cli] PIP-353: Improve transaction message visibility for pe…
Browse files Browse the repository at this point in the history
…ek-message (#22762)
  • Loading branch information
shibd committed May 28, 2024
1 parent 55ad4b2 commit 20e83b9
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 51 deletions.
51 changes: 27 additions & 24 deletions pip/pip-353.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ This behavior can confuse users and lead to incorrect data handling. The proposa

### In Scope

- Implement flags to selectively display `server markers`, `uncommitted messages`, and `aborted messages` in peek operations.
- Implement flags to selectively display `server markers`, `uncommitted messages(include aborted messages) for transaction` in peek operations.
- Set the default behavior to only show messages from committed transactions to ensure data integrity.

### Out of Scope
Expand All @@ -37,16 +37,17 @@ This behavior can confuse users and lead to incorrect data handling. The proposa
The proposal introduces three new flags to the `peek-messages` command:

1. `--show-server-marker`: Controls the visibility of server markers (default: `false`).
2. `--show-txn-uncommitted`: Controls the visibility of messages from uncommitted transactions (default: `false`).
3. `--show-txn-aborted`: Controls the visibility of messages from aborted transactions (default: `false`).
2. `---transaction-isolation-level`: Controls the visibility of messages for transactions. (default: `READ_COMMITTED`). Options:
- READ_COMMITTED: Can only consume all transactional messages which have been committed.
- READ_UNCOMMITTED: Can consume all messages, even transactional messages which have been aborted.

These flags will allow administrators and developers to tailor the peek functionality to their needs, improving the usability and security of message handling in transactional contexts.

## Detailed Design

### Design & Implementation Details

To support the `--show-server-marker` and `--show-txn-aborted`, `--show-txn-uncommitted` flags, needs to introduce specific tag into the `headers` of messages returned by the
To support the `--show-server-marker` and `---transaction-isolation-level` flags, needs to introduce specific tag into the `headers` of messages returned by the
[peekNthMessage REST API](https://github.com/apache/pulsar/blob/8ca01cd42edfd4efd986f752f6f8538ea5bf4f94/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java#L1892-L1905).

- `X-Pulsar-marker-type`: Already exists.
Expand All @@ -62,11 +63,10 @@ see the following code: [https://github.com/shibd/pulsar/pull/34](https://github

New command line flags added for the `bin/pulsar-admin topics peek-messages` command:

| Flag | Abbreviation | Type | Default | Description |
|--------------------------|--------------|---------|---------|----------------------------------------------------------------|
| `--show-server-marker` | `-ssm` | Boolean | `false` | Enables the display of internal server write markers. |
| `--show-txn-uncommitted` | `-stu` | Boolean | `false` | Enables the display of messages from uncommitted transactions. |
| `--show-txn-aborted` | `-sta` | Boolean | `false` | Enables the display of messages from aborted transactions. |
| Flag | Abbreviation | Type | Default | Description |
|----------------------------------|--------------|---------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `--show-server-marker` | `-ssm` | Boolean | `false` | Enables the display of internal server write markers. |
| `---transaction-isolation-level` | `-til` | Enum | `false` | Enables theSets the isolation level for consuming messages within transactions. </br> - 'READ_COMMITTED' allows consuming only committed transactional messages. </br> - 'READ_UNCOMMITTED' allows consuming all messages, even transactional messages which have been aborted. |


## Public-facing Changes
Expand All @@ -85,10 +85,11 @@ Add two methods to the admin.Topics() interface.
* Number of messages
* @param showServerMarker
* Enables the display of internal server write markers
* @param showTxnAborted
* Enables the display of messages from aborted transactions
* @param showTxnUncommitted
* Enables the display of messages from uncommitted transactions
* @param transactionIsolationLevel
* Sets the isolation level for consuming messages within transactions.
* - 'READ_COMMITTED' allows consuming only committed transactional messages.
* - 'READ_UNCOMMITTED' allows consuming all messages,
* even transactional messages which have been aborted.
* @return
* @throws NotAuthorizedException
* Don't have admin permission
Expand All @@ -98,8 +99,9 @@ Add two methods to the admin.Topics() interface.
* Unexpected error
*/
List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages,
boolean showServerMarker, boolean showTxnAborted,
boolean showTxnUncommitted) throws PulsarAdminException;
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel)
throws PulsarAdminException;


/**
* Peek messages from a topic subscription asynchronously.
Expand All @@ -112,15 +114,16 @@ Add two methods to the admin.Topics() interface.
* Number of messages
* @param showServerMarker
* Enables the display of internal server write markers
* @param showTxnAborted
* Enables the display of messages from aborted transactions
* @param showTxnUncommitted
* Enables the display of messages from uncommitted transactions
* @return a future that can be used to track when the messages are returned
@param transactionIsolationLevel
* Sets the isolation level for consuming messages within transactions.
* - 'READ_COMMITTED' allows consuming only committed transactional messages.
* - 'READ_UNCOMMITTED' allows consuming all messages,
* even transactional messages which have been aborted.
* @return a future that can be used to track when the messages are returned
*/
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages,
boolean showServerMarker, boolean showTxnAborted,
boolean showTxnUncommitted);
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(
String topic, String subName, int numMessages,
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel);
```

## Backward & Forward Compatibility
Expand All @@ -130,5 +133,5 @@ Reverting to a previous version of Pulsar without this feature will remove the a

### Upgrade
While upgrading to the new version of Pulsar that includes these changes, the default behavior of the `peek-messages` command will change.
Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (`--show-server-marker`, `--show-txn-uncommitted`, `--show-txn-aborted`) to `true` to maintain the old behavior.
Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (`--show-server-marker true` and `--transaction-isolation-level READ_UNCOMMITTED` to maintain the old behavior.
This change is necessary as the previous default behavior did not align with typical expectations around data visibility and integrity in transactional systems.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand Down Expand Up @@ -2726,7 +2727,7 @@ public void readEntryFailed(ManagedLedgerException exception,
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
results.complete(generateResponseWithEntry(entry));
results.complete(generateResponseWithEntry(entry, (PersistentTopic) topic));
} catch (IOException exception) {
throw new RestException(exception);
} finally {
Expand Down Expand Up @@ -2867,10 +2868,12 @@ protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName
entry = sub.peekNthMessage(messagePosition);
}
}
return entry;
}).thenCompose(entry -> {
return entry.thenApply(e -> Pair.of(e, (PersistentTopic) topic));
}).thenCompose(entryTopicPair -> {
Entry entry = entryTopicPair.getLeft();
PersistentTopic persistentTopic = entryTopicPair.getRight();
try {
Response response = generateResponseWithEntry(entry);
Response response = generateResponseWithEntry(entry, persistentTopic);
return CompletableFuture.completedFuture(response);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Message not found");
Expand Down Expand Up @@ -2949,17 +2952,18 @@ public String toString() {
PersistentTopicsBase.this.topicName);
}
}, null);
return future;
return future.thenApply(entry -> Pair.of(entry, (PersistentTopic) topic));
} catch (ManagedLedgerException exception) {
log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(),
messagePosition,
topicName, exception);
throw new RestException(exception);
}

}).thenApply(entry -> {
}).thenApply(entryTopicPair -> {
Entry entry = entryTopicPair.getLeft();
PersistentTopic persistentTopic = entryTopicPair.getRight();
try {
return generateResponseWithEntry(entry);
return generateResponseWithEntry(entry, persistentTopic);
} catch (IOException exception) {
throw new RestException(exception);
} finally {
Expand All @@ -2970,7 +2974,7 @@ public String toString() {
});
}

private Response generateResponseWithEntry(Entry entry) throws IOException {
private Response generateResponseWithEntry(Entry entry, PersistentTopic persistentTopic) throws IOException {
checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();
Expand Down Expand Up @@ -3088,6 +3092,14 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
if (metadata.hasNullPartitionKey()) {
responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
}
if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, (PositionImpl) entry.getPosition());
responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
}
boolean isTxnUncommitted = ((PositionImpl) entry.getPosition())
.compareTo(persistentTopic.getMaxReadPosition()) > 0;
responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
Expand All @@ -48,12 +49,16 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TransactionIsolationLevel;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -917,6 +922,127 @@ public void testAbortTransaction() throws Exception {
}
}

@Test
public void testPeekMessageForSkipTxnMarker() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_marker");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
int n = 10;
for (int i = 0; i < n; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
producer.newMessage(txn).value("msg").send();
txn.commit().get();
}

List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
false, TransactionIsolationLevel.READ_UNCOMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
}
}

@Test
public void testPeekMessageFoReadCommittedMessages() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
int n = 10;
// Alternately sends `n` committed transactional messages and `n` abort transactional messages.
for (int i = 0; i < 2 * n; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
if (i % 2 == 0) {
producer.newMessage(txn).value("msg").send();
txn.commit().get();
} else {
producer.newMessage(txn).value("msg-aborted").send();
txn.abort();
}
}
// Then sends 1 uncommitted transactional messages.
Transaction txn = pulsarClient.newTransaction().build().get();
producer.newMessage(txn).value("msg-uncommitted").send();
// Then sends n-1 no transaction messages.
for (int i = 0; i < n - 1; i++) {
producer.newMessage().value("msg-after-uncommitted").send();
}

// peek n message, all messages value should be "msg"
{
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
false, TransactionIsolationLevel.READ_COMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
}
}

// peek 3 * n message, and still get n message, all messages value should be "msg"
{
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 2 * n,
false, TransactionIsolationLevel.READ_COMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
}
}
}

@Test
public void testPeekMessageForShowAllMessages() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_all");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
int n = 10;
// Alternately sends `n` committed transactional messages and `n` abort transactional messages.
for (int i = 0; i < 2 * n; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
if (i % 2 == 0) {
producer.newMessage(txn).value("msg").send();
txn.commit().get();
} else {
producer.newMessage(txn).value("msg-aborted").send();
txn.abort();
}
}
// Then sends `n` uncommitted transactional messages.
Transaction txn = pulsarClient.newTransaction().build().get();
for (int i = 0; i < n; i++) {
producer.newMessage(txn).value("msg-uncommitted").send();
}

// peek 5 * n message, will get 5 * n msg.
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 5 * n,
true, TransactionIsolationLevel.READ_UNCOMMITTED);
assertEquals(peekMsgs.size(), 5 * n);

for (int i = 0; i < 4 * n; i++) {
Message<byte[]> peekMsg = peekMsgs.get(i);
MessageImpl peekMsgImpl = (MessageImpl) peekMsg;
MessageMetadata metadata = peekMsgImpl.getMessageBuilder();
if (metadata.hasMarkerType()) {
assertTrue(metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE ||
metadata.getMarkerType() == MarkerType.TXN_ABORT_VALUE);
} else {
String value = new String(peekMsg.getValue());
assertTrue(value.equals("msg") || value.equals("msg-aborted"));
}
}
for (int i = 4 * n; i < peekMsgs.size(); i++) {
Message<byte[]> peekMsg = peekMsgs.get(i);
assertEquals(new String(peekMsg.getValue()), "msg-uncommitted");
}
}

private static void verifyCoordinatorStats(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");
Expand Down
Loading

0 comments on commit 20e83b9

Please sign in to comment.