diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java index 39af18b9faa..3f825207a92 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.metrics; import com.google.common.base.Splitter; +import io.netty.channel.Channel; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.metrics.LongCounter; @@ -45,10 +46,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MQVersion; @@ -459,9 +462,14 @@ private void initConnectionMetrics() { .ofLongs() .buildWithCallback(measurement -> { Map metricsMap = new HashMap<>(); - brokerController.getProducerManager() - .getGroupChannelTable() - .values() + ConcurrentHashMap> groupChannelTable = brokerController.getProducerManager().getGroupChannelTable(); + ConcurrentHashMap> clone = new ConcurrentHashMap<>(groupChannelTable); + clone.forEach((group, map) -> { + if (MixAll.CLIENT_INNER_PRODUCER_GROUP.equals(group)) { + groupChannelTable.remove(group); + } + }); + clone.values() .stream() .flatMap(map -> map.values().stream()) .forEach(info -> {