Skip to content

Commit

Permalink
Add stats 'subscriptionIsolationLevel' in SubscriptionStatsImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
hzh0425 committed Oct 7, 2023
1 parent 05a5c46 commit a4c4db2
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ public NonPersistentSubscriptionStatsImpl getStats() {

subStats.type = getTypeString();
subStats.msgDropRate = dispatcher.getMessageDropRate().getValueRate();
subStats.subscriptionIsolationLevel = this.isolationLevel.toString();

KeySharedMode keySharedMode = this.keySharedMode;
if (getType() == SubType.Key_Shared && keySharedMode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
private final PendingAckHandle pendingAckHandle;
private volatile Map<String, String> subscriptionProperties;
private final IsolationLevel subscriptionIsolationLevel;
private final IsolationLevel isolationLevel;
private volatile CompletableFuture<Void> fenceFuture;

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
Expand Down Expand Up @@ -162,7 +162,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.pendingAckHandle = new PendingAckHandleDisabled();
}
IS_FENCED_UPDATER.set(this, FALSE);
this.subscriptionIsolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties);
this.isolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties);
}

public void updateLastMarkDeleteAdvancedTimestamp() {
Expand Down Expand Up @@ -511,7 +511,7 @@ public String getTypeString() {

@Override
public IsolationLevel getIsolationLevel() {
return subscriptionIsolationLevel;
return isolationLevel;
}

@Override
Expand Down Expand Up @@ -1195,6 +1195,7 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.isReplicated = isReplicated();
subStats.subscriptionProperties = subscriptionProperties;
subStats.isDurable = cursor.isDurable();
subStats.subscriptionIsolationLevel = this.isolationLevel.toString();
if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) {
PersistentStickyKeyDispatcherMultipleConsumers keySharedDispatcher =
(PersistentStickyKeyDispatcherMultipleConsumers) dispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
@JsonIgnore
public Map<String, TopicMetricBean> bucketDelayedIndexStats;

/** SubscriptionProperties (key/value strings) associated with this subscribe. */
/**
* SubscriptionProperties (key/value strings) associated with this subscribe.
*/
public Map<String, String> subscriptionProperties;

public long filterProcessedMsgCount;
Expand All @@ -149,6 +151,11 @@ public class SubscriptionStatsImpl implements SubscriptionStats {

public long filterRescheduledMsgCount;

/**
* Whether the subscription's transaction isolationLevel is READ_COMMITTED or READ_UNCOMMITTED.
*/
public String subscriptionIsolationLevel;

public SubscriptionStatsImpl() {
this.consumers = new ArrayList<>();
this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();
Expand Down

0 comments on commit a4c4db2

Please sign in to comment.