diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatorStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatorStats.java index fa7fa6a40cc2c..04bc805a64bbf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatorStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatorStats.java @@ -49,18 +49,18 @@ public class OpenTelemetryReplicatorStats implements AutoCloseable { private final ObservableLongMeasurement bytesOutCounter; // Replaces pulsar_replication_backlog - public static final String BACKLOG_COUNTER = "pulsar.broker.replication.message.backlog"; + public static final String BACKLOG_COUNTER = "pulsar.broker.replication.message.backlog.count"; private final ObservableLongMeasurement backlogCounter; - // Replaces pulsar_replication_rate_expired - public static final String EXPIRED_COUNTER = "pulsar.broker.replication.message.expired"; - private final ObservableLongMeasurement expiredCounter; - // Replaces pulsar_replication_delay_in_seconds - public static final String DELAY_GAUGE = "pulsar.broker.replication.message.age"; + public static final String DELAY_GAUGE = "pulsar.broker.replication.message.backlog.age"; private final ObservableDoubleMeasurement delayGauge; - public static final String DROPPED_COUNTER = "pulsar.broker.replication.dropped.count"; + // Replaces pulsar_replication_rate_expired + public static final String EXPIRED_COUNTER = "pulsar.broker.replication.message.expired.count"; + private final ObservableLongMeasurement expiredCounter; + + public static final String DROPPED_COUNTER = "pulsar.broker.replication.message.dropped.count"; private final ObservableLongMeasurement droppedCounter; private final BatchCallback batchCallback; @@ -83,14 +83,14 @@ public OpenTelemetryReplicatorStats(PulsarService pulsar) { bytesInCounter = meter .upDownCounterBuilder(BYTES_IN_COUNTER) - .setUnit("{byte}") + .setUnit("{By}") .setDescription( "The total number of messages bytes received from the remote cluster through this replicator.") .buildObserver(); bytesOutCounter = meter .upDownCounterBuilder(BYTES_OUT_COUNTER) - .setUnit("{byte}") + .setUnit("{By}") .setDescription( "The total number of messages bytes sent to the remote cluster through this replicator.") .buildObserver(); @@ -101,22 +101,22 @@ public OpenTelemetryReplicatorStats(PulsarService pulsar) { .setDescription("The total number of messages in the backlog for this replicator.") .buildObserver(); + delayGauge = meter + .gaugeBuilder(DELAY_GAUGE) + .setUnit("s") + .setDescription("The age of the oldest message in the replicator backlog.") + .buildObserver(); + expiredCounter = meter .upDownCounterBuilder(EXPIRED_COUNTER) .setUnit("{message}") .setDescription("The total number of messages that expired for this replicator.") .buildObserver(); - delayGauge = meter - .gaugeBuilder(DELAY_GAUGE) - .setUnit("{second}") - .setDescription("The total number of messages that expired for this replicator.") - .buildObserver(); - droppedCounter = meter .upDownCounterBuilder(DROPPED_COUNTER) .setUnit("{message}") - .setDescription("The total number of dropped messages for this replicator.") + .setDescription("The total number of messages dropped by this replicator.") .buildObserver(); batchCallback = meter.batchCallback(() -> pulsar.getBrokerService() @@ -135,8 +135,8 @@ public OpenTelemetryReplicatorStats(PulsarService pulsar) { bytesInCounter, bytesOutCounter, backlogCounter, - expiredCounter, delayGauge, + expiredCounter, droppedCounter); } @@ -159,10 +159,8 @@ private void recordMetricsForReplicator(AbstractReplicator replicator) { if (replicator instanceof PersistentReplicator persistentReplicator) { expiredCounter.record(persistentReplicator.getMessageExpiredCount(), attributes); backlogCounter.record(persistentReplicator.getNumberOfEntriesInBacklog(), attributes); - } - - if (stats instanceof NonPersistentReplicatorStats nonPersistentStats) { - droppedCounter.record(nonPersistentStats.getMsgDropCount(), attributes); + } else if (replicator instanceof NonPersistentReplicator nonPersistentReplicator) { + droppedCounter.record(nonPersistentReplicator.getStats().getMsgDropCount(), attributes); } } }