diff --git a/pip/pip-353.md b/pip/pip-353.md index 4315bdab0eb2e..6544090a19b15 100644 --- a/pip/pip-353.md +++ b/pip/pip-353.md @@ -25,7 +25,7 @@ This behavior can confuse users and lead to incorrect data handling. The proposa ### In Scope -- Implement flags to selectively display `server markers`, `uncommitted messages`, and `aborted messages` in peek operations. +- Implement flags to selectively display `server markers`, `uncommitted messages(include aborted messages) for transaction` in peek operations. - Set the default behavior to only show messages from committed transactions to ensure data integrity. ### Out of Scope @@ -37,8 +37,9 @@ This behavior can confuse users and lead to incorrect data handling. The proposa The proposal introduces three new flags to the `peek-messages` command: 1. `--show-server-marker`: Controls the visibility of server markers (default: `false`). -2. `--show-txn-uncommitted`: Controls the visibility of messages from uncommitted transactions (default: `false`). -3. `--show-txn-aborted`: Controls the visibility of messages from aborted transactions (default: `false`). +2. `---transaction-isolation-level`: Controls the visibility of messages for transactions. (default: `READ_COMMITTED`). Options: + - READ_COMMITTED: Can only consume all transactional messages which have been committed. + - READ_UNCOMMITTED: Can consume all messages, even transactional messages which have been aborted. These flags will allow administrators and developers to tailor the peek functionality to their needs, improving the usability and security of message handling in transactional contexts. @@ -46,7 +47,7 @@ These flags will allow administrators and developers to tailor the peek function ### Design & Implementation Details -To support the `--show-server-marker` and `--show-txn-aborted`, `--show-txn-uncommitted` flags, needs to introduce specific tag into the `headers` of messages returned by the +To support the `--show-server-marker` and `---transaction-isolation-level` flags, needs to introduce specific tag into the `headers` of messages returned by the [peekNthMessage REST API](https://github.com/apache/pulsar/blob/8ca01cd42edfd4efd986f752f6f8538ea5bf4f94/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java#L1892-L1905). - `X-Pulsar-marker-type`: Already exists. @@ -62,11 +63,10 @@ see the following code: [https://github.com/shibd/pulsar/pull/34](https://github New command line flags added for the `bin/pulsar-admin topics peek-messages` command: -| Flag | Abbreviation | Type | Default | Description | -|--------------------------|--------------|---------|---------|----------------------------------------------------------------| -| `--show-server-marker` | `-ssm` | Boolean | `false` | Enables the display of internal server write markers. | -| `--show-txn-uncommitted` | `-stu` | Boolean | `false` | Enables the display of messages from uncommitted transactions. | -| `--show-txn-aborted` | `-sta` | Boolean | `false` | Enables the display of messages from aborted transactions. | +| Flag | Abbreviation | Type | Default | Description | +|----------------------------------|--------------|---------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `--show-server-marker` | `-ssm` | Boolean | `false` | Enables the display of internal server write markers. | +| `---transaction-isolation-level` | `-til` | Enum | `false` | Enables theSets the isolation level for consuming messages within transactions.
- 'READ_COMMITTED' allows consuming only committed transactional messages.
- 'READ_UNCOMMITTED' allows consuming all messages, even transactional messages which have been aborted. | ## Public-facing Changes @@ -85,10 +85,11 @@ Add two methods to the admin.Topics() interface. * Number of messages * @param showServerMarker * Enables the display of internal server write markers - * @param showTxnAborted - * Enables the display of messages from aborted transactions - * @param showTxnUncommitted - * Enables the display of messages from uncommitted transactions + * @param transactionIsolationLevel + * Sets the isolation level for consuming messages within transactions. + * - 'READ_COMMITTED' allows consuming only committed transactional messages. + * - 'READ_UNCOMMITTED' allows consuming all messages, + * even transactional messages which have been aborted. * @return * @throws NotAuthorizedException * Don't have admin permission @@ -98,8 +99,9 @@ Add two methods to the admin.Topics() interface. * Unexpected error */ List> peekMessages(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted) throws PulsarAdminException; + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) + throws PulsarAdminException; + /** * Peek messages from a topic subscription asynchronously. @@ -112,15 +114,16 @@ Add two methods to the admin.Topics() interface. * Number of messages * @param showServerMarker * Enables the display of internal server write markers - * @param showTxnAborted - * Enables the display of messages from aborted transactions - * @param showTxnUncommitted - * Enables the display of messages from uncommitted transactions - * @return a future that can be used to track when the messages are returned + @param transactionIsolationLevel + * Sets the isolation level for consuming messages within transactions. + * - 'READ_COMMITTED' allows consuming only committed transactional messages. + * - 'READ_UNCOMMITTED' allows consuming all messages, + * even transactional messages which have been aborted. + * @return a future that can be used to track when the messages are returned */ - CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted); + CompletableFuture>> peekMessagesAsync( + String topic, String subName, int numMessages, + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel); ``` ## Backward & Forward Compatibility @@ -130,5 +133,5 @@ Reverting to a previous version of Pulsar without this feature will remove the a ### Upgrade While upgrading to the new version of Pulsar that includes these changes, the default behavior of the `peek-messages` command will change. -Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (`--show-server-marker`, `--show-txn-uncommitted`, `--show-txn-aborted`) to `true` to maintain the old behavior. +Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (`--show-server-marker true` and `--transaction-isolation-level READ_UNCOMMITTED` to maintain the old behavior. This change is necessary as the previous default behavior did not align with typical expectations around data visibility and integrity in transactional systems. \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 4cfce3f88e3e7..0a91c1a69bd01 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; @@ -937,7 +938,7 @@ public void testPeekMessageForSkipTxnMarker() throws Exception { } List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n, - false, true, true); + false, SubscriptionIsolationLevel.READ_UNCOMMITTED); assertEquals(peekMsgs.size(), n); for (Message peekMsg : peekMsgs) { assertEquals(new String(peekMsg.getValue()), "msg"); @@ -945,7 +946,7 @@ public void testPeekMessageForSkipTxnMarker() throws Exception { } @Test - public void testPeekMessageForSkipAbortedAndUnCommittedMessages() throws Exception { + public void testPeekMessageFoReadCommittedMessages() throws Exception { initTransaction(1); final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn"); @@ -975,7 +976,7 @@ public void testPeekMessageForSkipAbortedAndUnCommittedMessages() throws Excepti // peek n message, all messages value should be "msg" { List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", n, - false, false, false); + false, SubscriptionIsolationLevel.READ_COMMITTED); assertEquals(peekMsgs.size(), n); for (Message peekMsg : peekMsgs) { assertEquals(new String(peekMsg.getValue()), "msg"); @@ -985,7 +986,7 @@ public void testPeekMessageForSkipAbortedAndUnCommittedMessages() throws Excepti // peek 3 * n message, and still get n message, all messages value should be "msg" { List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 2 * n, - false, false, false); + false, SubscriptionIsolationLevel.READ_COMMITTED); assertEquals(peekMsgs.size(), n); for (Message peekMsg : peekMsgs) { assertEquals(new String(peekMsg.getValue()), "msg"); @@ -1021,7 +1022,7 @@ public void testPeekMessageForShowAllMessages() throws Exception { // peek 5 * n message, will get 5 * n msg. List> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 5 * n, - true, true, true); + true, SubscriptionIsolationLevel.READ_UNCOMMITTED); assertEquals(peekMsgs.size(), 5 * n); for (int i = 0; i < 4 * n; i++) { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 46c033fdde2cf..7a1efb36ce218 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -1638,7 +1639,6 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) /** * Peek messages from a topic subscription. - * This method will show server marker, uncommitted and aborted messages for transaction by default. * * @param topic * topic name @@ -1667,10 +1667,11 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * Number of messages * @param showServerMarker * Enables the display of internal server write markers - * @param showTxnAborted - * Enables the display of messages from aborted transactions - * @param showTxnUncommitted - * Enables the display of messages from uncommitted transactions + * @param transactionIsolationLevel + * Sets the isolation level for peeking messages within transactions. + * - 'READ_COMMITTED' allows peeking only committed transactional messages. + * - 'READ_UNCOMMITTED' allows peeking all messages, + * even transactional messages which have been aborted. * @return * @throws NotAuthorizedException * Don't have admin permission @@ -1680,12 +1681,11 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * Unexpected error */ List> peekMessages(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted) throws PulsarAdminException; + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) + throws PulsarAdminException; /** * Peek messages from a topic subscription asynchronously. - * This method will show server marker, uncommitted and aborted messages for transaction by default. * * @param topic * topic name @@ -1708,15 +1708,16 @@ List> peekMessages(String topic, String subName, int numMessages * Number of messages * @param showServerMarker * Enables the display of internal server write markers - * @param showTxnAborted - * Enables the display of messages from aborted transactions - * @param showTxnUncommitted - * Enables the display of messages from uncommitted transactions + @param transactionIsolationLevel + * Sets the isolation level for peeking messages within transactions. + * - 'READ_COMMITTED' allows peeking only committed transactional messages. + * - 'READ_UNCOMMITTED' allows peeking all messages, + * even transactional messages which have been aborted. * @return a future that can be used to track when the messages are returned */ - CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted); + CompletableFuture>> peekMessagesAsync( + String topic, String subName, int numMessages, + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel); /** * Get a message by its messageId via a topic subscription. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index f9134e16d0aeb..c6faf5854b316 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -869,9 +870,9 @@ public CompletableFuture expireMessagesForAllSubscriptionsAsync(String top return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); } - private CompletableFuture>> peekNthMessage(String topic, String subName, int messagePosition, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted) { + private CompletableFuture>> peekNthMessage( + String topic, String subName, int messagePosition, boolean showServerMarker, + SubscriptionIsolationLevel transactionIsolationLevel) { TopicName tn = validateTopic(topic); String encodedSubName = Codec.encode(subName); WebTarget path = topicPath(tn, "subscription", encodedSubName, @@ -884,7 +885,7 @@ private CompletableFuture>> peekNthMessage(String topic, St public void completed(Response response) { try { future.complete(getMessagesFromHttpResponse(tn.toString(), response, - showServerMarker, showTxnAborted, showTxnUncommitted)); + showServerMarker, transactionIsolationLevel)); } catch (Exception e) { future.completeExceptionally(getApiException(e)); } @@ -901,15 +902,16 @@ public void failed(Throwable throwable) { @Override public List> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException { - return sync(() -> peekMessagesAsync(topic, subName, numMessages, true, true, true)); + return sync(() -> peekMessagesAsync(topic, subName, numMessages, + false, SubscriptionIsolationLevel.READ_COMMITTED)); } @Override public List> peekMessages(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted) throws PulsarAdminException { - return sync(() -> - peekMessagesAsync(topic, subName, numMessages, showServerMarker, showTxnAborted, showTxnUncommitted)); + boolean showServerMarker, + SubscriptionIsolationLevel transactionIsolationLevel) + throws PulsarAdminException { + return sync(() -> peekMessagesAsync(topic, subName, numMessages, showServerMarker, transactionIsolationLevel)); } @Override @@ -917,30 +919,31 @@ public CompletableFuture>> peekMessagesAsync(String topic, checkArgument(numMessages > 0); CompletableFuture>> future = new CompletableFuture>>(); peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), - future, 1, true, true, true); + future, 1, false, SubscriptionIsolationLevel.READ_COMMITTED); return future; } @Override - public CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages, - boolean showServerMarker, boolean showTxnAborted, boolean showTxnUncommitted) { + public CompletableFuture>> peekMessagesAsync( + String topic, String subName, int numMessages, + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) { checkArgument(numMessages > 0); CompletableFuture>> future = new CompletableFuture>>(); peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), - future, 1, showServerMarker, showTxnAborted, showTxnUncommitted); + future, 1, showServerMarker, transactionIsolationLevel); return future; } private void peekMessagesAsync(String topic, String subName, int numMessages, List> messages, CompletableFuture>> future, int nthMessage, - boolean showServerMarker, boolean showTxnAborted, boolean showTxnUncommitted) { + boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel) { if (numMessages <= 0) { future.complete(messages); return; } // if peeking first message succeeds, we know that the topic and subscription exists - peekNthMessage(topic, subName, nthMessage, showServerMarker, showTxnAborted, showTxnUncommitted) + peekNthMessage(topic, subName, nthMessage, showServerMarker, transactionIsolationLevel) .handle((r, ex) -> { if (ex != null) { // if we get a not found exception, it means that the position for the message we are trying to get @@ -957,7 +960,7 @@ private void peekMessagesAsync(String topic, String subName, int numMessages, messages.add(r.get(i)); } peekMessagesAsync(topic, subName, numMessages - r.size(), messages, future, - nthMessage + 1, showServerMarker, showTxnAborted, showTxnUncommitted); + nthMessage + 1, showServerMarker, transactionIsolationLevel); return null; }); } @@ -1280,12 +1283,13 @@ private TopicName validateTopic(String topic) { } private List> getMessagesFromHttpResponse(String topic, Response response) throws Exception { - return getMessagesFromHttpResponse(topic, response, true, true, true); + return getMessagesFromHttpResponse(topic, response, true, + SubscriptionIsolationLevel.READ_UNCOMMITTED); } - private List> getMessagesFromHttpResponse(String topic, Response response, - boolean showServerMarker, boolean showTxnAborted, - boolean showTxnUncommitted) throws Exception { + private List> getMessagesFromHttpResponse( + String topic, Response response, boolean showServerMarker, + SubscriptionIsolationLevel transactionIsolationLevel) throws Exception { if (response.getStatus() != Status.OK.getStatusCode()) { throw getApiException(response); @@ -1329,7 +1333,7 @@ private List> getMessagesFromHttpResponse(String topic, Response tmp = headers.getFirst(TXN_ABORTED); if (tmp != null && Boolean.parseBoolean(tmp.toString())) { properties.put(TXN_ABORTED, tmp.toString()); - if (!showTxnAborted) { + if (transactionIsolationLevel == SubscriptionIsolationLevel.READ_COMMITTED) { return new ArrayList<>(); } } @@ -1337,7 +1341,7 @@ private List> getMessagesFromHttpResponse(String topic, Response tmp = headers.getFirst(TXN_UNCOMMITTED); if (tmp != null && Boolean.parseBoolean(tmp.toString())) { properties.put(TXN_UNCOMMITTED, tmp.toString()); - if (!showTxnUncommitted) { + if (transactionIsolationLevel == SubscriptionIsolationLevel.READ_COMMITTED) { return new ArrayList<>(); } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java new file mode 100644 index 0000000000000..0ec25f9dd2494 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +public enum SubscriptionIsolationLevel { + // Consumer can only consume all transactional messages which have been committed. + READ_COMMITTED, + // Consumer can consume all messages, even transactional messages which have been aborted. + READ_UNCOMMITTED; +} diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 729f2da80b838..7053f61e8ad56 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -81,6 +81,7 @@ import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl; import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.api.transaction.TxnID; @@ -1744,7 +1745,8 @@ public void topics() throws Exception { verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", true); cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3")); - verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3, false, false, false); + verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3, + false, SubscriptionIsolationLevel.READ_COMMITTED); MessageImpl message = mock(MessageImpl.class); when(message.getData()).thenReturn(new byte[]{}); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 71623143685c4..4d1fcca9b3480 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -61,6 +61,7 @@ import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -1103,19 +1104,19 @@ private class PeekMessages extends CliCommand { description = "Enables the display of internal server write markers.", required = false) private boolean showServerMarker = false; - @Option(names = { "-sta", "--show-txn-aborted" }, - description = "Enables the display of messages from aborted transactions.", required = false) - private boolean showTxnAborted = false; - - @Option(names = { "-stu", "--show-txn-uncommitted" }, - description = "Enables the display of messages from uncommitted transactions.", required = false) - private boolean showTxnUncommitted = false; + @Option(names = { "-til", "--transaction-isolation-level" }, + description = "Sets the isolation level for peeking messages within transactions. " + + "'READ_COMMITTED' allows peeking only committed transactional messages. " + + "'READ_UNCOMMITTED' allows peeking all messages, " + + "even transactional messages which have been aborted.", + required = false) + private SubscriptionIsolationLevel transactionIsolationLevel = SubscriptionIsolationLevel.READ_COMMITTED; @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(topicName); List> messages = getTopics().peekMessages(persistentTopic, subName, numMessages, - showServerMarker, showTxnAborted, showTxnUncommitted); + showServerMarker, transactionIsolationLevel); int position = 0; for (Message msg : messages) { MessageImpl message = (MessageImpl) msg;