From 9acf879c2a8a15202309c1ca0c6ec5e3a59b9f57 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 22 May 2024 08:48:52 +0800 Subject: [PATCH 1/4] [improve][cli] PIP-353: Improve transaction message visibility for peek-messages --- .../admin/impl/PersistentTopicsBase.java | 30 +++-- .../admin/v3/AdminApiTransactionTest.java | 125 ++++++++++++++++++ .../apache/pulsar/client/admin/Topics.java | 50 +++++++ .../client/admin/internal/TopicsImpl.java | 78 +++++++++-- .../pulsar/admin/cli/PulsarAdminToolTest.java | 2 +- .../apache/pulsar/admin/cli/CmdTopics.java | 21 ++- 6 files changed, 283 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 104a84d041d8a..c4618cefeed93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -97,6 +97,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @@ -2717,7 +2718,7 @@ public void readEntryFailed(ManagedLedgerException exception, @Override public void readEntryComplete(Entry entry, Object ctx) { try { - results.complete(generateResponseWithEntry(entry)); + results.complete(generateResponseWithEntry(entry, (PersistentTopic) topic)); } catch (IOException exception) { throw new RestException(exception); } finally { @@ -2858,10 +2859,12 @@ protected CompletableFuture internalPeekNthMessageAsync(String subName entry = sub.peekNthMessage(messagePosition); } } - return entry; - }).thenCompose(entry -> { + return entry.thenApply(e -> Pair.of(e, (PersistentTopic) topic)); + }).thenCompose(entryTopicPair -> { + Entry entry = entryTopicPair.getLeft(); + PersistentTopic persistentTopic = entryTopicPair.getRight(); try { - Response response = generateResponseWithEntry(entry); + Response response = generateResponseWithEntry(entry, persistentTopic); return CompletableFuture.completedFuture(response); } catch (NullPointerException npe) { throw new RestException(Status.NOT_FOUND, "Message not found"); @@ -2940,17 +2943,18 @@ public String toString() { PersistentTopicsBase.this.topicName); } }, null); - return future; + return future.thenApply(entry -> Pair.of(entry, (PersistentTopic) topic)); } catch (ManagedLedgerException exception) { log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(), messagePosition, topicName, exception); throw new RestException(exception); } - - }).thenApply(entry -> { + }).thenApply(entryTopicPair -> { + Entry entry = entryTopicPair.getLeft(); + PersistentTopic persistentTopic = entryTopicPair.getRight(); try { - return generateResponseWithEntry(entry); + return generateResponseWithEntry(entry, persistentTopic); } catch (IOException exception) { throw new RestException(exception); } finally { @@ -2961,7 +2965,7 @@ public String toString() { }); } - private Response generateResponseWithEntry(Entry entry) throws IOException { + private Response generateResponseWithEntry(Entry entry, PersistentTopic persistentTopic) throws IOException { checkNotNull(entry); PositionImpl pos = (PositionImpl) entry.getPosition(); ByteBuf metadataAndPayload = entry.getDataBuffer(); @@ -3079,6 +3083,14 @@ private Response generateResponseWithEntry(Entry entry) throws IOException { if (metadata.hasNullPartitionKey()) { responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey()); } + if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) { + TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits()); + boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, (PositionImpl) entry.getPosition()); + responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted); + } + boolean isTxnUncommitted = ((PositionImpl) entry.getPosition()) + .compareTo(persistentTopic.getMaxReadPosition()) > 0; + responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted); // Decode if needed CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index adf810945de5f..4cfce3f88e3e7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -38,6 +38,7 @@ import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.http.HttpStatus; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; @@ -53,7 +54,10 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.transaction.TransactionImpl; +import org.apache.pulsar.common.api.proto.MarkerType; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; @@ -917,6 +921,127 @@ public void testAbortTransaction() throws Exception { } } + @Test + public void testPeekMessageForSkipTxnMarker() throws Exception { + initTransaction(1); + + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_marker"); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + int n = 10; + for (int i = 0; i < n; i++) { + Transaction txn = pulsarClient.newTransaction().build().get(); + producer.newMessage(txn).value("msg").send(); + txn.commit().get(); + } + + List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n, + false, true, true); + assertEquals(peekMsgs.size(), n); + for (Message peekMsg : peekMsgs) { + assertEquals(new String(peekMsg.getValue()), "msg"); + } + } + + @Test + public void testPeekMessageForSkipAbortedAndUnCommittedMessages() throws Exception { + initTransaction(1); + + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn"); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + int n = 10; + // Alternately sends `n` committed transactional messages and `n` abort transactional messages. + for (int i = 0; i < 2 * n; i++) { + Transaction txn = pulsarClient.newTransaction().build().get(); + if (i % 2 == 0) { + producer.newMessage(txn).value("msg").send(); + txn.commit().get(); + } else { + producer.newMessage(txn).value("msg-aborted").send(); + txn.abort(); + } + } + // Then sends 1 uncommitted transactional messages. + Transaction txn = pulsarClient.newTransaction().build().get(); + producer.newMessage(txn).value("msg-uncommitted").send(); + // Then sends n-1 no transaction messages. + for (int i = 0; i < n - 1; i++) { + producer.newMessage().value("msg-after-uncommitted").send(); + } + + // peek n message, all messages value should be "msg" + { + List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n, + false, false, false); + assertEquals(peekMsgs.size(), n); + for (Message peekMsg : peekMsgs) { + assertEquals(new String(peekMsg.getValue()), "msg"); + } + } + + // peek 3 * n message, and still get n message, all messages value should be "msg" + { + List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 2 * n, + false, false, false); + assertEquals(peekMsgs.size(), n); + for (Message peekMsg : peekMsgs) { + assertEquals(new String(peekMsg.getValue()), "msg"); + } + } + } + + @Test + public void testPeekMessageForShowAllMessages() throws Exception { + initTransaction(1); + + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_all"); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + int n = 10; + // Alternately sends `n` committed transactional messages and `n` abort transactional messages. + for (int i = 0; i < 2 * n; i++) { + Transaction txn = pulsarClient.newTransaction().build().get(); + if (i % 2 == 0) { + producer.newMessage(txn).value("msg").send(); + txn.commit().get(); + } else { + producer.newMessage(txn).value("msg-aborted").send(); + txn.abort(); + } + } + // Then sends `n` uncommitted transactional messages. + Transaction txn = pulsarClient.newTransaction().build().get(); + for (int i = 0; i < n; i++) { + producer.newMessage(txn).value("msg-uncommitted").send(); + } + + // peek 5 * n message, will get 5 * n msg. + List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 5 * n, + true, true, true); + assertEquals(peekMsgs.size(), 5 * n); + + for (int i = 0; i < 4 * n; i++) { + Message peekMsg = peekMsgs.get(i); + MessageImpl peekMsgImpl = (MessageImpl) peekMsg; + MessageMetadata metadata = peekMsgImpl.getMessageBuilder(); + if (metadata.hasMarkerType()) { + assertTrue(metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE || + metadata.getMarkerType() == MarkerType.TXN_ABORT_VALUE); + } else { + String value = new String(peekMsg.getValue()); + assertTrue(value.equals("msg") || value.equals("msg-aborted")); + } + } + for (int i = 4 * n; i < peekMsgs.size(); i++) { + Message peekMsg = peekMsgs.get(i); + assertEquals(new String(peekMsg.getValue()), "msg-uncommitted"); + } + } + private static void verifyCoordinatorStats(String state, long sequenceId, long lowWaterMark) { assertEquals(state, "Ready"); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 574b859e82c80..46c033fdde2cf 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1638,6 +1638,7 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) /** * Peek messages from a topic subscription. + * This method will show server marker, uncommitted and aborted messages for transaction by default. * * @param topic * topic name @@ -1655,8 +1656,36 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) */ List> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException; + /** + * Peek messages from a topic subscription. + * + * @param topic + * topic name + * @param subName + * Subscription name + * @param numMessages + * Number of messages + * @param showServerMarker + * Enables the display of internal server write markers + * @param showTxnAborted + * Enables the display of messages from aborted transactions + * @param showTxnUncommitted + * Enables the display of messages from uncommitted transactions + * @return + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Topic or subscription does not exist + * @throws PulsarAdminException + * Unexpected error + */ + List> peekMessages(String topic, String subName, int numMessages, + boolean showServerMarker, boolean showTxnAborted, + boolean showTxnUncommitted) throws PulsarAdminException; + /** * Peek messages from a topic subscription asynchronously. + * This method will show server marker, uncommitted and aborted messages for transaction by default. * * @param topic * topic name @@ -1668,6 +1697,27 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) */ CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages); + /** + * Peek messages from a topic subscription asynchronously. + * + * @param topic + * topic name + * @param subName + * Subscription name + * @param numMessages + * Number of messages + * @param showServerMarker + * Enables the display of internal server write markers + * @param showTxnAborted + * Enables the display of messages from aborted transactions + * @param showTxnUncommitted + * Enables the display of messages from uncommitted transactions + * @return a future that can be used to track when the messages are returned + */ + CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages, + boolean showServerMarker, boolean showTxnAborted, + boolean showTxnUncommitted); + /** * Get a message by its messageId via a topic subscription. * @param topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index f76cfbcde985f..f9134e16d0aeb 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -130,6 +130,8 @@ public class TopicsImpl extends BaseResource implements Topics { private static final String SCHEMA_VERSION = "X-Pulsar-Base64-schema-version-b64encoded"; private static final String ENCRYPTION_PARAM = "X-Pulsar-Base64-encryption-param"; private static final String ENCRYPTION_KEYS = "X-Pulsar-Base64-encryption-keys"; + public static final String TXN_ABORTED = "X-Pulsar-txn-aborted"; + public static final String TXN_UNCOMMITTED = "X-Pulsar-txn-uncommitted"; // CHECKSTYLE.ON: MemberName public static final String PROPERTY_SHADOW_SOURCE_KEY = "PULSAR.SHADOW_SOURCE"; @@ -867,7 +869,9 @@ public CompletableFuture expireMessagesForAllSubscriptionsAsync(String top return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); } - private CompletableFuture>> peekNthMessage(String topic, String subName, int messagePosition) { + private CompletableFuture>> peekNthMessage(String topic, String subName, int messagePosition, + boolean showServerMarker, boolean showTxnAborted, + boolean showTxnUncommitted) { TopicName tn = validateTopic(topic); String encodedSubName = Codec.encode(subName); WebTarget path = topicPath(tn, "subscription", encodedSubName, @@ -879,7 +883,8 @@ private CompletableFuture>> peekNthMessage(String topic, St @Override public void completed(Response response) { try { - future.complete(getMessagesFromHttpResponse(tn.toString(), response)); + future.complete(getMessagesFromHttpResponse(tn.toString(), response, + showServerMarker, showTxnAborted, showTxnUncommitted)); } catch (Exception e) { future.completeExceptionally(getApiException(e)); } @@ -896,26 +901,47 @@ public void failed(Throwable throwable) { @Override public List> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException { - return sync(() -> peekMessagesAsync(topic, subName, numMessages)); + return sync(() -> peekMessagesAsync(topic, subName, numMessages, true, true, true)); + } + + @Override + public List> peekMessages(String topic, String subName, int numMessages, + boolean showServerMarker, boolean showTxnAborted, + boolean showTxnUncommitted) throws PulsarAdminException { + return sync(() -> + peekMessagesAsync(topic, subName, numMessages, showServerMarker, showTxnAborted, showTxnUncommitted)); } @Override public CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages) { checkArgument(numMessages > 0); CompletableFuture>> future = new CompletableFuture>>(); - peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), future, 1); + peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), + future, 1, true, true, true); + return future; + } + + @Override + public CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages, + boolean showServerMarker, boolean showTxnAborted, boolean showTxnUncommitted) { + checkArgument(numMessages > 0); + CompletableFuture>> future = new CompletableFuture>>(); + peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), + future, 1, showServerMarker, showTxnAborted, showTxnUncommitted); return future; } private void peekMessagesAsync(String topic, String subName, int numMessages, - List> messages, CompletableFuture>> future, int nthMessage) { + List> messages, CompletableFuture>> future, int nthMessage, + boolean showServerMarker, boolean showTxnAborted, boolean showTxnUncommitted) { if (numMessages <= 0) { future.complete(messages); return; } // if peeking first message succeeds, we know that the topic and subscription exists - peekNthMessage(topic, subName, nthMessage).handle((r, ex) -> { + peekNthMessage(topic, subName, nthMessage, showServerMarker, showTxnAborted, showTxnUncommitted) + .handle((r, ex) -> { if (ex != null) { // if we get a not found exception, it means that the position for the message we are trying to get // does not exist. At this point, we can return the already found messages. @@ -930,7 +956,8 @@ private void peekMessagesAsync(String topic, String subName, int numMessages, for (int i = 0; i < Math.min(r.size(), numMessages); i++) { messages.add(r.get(i)); } - peekMessagesAsync(topic, subName, numMessages - r.size(), messages, future, nthMessage + 1); + peekMessagesAsync(topic, subName, numMessages - r.size(), messages, future, + nthMessage + 1, showServerMarker, showTxnAborted, showTxnUncommitted); return null; }); } @@ -1253,6 +1280,12 @@ private TopicName validateTopic(String topic) { } private List> getMessagesFromHttpResponse(String topic, Response response) throws Exception { + return getMessagesFromHttpResponse(topic, response, true, true, true); + } + + private List> getMessagesFromHttpResponse(String topic, Response response, + boolean showServerMarker, boolean showTxnAborted, + boolean showTxnUncommitted) throws Exception { if (response.getStatus() != Status.OK.getStatusCode()) { throw getApiException(response); @@ -1284,7 +1317,32 @@ private List> getMessagesFromHttpResponse(String topic, Response Map properties = new TreeMap<>(); MultivaluedMap headers = response.getHeaders(); - Object tmp = headers.getFirst(PUBLISH_TIME); + Object tmp = headers.getFirst(MARKER_TYPE); + if (tmp != null) { + if (!showServerMarker) { + return new ArrayList<>(); + } else { + messageMetadata.setMarkerType(Integer.parseInt(tmp.toString())); + } + } + + tmp = headers.getFirst(TXN_ABORTED); + if (tmp != null && Boolean.parseBoolean(tmp.toString())) { + properties.put(TXN_ABORTED, tmp.toString()); + if (!showTxnAborted) { + return new ArrayList<>(); + } + } + + tmp = headers.getFirst(TXN_UNCOMMITTED); + if (tmp != null && Boolean.parseBoolean(tmp.toString())) { + properties.put(TXN_UNCOMMITTED, tmp.toString()); + if (!showTxnUncommitted) { + return new ArrayList<>(); + } + } + + tmp = headers.getFirst(PUBLISH_TIME); if (tmp != null) { messageMetadata.setPublishTime(DateFormatter.parse(tmp.toString())); } @@ -1336,10 +1394,6 @@ private List> getMessagesFromHttpResponse(String topic, Response if (tmp != null) { messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp.toString())); } - tmp = headers.getFirst(MARKER_TYPE); - if (tmp != null) { - messageMetadata.setMarkerType(Integer.parseInt(tmp.toString())); - } tmp = headers.getFirst(TXNID_LEAST_BITS); if (tmp != null) { messageMetadata.setTxnidLeastBits(Long.parseLong(tmp.toString())); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index fd1bdf4799848..729f2da80b838 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1744,7 +1744,7 @@ public void topics() throws Exception { verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", true); cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3")); - verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3); + verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3, false, false, false); MessageImpl message = mock(MessageImpl.class); when(message.getData()).thenReturn(new byte[]{}); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index e1e85c68f7e5e..71623143685c4 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -65,6 +65,8 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.MarkerType; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -1097,10 +1099,23 @@ private class PeekMessages extends CliCommand { @Option(names = { "-n", "--count" }, description = "Number of messages (default 1)", required = false) private int numMessages = 1; + @Option(names = { "-ssm", "--show-server-marker" }, + description = "Enables the display of internal server write markers.", required = false) + private boolean showServerMarker = false; + + @Option(names = { "-sta", "--show-txn-aborted" }, + description = "Enables the display of messages from aborted transactions.", required = false) + private boolean showTxnAborted = false; + + @Option(names = { "-stu", "--show-txn-uncommitted" }, + description = "Enables the display of messages from uncommitted transactions.", required = false) + private boolean showTxnUncommitted = false; + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(topicName); - List> messages = getTopics().peekMessages(persistentTopic, subName, numMessages); + List> messages = getTopics().peekMessages(persistentTopic, subName, numMessages, + showServerMarker, showTxnAborted, showTxnUncommitted); int position = 0; for (Message msg : messages) { MessageImpl message = (MessageImpl) msg; @@ -1122,6 +1137,10 @@ void run() throws PulsarAdminException { if (message.getDeliverAtTime() != 0) { System.out.println("Deliver at time: " + message.getDeliverAtTime()); } + MessageMetadata msgMetaData = message.getMessageBuilder(); + if (showServerMarker && msgMetaData.hasMarkerType()) { + System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType())); + } if (message.getBrokerEntryMetadata() != null) { if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { From 8e82fd8b89b2fe50ed235775d186ba75eced5706 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 27 May 2024 17:40:59 +0800 Subject: [PATCH 2/4] use --transaction-isolation-level --- pip/pip-353.md | 51 ++++++++++--------- .../admin/v3/AdminApiTransactionTest.java | 11 ++-- .../apache/pulsar/client/admin/Topics.java | 31 +++++------ .../client/admin/internal/TopicsImpl.java | 48 +++++++++-------- .../api/SubscriptionIsolationLevel.java | 26 ++++++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 4 +- .../apache/pulsar/admin/cli/CmdTopics.java | 17 ++++--- 7 files changed, 113 insertions(+), 75 deletions(-) create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java diff --git a/pip/pip-353.md b/pip/pip-353.md index 4315bdab0eb2e..6544090a19b15 100644 --- a/pip/pip-353.md +++ b/pip/pip-353.md @@ -25,7 +25,7 @@ This behavior can confuse users and lead to incorrect data handling. The proposa ### In Scope -- Implement flags to selectively display `server markers`, `uncommitted messages`, and `aborted messages` in peek operations. +- Implement flags to selectively display `server markers`, `uncommitted messages(include aborted messages) for transaction` in peek operations. - Set the default behavior to only show messages from committed transactions to ensure data integrity. ### Out of Scope @@ -37,8 +37,9 @@ This behavior can confuse users and lead to incorrect data handling. The proposa The proposal introduces three new flags to the `peek-messages` command: 1. `--show-server-marker`: Controls the visibility of server markers (default: `false`). -2. `--show-txn-uncommitted`: Controls the visibility of messages from uncommitted transactions (default: `false`). -3. `--show-txn-aborted`: Controls the visibility of messages from aborted transactions (default: `false`). +2. `---transaction-isolation-level`: Controls the visibility of messages for transactions. (default: `READ_COMMITTED`). Options: + - READ_COMMITTED: Can only consume all transactional messages which have been committed. + - READ_UNCOMMITTED: Can consume all messages, even transactional messages which have been aborted. These flags will allow administrators and developers to tailor the peek functionality to their needs, improving the usability and security of message handling in transactional contexts. @@ -46,7 +47,7 @@ These flags will allow administrators and developers to tailor the peek function ### Design & Implementation Details -To support the `--show-server-marker` and `--show-txn-aborted`, `--show-txn-uncommitted` flags, needs to introduce specific tag into the `headers` of messages returned by the +To support the `--show-server-marker` and `---transaction-isolation-level` flags, needs to introduce specific tag into the `headers` of messages returned by the [peekNthMessage REST API](https://github.com/apache/pulsar/blob/8ca01cd42edfd4efd986f752f6f8538ea5bf4f94/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java#L1892-L1905). - `X-Pulsar-marker-type`: Already exists. @@ -62,11 +63,10 @@ see the following code: [https://github.com/shibd/pulsar/pull/34](https://github New command line flags added for the `bin/pulsar-admin topics peek-messages` command: -| Flag | Abbreviation | Type | Default | Description | -|--------------------------|--------------|---------|---------|----------------------------------------------------------------| -| `--show-server-marker` | `-ssm` | Boolean | `false` | Enables the display of internal server write markers. | -| `--show-txn-uncommitted` | `-stu` | Boolean | `false` | Enables the display of messages from uncommitted transactions. | -| `--show-txn-aborted` | `-sta` | Boolean | `false` | Enables the display of messages from aborted transactions. | +| Flag | Abbreviation | Type | Default | Description | +|----------------------------------|--------------|---------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `--show-server-marker` | `-ssm` | Boolean | `false` | Enables the display of internal server write markers. | +| `---transaction-isolation-level` | `-til` | Enum | `false` | Enables theSets the isolation level for consuming messages within transactions.
- 'READ_COMMITTED' allows consuming only committed transactional messages.
- 'READ_UNCOMMITTED' allows consuming all messages, even transactional messages which have been aborted. | ## Public-facing Changes @@ -85,10 +85,11 @@ Add two methods to the admin.Topics() interface. * Number of messages * @param showServerMarker * Enables the display of internal server write markers - * @param showTxnAborted - * Enables the display of messages from aborted transactions - * @param showTxnUncommitted - * Enables the display of messages from uncommitted transactions + * @param transactionIsolationLevel + * Sets the isolation level for consuming messages within transactions. + * - 'READ_COMMITTED' allows consuming only committed transactional messages. + * - 'READ_UNCOMMITTED' allows consuming all messages, + * even transactional messages which have been aborted. * @return * @throws NotAuthorizedException * Don't have admin permission @@ -98,8 +99,9 @@ Add two methods to the admin.Topics() interface. * Unexpected error */ List> peekMessages(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted) throws PulsarAdminException; + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) + throws PulsarAdminException; + /** * Peek messages from a topic subscription asynchronously. @@ -112,15 +114,16 @@ Add two methods to the admin.Topics() interface. * Number of messages * @param showServerMarker * Enables the display of internal server write markers - * @param showTxnAborted - * Enables the display of messages from aborted transactions - * @param showTxnUncommitted - * Enables the display of messages from uncommitted transactions - * @return a future that can be used to track when the messages are returned + @param transactionIsolationLevel + * Sets the isolation level for consuming messages within transactions. + * - 'READ_COMMITTED' allows consuming only committed transactional messages. + * - 'READ_UNCOMMITTED' allows consuming all messages, + * even transactional messages which have been aborted. + * @return a future that can be used to track when the messages are returned */ - CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted); + CompletableFuture>> peekMessagesAsync( + String topic, String subName, int numMessages, + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel); ``` ## Backward & Forward Compatibility @@ -130,5 +133,5 @@ Reverting to a previous version of Pulsar without this feature will remove the a ### Upgrade While upgrading to the new version of Pulsar that includes these changes, the default behavior of the `peek-messages` command will change. -Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (`--show-server-marker`, `--show-txn-uncommitted`, `--show-txn-aborted`) to `true` to maintain the old behavior. +Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (`--show-server-marker true` and `--transaction-isolation-level READ_UNCOMMITTED` to maintain the old behavior. This change is necessary as the previous default behavior did not align with typical expectations around data visibility and integrity in transactional systems. \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 4cfce3f88e3e7..0a91c1a69bd01 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; @@ -937,7 +938,7 @@ public void testPeekMessageForSkipTxnMarker() throws Exception { } List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n, - false, true, true); + false, SubscriptionIsolationLevel.READ_UNCOMMITTED); assertEquals(peekMsgs.size(), n); for (Message peekMsg : peekMsgs) { assertEquals(new String(peekMsg.getValue()), "msg"); @@ -945,7 +946,7 @@ public void testPeekMessageForSkipTxnMarker() throws Exception { } @Test - public void testPeekMessageForSkipAbortedAndUnCommittedMessages() throws Exception { + public void testPeekMessageFoReadCommittedMessages() throws Exception { initTransaction(1); final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn"); @@ -975,7 +976,7 @@ public void testPeekMessageForSkipAbortedAndUnCommittedMessages() throws Excepti // peek n message, all messages value should be "msg" { List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n, - false, false, false); + false, SubscriptionIsolationLevel.READ_COMMITTED); assertEquals(peekMsgs.size(), n); for (Message peekMsg : peekMsgs) { assertEquals(new String(peekMsg.getValue()), "msg"); @@ -985,7 +986,7 @@ public void testPeekMessageForSkipAbortedAndUnCommittedMessages() throws Excepti // peek 3 * n message, and still get n message, all messages value should be "msg" { List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 2 * n, - false, false, false); + false, SubscriptionIsolationLevel.READ_COMMITTED); assertEquals(peekMsgs.size(), n); for (Message peekMsg : peekMsgs) { assertEquals(new String(peekMsg.getValue()), "msg"); @@ -1021,7 +1022,7 @@ public void testPeekMessageForShowAllMessages() throws Exception { // peek 5 * n message, will get 5 * n msg. List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 5 * n, - true, true, true); + true, SubscriptionIsolationLevel.READ_UNCOMMITTED); assertEquals(peekMsgs.size(), 5 * n); for (int i = 0; i < 4 * n; i++) { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 46c033fdde2cf..7a1efb36ce218 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -1638,7 +1639,6 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) /** * Peek messages from a topic subscription. - * This method will show server marker, uncommitted and aborted messages for transaction by default. * * @param topic * topic name @@ -1667,10 +1667,11 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * Number of messages * @param showServerMarker * Enables the display of internal server write markers - * @param showTxnAborted - * Enables the display of messages from aborted transactions - * @param showTxnUncommitted - * Enables the display of messages from uncommitted transactions + * @param transactionIsolationLevel + * Sets the isolation level for peeking messages within transactions. + * - 'READ_COMMITTED' allows peeking only committed transactional messages. + * - 'READ_UNCOMMITTED' allows peeking all messages, + * even transactional messages which have been aborted. * @return * @throws NotAuthorizedException * Don't have admin permission @@ -1680,12 +1681,11 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * Unexpected error */ List> peekMessages(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted) throws PulsarAdminException; + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) + throws PulsarAdminException; /** * Peek messages from a topic subscription asynchronously. - * This method will show server marker, uncommitted and aborted messages for transaction by default. * * @param topic * topic name @@ -1708,15 +1708,16 @@ List> peekMessages(String topic, String subName, int numMessages * Number of messages * @param showServerMarker * Enables the display of internal server write markers - * @param showTxnAborted - * Enables the display of messages from aborted transactions - * @param showTxnUncommitted - * Enables the display of messages from uncommitted transactions + @param transactionIsolationLevel + * Sets the isolation level for peeking messages within transactions. + * - 'READ_COMMITTED' allows peeking only committed transactional messages. + * - 'READ_UNCOMMITTED' allows peeking all messages, + * even transactional messages which have been aborted. * @return a future that can be used to track when the messages are returned */ - CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted); + CompletableFuture>> peekMessagesAsync( + String topic, String subName, int numMessages, + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel); /** * Get a message by its messageId via a topic subscription. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index f9134e16d0aeb..c6faf5854b316 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -869,9 +870,9 @@ public CompletableFuture expireMessagesForAllSubscriptionsAsync(String top return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); } - private CompletableFuture>> peekNthMessage(String topic, String subName, int messagePosition, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted) { + private CompletableFuture>> peekNthMessage( + String topic, String subName, int messagePosition, boolean showServerMarker, + SubscriptionIsolationLevel transactionIsolationLevel) { TopicName tn = validateTopic(topic); String encodedSubName = Codec.encode(subName); WebTarget path = topicPath(tn, "subscription", encodedSubName, @@ -884,7 +885,7 @@ private CompletableFuture>> peekNthMessage(String topic, St public void completed(Response response) { try { future.complete(getMessagesFromHttpResponse(tn.toString(), response, - showServerMarker, showTxnAborted, showTxnUncommitted)); + showServerMarker, transactionIsolationLevel)); } catch (Exception e) { future.completeExceptionally(getApiException(e)); } @@ -901,15 +902,16 @@ public void failed(Throwable throwable) { @Override public List> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException { - return sync(() -> peekMessagesAsync(topic, subName, numMessages, true, true, true)); + return sync(() -> peekMessagesAsync(topic, subName, numMessages, + false, SubscriptionIsolationLevel.READ_COMMITTED)); } @Override public List> peekMessages(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted) throws PulsarAdminException { - return sync(() -> - peekMessagesAsync(topic, subName, numMessages, showServerMarker, showTxnAborted, showTxnUncommitted)); + boolean showServerMarker, + SubscriptionIsolationLevel transactionIsolationLevel) + throws PulsarAdminException { + return sync(() -> peekMessagesAsync(topic, subName, numMessages, showServerMarker, transactionIsolationLevel)); } @Override @@ -917,30 +919,31 @@ public CompletableFuture>> peekMessagesAsync(String topic, checkArgument(numMessages > 0); CompletableFuture>> future = new CompletableFuture>>(); peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), - future, 1, true, true, true); + future, 1, false, SubscriptionIsolationLevel.READ_COMMITTED); return future; } @Override - public CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, boolean showTxnUncommitted) { + public CompletableFuture>> peekMessagesAsync( + String topic, String subName, int numMessages, + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) { checkArgument(numMessages > 0); CompletableFuture>> future = new CompletableFuture>>(); peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), - future, 1, showServerMarker, showTxnAborted, showTxnUncommitted); + future, 1, showServerMarker, transactionIsolationLevel); return future; } private void peekMessagesAsync(String topic, String subName, int numMessages, List> messages, CompletableFuture>> future, int nthMessage, - boolean showServerMarker, boolean showTxnAborted, boolean showTxnUncommitted) { + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) { if (numMessages <= 0) { future.complete(messages); return; } // if peeking first message succeeds, we know that the topic and subscription exists - peekNthMessage(topic, subName, nthMessage, showServerMarker, showTxnAborted, showTxnUncommitted) + peekNthMessage(topic, subName, nthMessage, showServerMarker, transactionIsolationLevel) .handle((r, ex) -> { if (ex != null) { // if we get a not found exception, it means that the position for the message we are trying to get @@ -957,7 +960,7 @@ private void peekMessagesAsync(String topic, String subName, int numMessages, messages.add(r.get(i)); } peekMessagesAsync(topic, subName, numMessages - r.size(), messages, future, - nthMessage + 1, showServerMarker, showTxnAborted, showTxnUncommitted); + nthMessage + 1, showServerMarker, transactionIsolationLevel); return null; }); } @@ -1280,12 +1283,13 @@ private TopicName validateTopic(String topic) { } private List> getMessagesFromHttpResponse(String topic, Response response) throws Exception { - return getMessagesFromHttpResponse(topic, response, true, true, true); + return getMessagesFromHttpResponse(topic, response, true, + SubscriptionIsolationLevel.READ_UNCOMMITTED); } - private List> getMessagesFromHttpResponse(String topic, Response response, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted) throws Exception { + private List> getMessagesFromHttpResponse( + String topic, Response response, boolean showServerMarker, + SubscriptionIsolationLevel transactionIsolationLevel) throws Exception { if (response.getStatus() != Status.OK.getStatusCode()) { throw getApiException(response); @@ -1329,7 +1333,7 @@ private List> getMessagesFromHttpResponse(String topic, Response tmp = headers.getFirst(TXN_ABORTED); if (tmp != null && Boolean.parseBoolean(tmp.toString())) { properties.put(TXN_ABORTED, tmp.toString()); - if (!showTxnAborted) { + if (transactionIsolationLevel == SubscriptionIsolationLevel.READ_COMMITTED) { return new ArrayList<>(); } } @@ -1337,7 +1341,7 @@ private List> getMessagesFromHttpResponse(String topic, Response tmp = headers.getFirst(TXN_UNCOMMITTED); if (tmp != null && Boolean.parseBoolean(tmp.toString())) { properties.put(TXN_UNCOMMITTED, tmp.toString()); - if (!showTxnUncommitted) { + if (transactionIsolationLevel == SubscriptionIsolationLevel.READ_COMMITTED) { return new ArrayList<>(); } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java new file mode 100644 index 0000000000000..0ec25f9dd2494 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +public enum SubscriptionIsolationLevel { + // Consumer can only consume all transactional messages which have been committed. + READ_COMMITTED, + // Consumer can consume all messages, even transactional messages which have been aborted. + READ_UNCOMMITTED; +} diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 729f2da80b838..7053f61e8ad56 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -81,6 +81,7 @@ import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl; import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.api.transaction.TxnID; @@ -1744,7 +1745,8 @@ public void topics() throws Exception { verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", true); cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3")); - verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3, false, false, false); + verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3, + false, SubscriptionIsolationLevel.READ_COMMITTED); MessageImpl message = mock(MessageImpl.class); when(message.getData()).thenReturn(new byte[]{}); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 71623143685c4..4d1fcca9b3480 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -61,6 +61,7 @@ import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -1103,19 +1104,19 @@ private class PeekMessages extends CliCommand { description = "Enables the display of internal server write markers.", required = false) private boolean showServerMarker = false; - @Option(names = { "-sta", "--show-txn-aborted" }, - description = "Enables the display of messages from aborted transactions.", required = false) - private boolean showTxnAborted = false; - - @Option(names = { "-stu", "--show-txn-uncommitted" }, - description = "Enables the display of messages from uncommitted transactions.", required = false) - private boolean showTxnUncommitted = false; + @Option(names = { "-til", "--transaction-isolation-level" }, + description = "Sets the isolation level for peeking messages within transactions. " + + "'READ_COMMITTED' allows peeking only committed transactional messages. " + + "'READ_UNCOMMITTED' allows peeking all messages, " + + "even transactional messages which have been aborted.", + required = false) + private SubscriptionIsolationLevel transactionIsolationLevel = SubscriptionIsolationLevel.READ_COMMITTED; @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(topicName); List> messages = getTopics().peekMessages(persistentTopic, subName, numMessages, - showServerMarker, showTxnAborted, showTxnUncommitted); + showServerMarker, transactionIsolationLevel); int position = 0; for (Message msg : messages) { MessageImpl message = (MessageImpl) msg; From d5bec6f4a9a2a48676703a86aafb6c2e0e72f6a1 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 28 May 2024 09:05:02 +0800 Subject: [PATCH 3/4] address code reviews. --- .../org/apache/pulsar/client/admin/Topics.java | 9 +++++++-- .../pulsar/client/admin/internal/TopicsImpl.java | 16 ---------------- .../client/api/SubscriptionIsolationLevel.java | 5 +++++ 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 7a1efb36ce218..be150de636a18 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1654,7 +1654,10 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * @throws PulsarAdminException * Unexpected error */ - List> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException; + default List> peekMessages(String topic, String subName, int numMessages) + throws PulsarAdminException { + return peekMessages(topic, subName, numMessages, false, SubscriptionIsolationLevel.READ_COMMITTED); + } /** * Peek messages from a topic subscription. @@ -1695,7 +1698,9 @@ List> peekMessages(String topic, String subName, int numMessages * Number of messages * @return a future that can be used to track when the messages are returned */ - CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages); + default CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages) { + return peekMessagesAsync(topic, subName, numMessages, false, SubscriptionIsolationLevel.READ_COMMITTED); + } /** * Peek messages from a topic subscription asynchronously. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index c6faf5854b316..4babc703e7b1d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -899,13 +899,6 @@ public void failed(Throwable throwable) { return future; } - @Override - public List> peekMessages(String topic, String subName, int numMessages) - throws PulsarAdminException { - return sync(() -> peekMessagesAsync(topic, subName, numMessages, - false, SubscriptionIsolationLevel.READ_COMMITTED)); - } - @Override public List> peekMessages(String topic, String subName, int numMessages, boolean showServerMarker, @@ -914,15 +907,6 @@ public List> peekMessages(String topic, String subName, int numM return sync(() -> peekMessagesAsync(topic, subName, numMessages, showServerMarker, transactionIsolationLevel)); } - @Override - public CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages) { - checkArgument(numMessages > 0); - CompletableFuture>> future = new CompletableFuture>>(); - peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), - future, 1, false, SubscriptionIsolationLevel.READ_COMMITTED); - return future; - } - @Override public CompletableFuture>> peekMessagesAsync( String topic, String subName, int numMessages, diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java index 0ec25f9dd2494..c8f8da8b307d1 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java @@ -18,6 +18,11 @@ */ package org.apache.pulsar.client.api; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Stable public enum SubscriptionIsolationLevel { // Consumer can only consume all transactional messages which have been committed. READ_COMMITTED, From b158ff19f30ca7a78add129fe0b5c1330735fe9a Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 28 May 2024 09:53:05 +0800 Subject: [PATCH 4/4] Change class name to TransactionIsolationLevel --- pip/pip-353.md | 4 ++-- .../admin/v3/AdminApiTransactionTest.java | 10 +++++----- .../org/apache/pulsar/client/admin/Topics.java | 10 +++++----- .../client/admin/internal/TopicsImpl.java | 18 +++++++++--------- ...vel.java => TransactionIsolationLevel.java} | 2 +- .../pulsar/admin/cli/PulsarAdminToolTest.java | 4 ++-- .../org/apache/pulsar/admin/cli/CmdTopics.java | 4 ++-- 7 files changed, 26 insertions(+), 26 deletions(-) rename pulsar-client-api/src/main/java/org/apache/pulsar/client/api/{SubscriptionIsolationLevel.java => TransactionIsolationLevel.java} (96%) diff --git a/pip/pip-353.md b/pip/pip-353.md index 6544090a19b15..5944aaea1abf4 100644 --- a/pip/pip-353.md +++ b/pip/pip-353.md @@ -99,7 +99,7 @@ Add two methods to the admin.Topics() interface. * Unexpected error */ List> peekMessages(String topic, String subName, int numMessages, - boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) + boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel) throws PulsarAdminException; @@ -123,7 +123,7 @@ Add two methods to the admin.Topics() interface. */ CompletableFuture>> peekMessagesAsync( String topic, String subName, int numMessages, - boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel); + boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel); ``` ## Backward & Forward Compatibility diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 0a91c1a69bd01..5a192d0159a42 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -49,7 +49,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionIsolationLevel; +import org.apache.pulsar.client.api.TransactionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; @@ -938,7 +938,7 @@ public void testPeekMessageForSkipTxnMarker() throws Exception { } List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n, - false, SubscriptionIsolationLevel.READ_UNCOMMITTED); + false, TransactionIsolationLevel.READ_UNCOMMITTED); assertEquals(peekMsgs.size(), n); for (Message peekMsg : peekMsgs) { assertEquals(new String(peekMsg.getValue()), "msg"); @@ -976,7 +976,7 @@ public void testPeekMessageFoReadCommittedMessages() throws Exception { // peek n message, all messages value should be "msg" { List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n, - false, SubscriptionIsolationLevel.READ_COMMITTED); + false, TransactionIsolationLevel.READ_COMMITTED); assertEquals(peekMsgs.size(), n); for (Message peekMsg : peekMsgs) { assertEquals(new String(peekMsg.getValue()), "msg"); @@ -986,7 +986,7 @@ public void testPeekMessageFoReadCommittedMessages() throws Exception { // peek 3 * n message, and still get n message, all messages value should be "msg" { List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 2 * n, - false, SubscriptionIsolationLevel.READ_COMMITTED); + false, TransactionIsolationLevel.READ_COMMITTED); assertEquals(peekMsgs.size(), n); for (Message peekMsg : peekMsgs) { assertEquals(new String(peekMsg.getValue()), "msg"); @@ -1022,7 +1022,7 @@ public void testPeekMessageForShowAllMessages() throws Exception { // peek 5 * n message, will get 5 * n msg. List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 5 * n, - true, SubscriptionIsolationLevel.READ_UNCOMMITTED); + true, TransactionIsolationLevel.READ_UNCOMMITTED); assertEquals(peekMsgs.size(), 5 * n); for (int i = 0; i < 4 * n; i++) { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index be150de636a18..c681bd1a7bca1 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -30,8 +30,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TransactionIsolationLevel; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; @@ -1656,7 +1656,7 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) */ default List> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException { - return peekMessages(topic, subName, numMessages, false, SubscriptionIsolationLevel.READ_COMMITTED); + return peekMessages(topic, subName, numMessages, false, TransactionIsolationLevel.READ_COMMITTED); } /** @@ -1684,7 +1684,7 @@ default List> peekMessages(String topic, String subName, int num * Unexpected error */ List> peekMessages(String topic, String subName, int numMessages, - boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) + boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel) throws PulsarAdminException; /** @@ -1699,7 +1699,7 @@ List> peekMessages(String topic, String subName, int numMessages * @return a future that can be used to track when the messages are returned */ default CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages) { - return peekMessagesAsync(topic, subName, numMessages, false, SubscriptionIsolationLevel.READ_COMMITTED); + return peekMessagesAsync(topic, subName, numMessages, false, TransactionIsolationLevel.READ_COMMITTED); } /** @@ -1722,7 +1722,7 @@ default CompletableFuture>> peekMessagesAsync(String topic, */ CompletableFuture>> peekMessagesAsync( String topic, String subName, int numMessages, - boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel); + boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel); /** * Get a message by its messageId via a topic subscription. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 4babc703e7b1d..b7a8b87664075 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -55,8 +55,8 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TransactionIsolationLevel; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; @@ -872,7 +872,7 @@ public CompletableFuture expireMessagesForAllSubscriptionsAsync(String top private CompletableFuture>> peekNthMessage( String topic, String subName, int messagePosition, boolean showServerMarker, - SubscriptionIsolationLevel transactionIsolationLevel) { + TransactionIsolationLevel transactionIsolationLevel) { TopicName tn = validateTopic(topic); String encodedSubName = Codec.encode(subName); WebTarget path = topicPath(tn, "subscription", encodedSubName, @@ -902,7 +902,7 @@ public void failed(Throwable throwable) { @Override public List> peekMessages(String topic, String subName, int numMessages, boolean showServerMarker, - SubscriptionIsolationLevel transactionIsolationLevel) + TransactionIsolationLevel transactionIsolationLevel) throws PulsarAdminException { return sync(() -> peekMessagesAsync(topic, subName, numMessages, showServerMarker, transactionIsolationLevel)); } @@ -910,7 +910,7 @@ public List> peekMessages(String topic, String subName, int numM @Override public CompletableFuture>> peekMessagesAsync( String topic, String subName, int numMessages, - boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) { + boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel) { checkArgument(numMessages > 0); CompletableFuture>> future = new CompletableFuture>>(); peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), @@ -920,7 +920,7 @@ public CompletableFuture>> peekMessagesAsync( private void peekMessagesAsync(String topic, String subName, int numMessages, List> messages, CompletableFuture>> future, int nthMessage, - boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) { + boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel) { if (numMessages <= 0) { future.complete(messages); return; @@ -1268,12 +1268,12 @@ private TopicName validateTopic(String topic) { private List> getMessagesFromHttpResponse(String topic, Response response) throws Exception { return getMessagesFromHttpResponse(topic, response, true, - SubscriptionIsolationLevel.READ_UNCOMMITTED); + TransactionIsolationLevel.READ_UNCOMMITTED); } private List> getMessagesFromHttpResponse( String topic, Response response, boolean showServerMarker, - SubscriptionIsolationLevel transactionIsolationLevel) throws Exception { + TransactionIsolationLevel transactionIsolationLevel) throws Exception { if (response.getStatus() != Status.OK.getStatusCode()) { throw getApiException(response); @@ -1317,7 +1317,7 @@ private List> getMessagesFromHttpResponse( tmp = headers.getFirst(TXN_ABORTED); if (tmp != null && Boolean.parseBoolean(tmp.toString())) { properties.put(TXN_ABORTED, tmp.toString()); - if (transactionIsolationLevel == SubscriptionIsolationLevel.READ_COMMITTED) { + if (transactionIsolationLevel == TransactionIsolationLevel.READ_COMMITTED) { return new ArrayList<>(); } } @@ -1325,7 +1325,7 @@ private List> getMessagesFromHttpResponse( tmp = headers.getFirst(TXN_UNCOMMITTED); if (tmp != null && Boolean.parseBoolean(tmp.toString())) { properties.put(TXN_UNCOMMITTED, tmp.toString()); - if (transactionIsolationLevel == SubscriptionIsolationLevel.READ_COMMITTED) { + if (transactionIsolationLevel == TransactionIsolationLevel.READ_COMMITTED) { return new ArrayList<>(); } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TransactionIsolationLevel.java similarity index 96% rename from pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java rename to pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TransactionIsolationLevel.java index c8f8da8b307d1..ae385b20232c7 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TransactionIsolationLevel.java @@ -23,7 +23,7 @@ @InterfaceAudience.Public @InterfaceStability.Stable -public enum SubscriptionIsolationLevel { +public enum TransactionIsolationLevel { // Consumer can only consume all transactional messages which have been committed. READ_COMMITTED, // Consumer can consume all messages, even transactional messages which have been aborted. diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 7053f61e8ad56..a3b1fa075cffc 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -81,7 +81,7 @@ import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl; import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.SubscriptionIsolationLevel; +import org.apache.pulsar.client.api.TransactionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.api.transaction.TxnID; @@ -1746,7 +1746,7 @@ public void topics() throws Exception { cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3")); verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3, - false, SubscriptionIsolationLevel.READ_COMMITTED); + false, TransactionIsolationLevel.READ_COMMITTED); MessageImpl message = mock(MessageImpl.class); when(message.getData()).thenReturn(new byte[]{}); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 4d1fcca9b3480..261bd81a5b7bd 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -61,8 +61,8 @@ import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TransactionIsolationLevel; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; @@ -1110,7 +1110,7 @@ private class PeekMessages extends CliCommand { + "'READ_UNCOMMITTED' allows peeking all messages, " + "even transactional messages which have been aborted.", required = false) - private SubscriptionIsolationLevel transactionIsolationLevel = SubscriptionIsolationLevel.READ_COMMITTED; + private TransactionIsolationLevel transactionIsolationLevel = TransactionIsolationLevel.READ_COMMITTED; @Override void run() throws PulsarAdminException {