Skip to content

Commit

Permalink
Remove hardcoded pulsar.broker.replication.connected.count metric
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor committed Jun 25, 2024
1 parent bea5073 commit 9a9c35a
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
import org.apache.pulsar.common.stats.MetricsUtil;

public class OpenTelemetryReplicatorStats implements AutoCloseable {
Expand Down Expand Up @@ -56,10 +56,6 @@ public class OpenTelemetryReplicatorStats implements AutoCloseable {
public static final String EXPIRED_COUNTER = "pulsar.broker.replication.message.expired";
private final ObservableLongMeasurement expiredCounter;

// Replaces pulsar_replication_connected_count
public static final String CONNECTED_COUNTER = "pulsar.broker.replication.connected.count";
private final ObservableLongMeasurement connectedCounter;

// Replaces pulsar_replication_delay_in_seconds
public static final String DELAY_GAUGE = "pulsar.broker.replication.message.age";
private final ObservableDoubleMeasurement delayGauge;
Expand Down Expand Up @@ -111,12 +107,6 @@ public OpenTelemetryReplicatorStats(PulsarService pulsar) {
.setDescription("The total number of messages that expired for this replicator.")
.buildObserver();

connectedCounter = meter
.upDownCounterBuilder(CONNECTED_COUNTER)
.setUnit("{subscriber}")
.setDescription("The total number of replication subscribers that are running.")
.buildObserver();

delayGauge = meter
.gaugeBuilder(DELAY_GAUGE)
.setUnit("{second}")
Expand Down Expand Up @@ -146,7 +136,6 @@ public OpenTelemetryReplicatorStats(PulsarService pulsar) {
bytesOutCounter,
backlogCounter,
expiredCounter,
connectedCounter,
delayGauge,
droppedCounter);
}
Expand All @@ -164,7 +153,6 @@ private void recordMetricsForReplicator(AbstractReplicator replicator) {
messageOutCounter.record(stats.getMsgOutCount(), attributes);
bytesInCounter.record(stats.getBytesInCount(), attributes);
bytesOutCounter.record(stats.getBytesOutCount(), attributes);
connectedCounter.record(replicator.isConnected() ? 1 : 0, attributes);
var delaySeconds = MetricsUtil.convertToSeconds(replicator.getReplicationDelayMs(), TimeUnit.MILLISECONDS);
delayGauge.record(delaySeconds, attributes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1868,7 +1868,6 @@ public void testReplicationMetrics() throws Exception {
assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.MESSAGE_OUT_COUNTER, attributes, 3);
assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BYTES_OUT_COUNTER, attributes,
aLong -> assertThat(aLong).isPositive());
assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.CONNECTED_COUNTER, attributes, 1);

var topicOpt = pulsar1.getBrokerService().getTopicReference(destTopicName.toString());
assertThat(topicOpt).isPresent();
Expand Down

0 comments on commit 9a9c35a

Please sign in to comment.