Skip to content

Commit

Permalink
[improve][cli] PIP-353: Improve transaction message visibility for pe…
Browse files Browse the repository at this point in the history
…ek-message (#22762)

(cherry picked from commit 20e83b9)
  • Loading branch information
shibd committed May 28, 2024
1 parent 7ca884b commit 66baa8d
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand Down Expand Up @@ -2707,7 +2708,7 @@ public void readEntryFailed(ManagedLedgerException exception,
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
results.complete(generateResponseWithEntry(entry));
results.complete(generateResponseWithEntry(entry, (PersistentTopic) topic));
} catch (IOException exception) {
throw new RestException(exception);
} finally {
Expand Down Expand Up @@ -2848,10 +2849,12 @@ protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName
entry = sub.peekNthMessage(messagePosition);
}
}
return entry;
}).thenCompose(entry -> {
return entry.thenApply(e -> Pair.of(e, (PersistentTopic) topic));
}).thenCompose(entryTopicPair -> {
Entry entry = entryTopicPair.getLeft();
PersistentTopic persistentTopic = entryTopicPair.getRight();
try {
Response response = generateResponseWithEntry(entry);
Response response = generateResponseWithEntry(entry, persistentTopic);
return CompletableFuture.completedFuture(response);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Message not found");
Expand Down Expand Up @@ -2930,17 +2933,18 @@ public String toString() {
PersistentTopicsBase.this.topicName);
}
}, null);
return future;
return future.thenApply(entry -> Pair.of(entry, (PersistentTopic) topic));
} catch (ManagedLedgerException exception) {
log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(),
messagePosition,
topicName, exception);
throw new RestException(exception);
}

}).thenApply(entry -> {
}).thenApply(entryTopicPair -> {
Entry entry = entryTopicPair.getLeft();
PersistentTopic persistentTopic = entryTopicPair.getRight();
try {
return generateResponseWithEntry(entry);
return generateResponseWithEntry(entry, persistentTopic);
} catch (IOException exception) {
throw new RestException(exception);
} finally {
Expand All @@ -2951,7 +2955,7 @@ public String toString() {
});
}

private Response generateResponseWithEntry(Entry entry) throws IOException {
private Response generateResponseWithEntry(Entry entry, PersistentTopic persistentTopic) throws IOException {
checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();
Expand Down Expand Up @@ -3069,6 +3073,14 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
if (metadata.hasNullPartitionKey()) {
responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
}
if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, (PositionImpl) entry.getPosition());
responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
}
boolean isTxnUncommitted = ((PositionImpl) entry.getPosition())
.compareTo(persistentTopic.getMaxReadPosition()) > 0;
responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
Expand All @@ -48,12 +49,16 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TransactionIsolationLevel;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -917,6 +922,127 @@ public void testAbortTransaction() throws Exception {
}
}

@Test
public void testPeekMessageForSkipTxnMarker() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_marker");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
int n = 10;
for (int i = 0; i < n; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
producer.newMessage(txn).value("msg").send();
txn.commit().get();
}

List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
false, TransactionIsolationLevel.READ_UNCOMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
}
}

@Test
public void testPeekMessageFoReadCommittedMessages() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
int n = 10;
// Alternately sends `n` committed transactional messages and `n` abort transactional messages.
for (int i = 0; i < 2 * n; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
if (i % 2 == 0) {
producer.newMessage(txn).value("msg").send();
txn.commit().get();
} else {
producer.newMessage(txn).value("msg-aborted").send();
txn.abort();
}
}
// Then sends 1 uncommitted transactional messages.
Transaction txn = pulsarClient.newTransaction().build().get();
producer.newMessage(txn).value("msg-uncommitted").send();
// Then sends n-1 no transaction messages.
for (int i = 0; i < n - 1; i++) {
producer.newMessage().value("msg-after-uncommitted").send();
}

// peek n message, all messages value should be "msg"
{
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n,
false, TransactionIsolationLevel.READ_COMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
}
}

// peek 3 * n message, and still get n message, all messages value should be "msg"
{
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 2 * n,
false, TransactionIsolationLevel.READ_COMMITTED);
assertEquals(peekMsgs.size(), n);
for (Message<byte[]> peekMsg : peekMsgs) {
assertEquals(new String(peekMsg.getValue()), "msg");
}
}
}

@Test
public void testPeekMessageForShowAllMessages() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_all");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
int n = 10;
// Alternately sends `n` committed transactional messages and `n` abort transactional messages.
for (int i = 0; i < 2 * n; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
if (i % 2 == 0) {
producer.newMessage(txn).value("msg").send();
txn.commit().get();
} else {
producer.newMessage(txn).value("msg-aborted").send();
txn.abort();
}
}
// Then sends `n` uncommitted transactional messages.
Transaction txn = pulsarClient.newTransaction().build().get();
for (int i = 0; i < n; i++) {
producer.newMessage(txn).value("msg-uncommitted").send();
}

// peek 5 * n message, will get 5 * n msg.
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 5 * n,
true, TransactionIsolationLevel.READ_UNCOMMITTED);
assertEquals(peekMsgs.size(), 5 * n);

for (int i = 0; i < 4 * n; i++) {
Message<byte[]> peekMsg = peekMsgs.get(i);
MessageImpl peekMsgImpl = (MessageImpl) peekMsg;
MessageMetadata metadata = peekMsgImpl.getMessageBuilder();
if (metadata.hasMarkerType()) {
assertTrue(metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE ||
metadata.getMarkerType() == MarkerType.TXN_ABORT_VALUE);
} else {
String value = new String(peekMsg.getValue());
assertTrue(value.equals("msg") || value.equals("msg-aborted"));
}
}
for (int i = 4 * n; i < peekMsgs.size(); i++) {
Message<byte[]> peekMsg = peekMsgs.get(i);
assertEquals(new String(peekMsg.getValue()), "msg-uncommitted");
}
}

private static void verifyCoordinatorStats(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TransactionIsolationLevel;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -1653,7 +1654,53 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* @throws PulsarAdminException
* Unexpected error
*/
List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException;
default List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages)
throws PulsarAdminException {
return peekMessages(topic, subName, numMessages, false, TransactionIsolationLevel.READ_COMMITTED);
}

/**
* Peek messages from a topic subscription.
*
* @param topic
* topic name
* @param subName
* Subscription name
* @param numMessages
* Number of messages
* @param showServerMarker
* Enables the display of internal server write markers
* @param transactionIsolationLevel
* Sets the isolation level for peeking messages within transactions.
* - 'READ_COMMITTED' allows peeking only committed transactional messages.
* - 'READ_UNCOMMITTED' allows peeking all messages,
* even transactional messages which have been aborted.
* @return
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Topic or subscription does not exist
* @throws PulsarAdminException
* Unexpected error
*/
List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages,
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel)
throws PulsarAdminException;

/**
* Peek messages from a topic subscription asynchronously.
*
* @param topic
* topic name
* @param subName
* Subscription name
* @param numMessages
* Number of messages
* @return a future that can be used to track when the messages are returned
*/
default CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages) {
return peekMessagesAsync(topic, subName, numMessages, false, TransactionIsolationLevel.READ_COMMITTED);
}

/**
* Peek messages from a topic subscription asynchronously.
Expand All @@ -1664,9 +1711,18 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* Subscription name
* @param numMessages
* Number of messages
* @param showServerMarker
* Enables the display of internal server write markers
@param transactionIsolationLevel
* Sets the isolation level for peeking messages within transactions.
* - 'READ_COMMITTED' allows peeking only committed transactional messages.
* - 'READ_UNCOMMITTED' allows peeking all messages,
* even transactional messages which have been aborted.
* @return a future that can be used to track when the messages are returned
*/
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages);
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(
String topic, String subName, int numMessages,
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel);

/**
* Get a message by its messageId via a topic subscription.
Expand Down
Loading

0 comments on commit 66baa8d

Please sign in to comment.