Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[collector]feature:Add monitoring metrics for consumer groups in Kafka client #2887

Merged
merged 11 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,41 @@
import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.util.Assert;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

@Slf4j
public class KafkaCollectImpl extends AbstractCollect {

private static final String LAG_NUM = "lag_num";
private static final String PARTITION_OFFSET = "Partition_offset";

@Override
public void preCheck(Metrics metrics) throws IllegalArgumentException {
KafkaProtocol kafkaProtocol = metrics.getKclient();
Expand Down Expand Up @@ -79,6 +94,9 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri
case TOPIC_OFFSET:
collectTopicOffset(builder, adminClient);
break;
case CONSUMER_DETAIL:
collectTopicConsumerGroups(builder, adminClient);
break;
default:
log.error("Unsupported command: {}", command);
break;
Expand Down Expand Up @@ -203,6 +221,98 @@ private static void collectTopicDescribe(CollectRep.MetricsData.Builder builder,
});
}

/**
* Collect Topic ConsumerGroups Message
*
* @param builder The MetricsData builder
* @param adminClient The AdminClient
*/
/**
 * Collect consumer-group metrics and append one value row per (topic, group) pair:
 * group id, member count, topic name, per-partition committed offsets, and total lag.
 *
 * @param builder     the MetricsData builder that rows are appended to
 * @param adminClient the Kafka AdminClient used for all metadata queries
 * @throws InterruptedException if the calling thread is interrupted while waiting on a broker call
 * @throws ExecutionException   if a broker request fails
 */
private static void collectTopicConsumerGroups(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException {
    // Get all consumer groups known to the cluster
    ListConsumerGroupsResult consumerGroupsResult = adminClient.listConsumerGroups();
    Collection<ConsumerGroupListing> consumerGroups = consumerGroupsResult.all().get();
    // Get the list of consumer groups for each topic
    Map<String, Set<String>> topicConsumerGroupsMap = getTopicConsumerGroupsMap(consumerGroups, adminClient);
    topicConsumerGroupsMap.entrySet().stream()
            .flatMap(entry -> entry.getValue().stream()
                    .map(groupId -> {
                        try {
                            String topic = entry.getKey();
                            DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Collections.singletonList(groupId));
                            Map<String, ConsumerGroupDescription> consumerGroupDescriptions = describeResult.all().get();
                            ConsumerGroupDescription description = consumerGroupDescriptions.get(groupId);
                            Map<String, String> offsetAndLagNum = getConsumerGroupMetrics(topic, groupId, adminClient);
                            return CollectRep.ValueRow.newBuilder()
                                    .addColumns(groupId)
                                    .addColumns(String.valueOf(description.members().size()))
                                    .addColumns(topic)
                                    .addColumns(offsetAndLagNum.get(PARTITION_OFFSET))
                                    .addColumns(offsetAndLagNum.get(LAG_NUM))
                                    .build();
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag so callers can still observe the interruption
                            Thread.currentThread().interrupt();
                            log.warn("group {} get message fail", groupId);
                            return null;
                        } catch (ExecutionException e) {
                            log.warn("group {} get message fail", groupId);
                            return null;
                        }
                    })
            )
            // Groups whose lookup failed are skipped rather than aborting the whole collection
            .filter(Objects::nonNull)
            .forEach(builder::addValues);
}

/**
 * Build a mapping from topic name to the ids of the consumer groups that have
 * committed offsets for that topic.
 *
 * @param consumerGroups all consumer groups listed from the cluster
 * @param adminClient    the Kafka AdminClient used to look up committed offsets
 * @return topic name mapped to the set of group ids consuming it
 * @throws ExecutionException   if a broker request fails
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private static Map<String, Set<String>> getTopicConsumerGroupsMap(Collection<ConsumerGroupListing> consumerGroups,
                                                                  AdminClient adminClient)
        throws ExecutionException, InterruptedException {
    Map<String, Set<String>> groupsByTopic = new HashMap<>();
    for (ConsumerGroupListing listing : consumerGroups) {
        String groupId = listing.groupId();
        // The group's committed offsets reveal exactly which topic partitions it consumes
        ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(groupId);
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = offsetsResult.partitionsToOffsetAndMetadata().get();
        // Register this group under every topic it has a committed offset for
        for (TopicPartition partition : committedOffsets.keySet()) {
            groupsByTopic.computeIfAbsent(partition.topic(), topic -> new HashSet<>()).add(groupId);
        }
    }
    return groupsByTopic;
}

/**
 * Compute, for one consumer group and one topic, the committed offset of every
 * partition and the total lag (latest broker offset minus committed offset,
 * summed over the topic's partitions).
 *
 * @param topic       the topic to report on
 * @param groupId     the consumer group id
 * @param adminClient the Kafka AdminClient used for offset lookups
 * @return map containing {@code PARTITION_OFFSET} -> "[o1,o2,...]" and {@code LAG_NUM} -> total lag
 * @throws ExecutionException   if a broker request fails
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private static Map<String, String> getConsumerGroupMetrics(String topic, String groupId, AdminClient adminClient)
        throws ExecutionException, InterruptedException {
    // Committed offsets for every partition this group consumes (across all topics)
    ListConsumerGroupOffsetsResult consumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(groupId);
    Map<TopicPartition, OffsetAndMetadata> topicOffsets = consumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
    // Keep only the partitions that belong to the requested topic
    Map<TopicPartition, OffsetAndMetadata> matchingOffsets = topicOffsets.entrySet().stream()
            .filter(entry -> entry.getKey().topic().equals(topic))
            .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
    long totalLag = 0L;
    if (!matchingOffsets.isEmpty()) {
        // Batch the latest-offset lookup into a single listOffsets request
        // instead of issuing one broker round-trip per partition
        Map<TopicPartition, OffsetSpec> latestSpecs = matchingOffsets.keySet().stream()
                .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResultInfo> latestOffsets = adminClient.listOffsets(latestSpecs).all().get();
        for (Entry<TopicPartition, OffsetAndMetadata> entry : matchingOffsets.entrySet()) {
            long latestOffset = latestOffsets.get(entry.getKey()).offset();
            // Lag for a partition = latest broker offset - committed consumer offset
            totalLag += latestOffset - entry.getValue().offset();
        }
    }
    // Render all committed offsets for this topic as a string like "[o1,o2,...]"
    String partitionOffsets = matchingOffsets.values().stream()
            .map(offsetMetadata -> String.valueOf(offsetMetadata.offset()))
            .collect(Collectors.joining(",", "[", "]"));
    Map<String, String> res = new HashMap<>();
    res.put(LAG_NUM, String.valueOf(totalLag));
    res.put(PARTITION_OFFSET, partitionOffsets);
    return res;
}


@Override
public String supportProtocol() {
return DispatchConstants.PROTOCOL_KAFKA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public enum SupportedCommand {

TOPIC_DESCRIBE("topic-describe"),
TOPIC_LIST("topic-list"),
TOPIC_OFFSET("topic-offset");
TOPIC_OFFSET("topic-offset"),
CONSUMER_DETAIL("consumer-detail");

private static Set<String> SUPPORTED_COMMAND = new HashSet<>();

Expand Down
41 changes: 39 additions & 2 deletions hertzbeat-manager/src/main/resources/define/app-kafka_client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ metrics:
- field: PartitionNum
type: 1
i18n:
zh-CN: 分区数量
zh-CN: 分区号
en-US: Partition Num
- field: earliest
type: 0
Expand All @@ -140,4 +140,41 @@ metrics:
host: ^_^host^_^
port: ^_^port^_^
command: topic-offset

- name: consumer_detail
i18n:
zh-CN: 消费者组情况
en-US: Consumer Detail Info
priority: 3
# Kafka offset does not need to be obtained frequently, as getting it too quickly will affect performance
interval: 300
fields:
- field: GroupId
type: 1
i18n:
zh-CN: 消费者组ID
en-US: Consumer Group ID
- field: Group Member Num
type: 1
i18n:
zh-CN: 消费者实例数量
en-US: Group Member Num
- field: Topic
type: 1
i18n:
zh-CN: 订阅主题名称
en-US: Subscribed Topic Name
- field: Offset of Each Partition
type: 1
i18n:
zh-CN: 各分区偏移量
en-US: Offset of Each Partition
- field: Lag
type: 0
i18n:
zh-CN: 落后偏移量
en-US: Total Lag
protocol: kclient
kclient:
host: ^_^host^_^
port: ^_^port^_^
command: consumer-detail
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,19 @@ keywords: [开源监控系统, 开源消息中间件监控, Kafka监控]

#### 指标集合:topic_offset

| 指标名称 | 指标单位 | 指标帮助描述 |
|-------|---|---------|
| TopicName | 无 | 主题名称 |
| PartitionNum | 无 | 分区数量 |
| earliest | 无 | 最早偏移量 |
| latest | 无 | 最新偏移量 |
| 指标名称 | 指标单位 | 指标帮助描述 |
|-------|---|--------|
| TopicName | 无 | 主题名称 |
| PartitionNum | 无 | 分区号 |
| earliest | 无 | 最早偏移量 |
| latest | 无 | 最新偏移量 |

#### 指标集合:consumer_detail

| 指标名称 | 指标单位 | 指标帮助描述 |
|-----------|------|-------|
| GroupId | 无 | 消费者组ID |
| Group Member Num | 无 | 消费者实例数量 |
| Subscribed Topic Name | 无 | 订阅主题名称 |
| Offset of Each Partition | 无 | 各分区偏移量 |
| Lag | 无 | 落后偏移量 |
Loading