Skip to content

Commit

Permalink
Fix subscription consumer count
Browse files Browse the repository at this point in the history
(cherry picked from commit 1d4f9a6)
  • Loading branch information
cbornet authored and nicoloboschi committed Apr 6, 2022
1 parent 5855f99 commit 81927aa
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,7 @@ public class AggregatedSubscriptionStats {

double msgDropRate;

long consumersCount;

public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactedTopicContext;
Expand Down Expand Up @@ -189,6 +190,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subsStats.consumersCount = subscriptionStats.consumers.size();
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
Expand Down Expand Up @@ -222,6 +224,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
subsStats.lastConsumedFlowTimestamp = subscriptionStats.getLastConsumedFlowTimestamp();
subsStats.lastConsumedTimestamp = subscriptionStats.getLastConsumedTimestamp();
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.getLastMarkDeleteAdvancedTimestamp();
subsStats.consumersCount = subscriptionStats.getConsumers().size();
subsStats.msgDropRate = subscriptionStats.getMsgDropRate();
subscriptionStats.getConsumers().forEach(cStats -> {
stats.consumersCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate",
subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_consumers_count",
subsStats.consumerStat.size(), splitTopicAndPartitionIndexLabel);
subsStats.consumersCount, splitTopicAndPartitionIndexLabel);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
Expand Down

0 comments on commit 81927aa

Please sign in to comment.