Skip to content

Commit

Permalink
Add stats 'subscriptionIsolationLevel' in SubscriptionStatsImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
hzh0425 committed Oct 7, 2023
1 parent 05a5c46 commit 7eee90c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ public NonPersistentSubscriptionStatsImpl getStats() {

subStats.type = getTypeString();
subStats.msgDropRate = dispatcher.getMessageDropRate().getValueRate();
subStats.subscriptionIsolationLevel = this.isolationLevel.toString();

KeySharedMode keySharedMode = this.keySharedMode;
if (getType() == SubType.Key_Shared && keySharedMode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
private final PendingAckHandle pendingAckHandle;
private volatile Map<String, String> subscriptionProperties;
private final IsolationLevel subscriptionIsolationLevel;
private final IsolationLevel isolationLevel;
private volatile CompletableFuture<Void> fenceFuture;

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
Expand Down Expand Up @@ -162,7 +162,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.pendingAckHandle = new PendingAckHandleDisabled();
}
IS_FENCED_UPDATER.set(this, FALSE);
this.subscriptionIsolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties);
this.isolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties);
}

public void updateLastMarkDeleteAdvancedTimestamp() {
Expand Down Expand Up @@ -511,7 +511,7 @@ public String getTypeString() {

@Override
public IsolationLevel getIsolationLevel() {
return subscriptionIsolationLevel;
return isolationLevel;
}

@Override
Expand Down Expand Up @@ -1195,6 +1195,7 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.isReplicated = isReplicated();
subStats.subscriptionProperties = subscriptionProperties;
subStats.isDurable = cursor.isDurable();
subStats.subscriptionIsolationLevel = this.isolationLevel.toString();
if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) {
PersistentStickyKeyDispatcherMultipleConsumers keySharedDispatcher =
(PersistentStickyKeyDispatcherMultipleConsumers) dispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionIsolationLevel;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.lookup.data.LookupData;
Expand Down Expand Up @@ -1331,6 +1332,25 @@ public void testGetPartitionedStatsContainSubscriptionType() throws Exception {
assertEquals(topicStats.getSubscriptions().get(subName).getType(), SubscriptionType.Exclusive.toString());
}

@Test
public void testGetPartitionedStatsContainSubscriptionIsolationLevel() throws Exception {
final String topic = "persistent://prop-xyz/ns1/my-topic" + UUID.randomUUID();
final int numPartitions = 4;
admin.topics().createPartitionedTopic(topic, numPartitions);

// create consumer and subscription
final String subName = "my-sub";
@Cleanup Consumer<byte[]> exclusiveConsumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_UNCOMMITTED)
.subscribe();

TopicStats topicStats = admin.topics().getPartitionedStats(topic, false);
assertEquals(topicStats.getSubscriptions().size(), 1);
assertEquals(topicStats.getSubscriptions().get(subName).getSubscriptionIsolationLevel(), SubscriptionIsolationLevel.READ_UNCOMMITTED.toString());
}


@Test
public void testGetPartitionedStatsInternal() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,7 @@ public interface SubscriptionStats {
long getFilterRescheduledMsgCount();

long getDelayedMessageIndexSizeInBytes();

/** The subscription isolationLevel as defined by {@link org.apache.pulsar.client.api.SubscriptionIsolationLevel}. */
String getSubscriptionIsolationLevel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ public class SubscriptionStatsImpl implements SubscriptionStats {

public long filterRescheduledMsgCount;

/**
* The subscription isolationLevel as defined by {@link org.apache.pulsar.client.api.SubscriptionIsolationLevel}.
*/
public String subscriptionIsolationLevel;

public SubscriptionStatsImpl() {
this.consumers = new ArrayList<>();
this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();
Expand Down Expand Up @@ -185,6 +190,7 @@ public void reset() {
filterRejectedMsgCount = 0;
filterRescheduledMsgCount = 0;
bucketDelayedIndexStats.clear();
subscriptionIsolationLevel = null;
}

// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
Expand Down Expand Up @@ -239,6 +245,7 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) {
this.filterAcceptedMsgCount += stats.filterAcceptedMsgCount;
this.filterRejectedMsgCount += stats.filterRejectedMsgCount;
this.filterRescheduledMsgCount += stats.filterRescheduledMsgCount;
this.subscriptionIsolationLevel = stats.subscriptionIsolationLevel;
stats.bucketDelayedIndexStats.forEach((k, v) -> {
TopicMetricBean topicMetricBean =
this.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new TopicMetricBean());
Expand Down

0 comments on commit 7eee90c

Please sign in to comment.