From a4c4db2dc934ef50e400a047470769ef903e7759 Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Sat, 7 Oct 2023 21:39:17 +0800 Subject: [PATCH] Add stats 'subscriptionIsolationLevel' in SubscriptionStatsImpl --- .../service/nonpersistent/NonPersistentSubscription.java | 1 + .../service/persistent/PersistentSubscription.java | 7 ++++--- .../policies/data/stats/SubscriptionStatsImpl.java | 9 ++++++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 376fd0288a6d48..56eea830fad3fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -472,6 +472,7 @@ public NonPersistentSubscriptionStatsImpl getStats() { subStats.type = getTypeString(); subStats.msgDropRate = dispatcher.getMessageDropRate().getValueRate(); + subStats.subscriptionIsolationLevel = this.isolationLevel.toString(); KeySharedMode keySharedMode = this.keySharedMode; if (getType() == SubType.Key_Shared && keySharedMode != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index fa451008acff01..91ad5411ae592d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -128,7 +128,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; private final PendingAckHandle pendingAckHandle; private volatile Map subscriptionProperties; - private final IsolationLevel subscriptionIsolationLevel; + private final IsolationLevel isolationLevel; private volatile CompletableFuture fenceFuture; static Map getBaseCursorProperties(boolean isReplicated) { @@ -162,7 +162,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.pendingAckHandle = new PendingAckHandleDisabled(); } IS_FENCED_UPDATER.set(this, FALSE); - this.subscriptionIsolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties); + this.isolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties); } public void updateLastMarkDeleteAdvancedTimestamp() { @@ -511,7 +511,7 @@ public String getTypeString() { @Override public IsolationLevel getIsolationLevel() { - return subscriptionIsolationLevel; + return isolationLevel; } @Override @@ -1195,6 +1195,7 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri subStats.isReplicated = isReplicated(); subStats.subscriptionProperties = subscriptionProperties; subStats.isDurable = cursor.isDurable(); + subStats.subscriptionIsolationLevel = this.isolationLevel.toString(); if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) { PersistentStickyKeyDispatcherMultipleConsumers keySharedDispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) dispatcher; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index bed8aabf27d8db..c4ffa70a209929 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -138,7 +138,9 @@ public class SubscriptionStatsImpl implements SubscriptionStats { @JsonIgnore public Map bucketDelayedIndexStats; - /** SubscriptionProperties (key/value strings) associated with this subscribe. */ + /** + * SubscriptionProperties (key/value strings) associated with this subscribe. + */ public Map subscriptionProperties; public long filterProcessedMsgCount; @@ -149,6 +151,11 @@ public class SubscriptionStatsImpl implements SubscriptionStats { public long filterRescheduledMsgCount; + /** + * Whether the subscription's transaction isolationLevel is READ_COMMITTED or READ_UNCOMMITTED. + */ + public String subscriptionIsolationLevel; + public SubscriptionStatsImpl() { this.consumers = new ArrayList<>(); this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();