Skip to content

Commit

Permalink
Make retention.ms be long; Attach broker id into metrics datapoints
Browse files Browse the repository at this point in the history
Fixes:
- Convert topicSize and retentionMs from integer to long, so that large topics or topics with long retention won't cause a sensor parsing error.
- Attach the broker id to metrics datapoints to help engineers find out which brokers are in a bad state.
  • Loading branch information
yisheng-zhou authored Oct 8, 2024
2 parents 4f11d45 + a622de5 commit 3a37e31
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,15 @@ public void operate(KafkaCluster cluster) throws Exception {
"Orion agents on " + cluster.getClusterId() + " are unhealthy, no URPs on the cluster: " + alertableUnhealthyAgentBrokersWithoutURPs,
"orion"
));
OrionServer.metricsCounterInc(
"broker.agent.unhealthy",
new HashMap<String, String>() {{
put("clusterId", cluster.getClusterId());
}}
);
for (String brokerId : alertableUnhealthyAgentBrokersWithoutURPs) {
OrionServer.metricsCounterInc(
"broker.agent.unhealthy",
new HashMap<String, String>() {{
put("clusterId", cluster.getClusterId());
put("brokerId", brokerId);
}}
);
}
}

// alert on brokers where broker service is unhealthy but there are no URPs if they show up for 3 consecutive times
Expand All @@ -186,12 +189,15 @@ public void operate(KafkaCluster cluster) throws Exception {
"Kafka service on " + cluster.getClusterId() + " are unhealthy, no URPs on the cluster: " + alertableUnhealthyBrokersWithoutURPs,
"orion"
));
OrionServer.metricsCounterInc(
"broker.service.unhealthy",
new HashMap<String, String>() {{
put("clusterId", cluster.getClusterId());
}}
);
for (String brokerId : alertableUnhealthyBrokersWithoutURPs) {
OrionServer.metricsCounterInc(
"broker.service.unhealthy",
new HashMap<String, String>() {{
put("clusterId", cluster.getClusterId());
put("brokerId", brokerId);
}}
);
}
}

setMessage("offline brokers: " + unhealthyKafkaBrokers + "\nunhealthy agent orion nodes: " + unhealthyAgentNodes +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void populateTopicMetrics(KafkaCluster cluster, Map<String, KafkaTopicDes
put("clusterId", cluster.getClusterId());
}};
// Topic size
int topicSize = (int) getTopicSizeByteFromTopicDescription(topicDescription);
long topicSize = (long) getTopicSizeByteFromTopicDescription(topicDescription);
OrionServer.metricsHistogram(
"kafka.topic.size.bytes",
topicSize,
Expand All @@ -170,7 +170,7 @@ public void populateTopicMetrics(KafkaCluster cluster, Map<String, KafkaTopicDes
metricsTags
);
// Retention
int retentionMs= Integer.parseInt(
long retentionMs= Long.parseLong(
topicDescription.getTopicConfigs().getOrDefault("retention.ms", "0.0"));
OrionServer.metricsHistogram(
"kafka.topic.retention.ms",
Expand Down

0 comments on commit 3a37e31

Please sign in to comment.