diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 376fd0288a6d4..56eea830fad3f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -472,6 +472,7 @@ public NonPersistentSubscriptionStatsImpl getStats() { subStats.type = getTypeString(); subStats.msgDropRate = dispatcher.getMessageDropRate().getValueRate(); + subStats.subscriptionIsolationLevel = this.isolationLevel.toString(); KeySharedMode keySharedMode = this.keySharedMode; if (getType() == SubType.Key_Shared && keySharedMode != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index fa451008acff0..91ad5411ae592 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -128,7 +128,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; private final PendingAckHandle pendingAckHandle; private volatile Map subscriptionProperties; - private final IsolationLevel subscriptionIsolationLevel; + private final IsolationLevel isolationLevel; private volatile CompletableFuture fenceFuture; static Map getBaseCursorProperties(boolean isReplicated) { @@ -162,7 +162,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.pendingAckHandle = new PendingAckHandleDisabled(); } IS_FENCED_UPDATER.set(this, FALSE); - this.subscriptionIsolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties); + this.isolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties); } public void updateLastMarkDeleteAdvancedTimestamp() { @@ -511,7 +511,7 @@ public String getTypeString() { @Override public IsolationLevel getIsolationLevel() { - return subscriptionIsolationLevel; + return isolationLevel; } @Override @@ -1195,6 +1195,7 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri subStats.isReplicated = isReplicated(); subStats.subscriptionProperties = subscriptionProperties; subStats.isDurable = cursor.isDurable(); + subStats.subscriptionIsolationLevel = this.isolationLevel.toString(); if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) { PersistentStickyKeyDispatcherMultipleConsumers keySharedDispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) dispatcher; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index e97707710d743..0dd886983408e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -99,6 +99,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.lookup.data.LookupData; @@ -1331,6 +1332,25 @@ public void testGetPartitionedStatsContainSubscriptionType() throws Exception { assertEquals(topicStats.getSubscriptions().get(subName).getType(), SubscriptionType.Exclusive.toString()); } + @Test + public void testGetPartitionedStatsContainSubscriptionIsolationLevel() throws Exception { + final String topic = "persistent://prop-xyz/ns1/my-topic" + UUID.randomUUID(); + final int numPartitions = 4; + admin.topics().createPartitionedTopic(topic, numPartitions); + + // create consumer and subscription + final String subName = "my-sub"; + @Cleanup Consumer exclusiveConsumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_UNCOMMITTED) + .subscribe(); + + TopicStats topicStats = admin.topics().getPartitionedStats(topic, false); + assertEquals(topicStats.getSubscriptions().size(), 1); + assertEquals(topicStats.getSubscriptions().get(subName).getSubscriptionIsolationLevel(), SubscriptionIsolationLevel.READ_UNCOMMITTED.toString()); + } + @Test public void testGetPartitionedStatsInternal() throws Exception { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index 9ff94a2952ea3..ca39491ccd1dc 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -136,4 +136,7 @@ public interface SubscriptionStats { long getFilterRescheduledMsgCount(); long getDelayedMessageIndexSizeInBytes(); + + /** The subscription isolationLevel as defined by {@link org.apache.pulsar.client.api.SubscriptionIsolationLevel}. */ + String getSubscriptionIsolationLevel(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index bed8aabf27d8d..10a7a722ae531 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -149,6 +149,11 @@ public class SubscriptionStatsImpl implements SubscriptionStats { public long filterRescheduledMsgCount; + /** + * The subscription isolationLevel as defined by {@link org.apache.pulsar.client.api.SubscriptionIsolationLevel}. + */ + public String subscriptionIsolationLevel; + public SubscriptionStatsImpl() { this.consumers = new ArrayList<>(); this.consumersAfterMarkDeletePosition = new LinkedHashMap<>(); @@ -185,6 +190,7 @@ public void reset() { filterRejectedMsgCount = 0; filterRescheduledMsgCount = 0; bucketDelayedIndexStats.clear(); + subscriptionIsolationLevel = null; } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current @@ -239,6 +245,7 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) { this.filterAcceptedMsgCount += stats.filterAcceptedMsgCount; this.filterRejectedMsgCount += stats.filterRejectedMsgCount; this.filterRescheduledMsgCount += stats.filterRescheduledMsgCount; + this.subscriptionIsolationLevel = stats.subscriptionIsolationLevel; stats.bucketDelayedIndexStats.forEach((k, v) -> { TopicMetricBean topicMetricBean = this.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new TopicMetricBean());