diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 65dd90f7a1235..8cf1376642b88 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -113,6 +113,7 @@ import org.apache.pulsar.broker.stats.MetricsGenerator; import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats; import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats; +import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; @@ -260,6 +261,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private OpenTelemetryTopicStats openTelemetryTopicStats; private OpenTelemetryConsumerStats openTelemetryConsumerStats; private OpenTelemetryProducerStats openTelemetryProducerStats; + private OpenTelemetryReplicatorStats openTelemetryReplicatorStats; private TransactionMetadataStoreService transactionMetadataStoreService; private TransactionBufferProvider transactionBufferProvider; @@ -678,6 +680,10 @@ public CompletableFuture closeAsync() { brokerClientSharedTimer.stop(); monotonicSnapshotClock.close(); + if (openTelemetryReplicatorStats != null) { + openTelemetryReplicatorStats.close(); + openTelemetryReplicatorStats = null; + } if (openTelemetryProducerStats != null) { openTelemetryProducerStats.close(); openTelemetryProducerStats = null; @@ -834,6 +840,7 @@ public void start() throws PulsarServerException { openTelemetryTopicStats = new OpenTelemetryTopicStats(this); openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this); openTelemetryProducerStats = new OpenTelemetryProducerStats(this); + openTelemetryReplicatorStats = new OpenTelemetryReplicatorStats(this); localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 869a4bc81d310..8552a9f09e93b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.common.Attributes; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -42,6 +43,7 @@ import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.StringInterner; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +57,7 @@ public abstract class AbstractReplicator implements Replicator { protected final PulsarClientImpl replicationClient; protected final PulsarClientImpl client; protected String replicatorId; + @Getter protected final Topic localTopic; protected volatile ProducerImpl producer; @@ -74,6 +77,10 @@ public abstract class AbstractReplicator implements Replicator { @Getter protected volatile State state = State.Disconnected; + private volatile Attributes attributes = null; + private static final AtomicReferenceFieldUpdater ATTRIBUTES_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, Attributes.class, "attributes"); + public enum State { /** * This enum has two mean meanings: @@ -136,6 +143,17 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl protected abstract void disableReplicatorRead(); + @Override + public boolean isConnected() { + var producer = this.producer; + return producer != null && producer.isConnected(); + } + + public long getReplicationDelayMs() { + var producer = this.producer; + return producer == null ? 0 : producer.getDelayInMillis(); + } + public String getRemoteCluster() { return remoteCluster; } @@ -476,4 +494,26 @@ protected ImmutablePair compareSetAndGetState(State expect, Stat public boolean isTerminated() { return state == State.Terminating || state == State.Terminated; } + + public Attributes getAttributes() { + if (attributes != null) { + return attributes; + } + return ATTRIBUTES_UPDATER.updateAndGet(this, old -> { + if (old != null) { + return old; + } + var topicName = TopicName.get(getLocalTopic().getName()); + var builder = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString()) + .put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant()) + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace()) + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName()); + if (topicName.isPartitioned()) { + builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex()); + } + builder.put(OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, getRemoteCluster()); + return builder.build(); + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 572b54e0d3e79..fbf11f1d0ad62 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -933,6 +933,14 @@ public void incrementPublishCount(Producer producer, int numOfMessages, long msg if (isSystemTopic()) { systemTopicBytesInCounter.add(msgSizeInBytes); } + + if (producer.isRemote()) { + var remoteClusterName = producer.getRemoteCluster(); + var replicator = getReplicators().get(remoteClusterName); + if (replicator != null) { + replicator.getStats().incrementPublishCount(numOfMessages, msgSizeInBytes); + } + } } private void handlePublishThrottling(Producer producer, int numOfMessages, long msgSizeInBytes) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 5c314397da80e..667063e491085 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -27,7 +27,9 @@ public interface Replicator { void startProducer(); - ReplicatorStatsImpl getStats(); + Topic getLocalTopic(); + + ReplicatorStatsImpl computeStats(); CompletableFuture terminate(); @@ -53,4 +55,6 @@ default Optional getRateLimiter() { long getNumberOfEntriesInBacklog(); boolean isTerminated(); + + ReplicatorStatsImpl getStats(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 51509f3818a28..6441230fad87b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -116,6 +116,8 @@ public void sendMessage(Entry entry) { } msgOut.recordEvent(headersAndPayload.readableBytes()); + stats.incrementMsgOutCounter(); + stats.incrementBytesOutCounter(headersAndPayload.readableBytes()); msg.setReplicatedFrom(localCluster); @@ -129,6 +131,7 @@ public void sendMessage(Entry entry) { replicatorId); } msgDrop.recordEvent(); + stats.incrementMsgDropCount(); entry.release(); } } @@ -143,11 +146,11 @@ public void updateRates() { } @Override - public NonPersistentReplicatorStatsImpl getStats() { - stats.connected = producer != null && producer.isConnected(); - stats.replicationDelayInSeconds = getReplicationDelayInSeconds(); - + public NonPersistentReplicatorStatsImpl computeStats() { ProducerImpl producer = this.producer; + stats.connected = isConnected(); + stats.replicationDelayInSeconds = TimeUnit.MILLISECONDS.toSeconds(getReplicationDelayMs()); + if (producer != null) { stats.outboundConnection = producer.getConnectionId(); stats.outboundConnectedSince = producer.getConnectedSince(); @@ -159,11 +162,9 @@ public NonPersistentReplicatorStatsImpl getStats() { return stats; } - private long getReplicationDelayInSeconds() { - if (producer != null) { - return TimeUnit.MILLISECONDS.toSeconds(producer.getDelayInMillis()); - } - return 0L; + @Override + public NonPersistentReplicatorStatsImpl getStats() { + return stats; } private static final class ProducerSendCallback implements SendCallback { @@ -256,10 +257,4 @@ public long getNumberOfEntriesInBacklog() { protected void disableReplicatorRead() { // No-op } - - @Override - public boolean isConnected() { - ProducerImpl producer = this.producer; - return producer != null && producer.isConnected(); - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index a6f65f6da3284..0c6ebdfefa01f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -961,7 +961,7 @@ public CompletableFuture asyncGetStats(GetStatsOptions }); replicators.forEach((cluster, replicator) -> { - NonPersistentReplicatorStatsImpl replicatorStats = replicator.getStats(); + NonPersistentReplicatorStatsImpl replicatorStats = replicator.computeStats(); // Add incoming msg rates PublisherStatsImpl pubStats = remotePublishersStats.get(replicator.getRemoteCluster()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index 1314b2d2ed06b..1d9df2bcccda3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -166,6 +166,8 @@ protected boolean replicateEntries(List entries) { msg.getMessageBuilder().clearTxnidMostBits(); msg.getMessageBuilder().clearTxnidLeastBits(); msgOut.recordEvent(headersAndPayload.readableBytes()); + stats.incrementMsgOutCounter(); + stats.incrementBytesOutCounter(headersAndPayload.readableBytes()); // Increment pending messages for messages produced locally PENDING_MESSAGES_UPDATER.incrementAndGet(this); producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 6263c512997fa..aa53a93da5c4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -28,11 +28,13 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import lombok.Getter; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; @@ -107,7 +109,8 @@ public abstract class PersistentReplicator extends AbstractReplicator // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000; - private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl(); + @Getter + protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl(); protected volatile boolean fetchSchemaInProgress = false; @@ -118,7 +121,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(), brokerService, replicationClient); this.topic = localTopic; - this.cursor = cursor; + this.cursor = Objects.requireNonNull(cursor); this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic, Codec.decode(cursor.getName()), cursor, null); HAVE_PENDING_READ_UPDATER.set(this, FALSE); @@ -186,12 +189,14 @@ public long getNumberOfEntriesInBacklog() { return cursor.getNumberOfEntriesInBacklog(true); } + public long getMessageExpiredCount() { + return expiryMonitor.getTotalMessageExpired(); + } + @Override protected void disableReplicatorRead() { - if (this.cursor != null) { - // deactivate cursor after successfully close the producer - this.cursor.setInactive(); - } + // deactivate cursor after successfully close the producer + this.cursor.setInactive(); } /** @@ -330,12 +335,10 @@ protected CompletableFuture getSchemaInfo(MessageImpl msg) throws Ex } public void updateCursorState() { - if (this.cursor != null) { - if (producer != null && producer.isConnected()) { - this.cursor.setActive(); - } else { - this.cursor.setInactive(); - } + if (isConnected()) { + cursor.setActive(); + } else { + cursor.setInactive(); } } @@ -595,10 +598,10 @@ public void updateRates() { stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate(); } - public ReplicatorStatsImpl getStats() { - stats.replicationBacklog = cursor != null ? cursor.getNumberOfEntriesInBacklog(false) : 0; - stats.connected = producer != null && producer.isConnected(); - stats.replicationDelayInSeconds = getReplicationDelayInSeconds(); + public ReplicatorStatsImpl computeStats() { + stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog(false); + stats.connected = isConnected(); + stats.replicationDelayInSeconds = TimeUnit.MILLISECONDS.toSeconds(getReplicationDelayMs()); ProducerImpl producer = this.producer; if (producer != null) { @@ -616,13 +619,6 @@ public void updateMessageTTL(int messageTTLInSeconds) { this.messageTTLInSeconds = messageTTLInSeconds; } - private long getReplicationDelayInSeconds() { - if (producer != null) { - return TimeUnit.MILLISECONDS.toSeconds(producer.getDelayInMillis()); - } - return 0L; - } - @Override public boolean expireMessages(int messageTTLInSeconds) { if ((cursor.getNumberOfEntriesInBacklog(false) == 0) @@ -691,12 +687,6 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl< } } - @Override - public boolean isConnected() { - ProducerImpl producer = this.producer; - return producer != null && producer.isConnected(); - } - @Override protected void doReleaseResources() { dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1983fa3c383e3..6e3d49fbe9ff1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2353,7 +2353,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats } // Update replicator stats - ReplicatorStatsImpl rStat = replicator.getStats(); + ReplicatorStatsImpl rStat = replicator.computeStats(); // Add incoming msg rates PublisherStatsImpl pubStats = topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster()); @@ -2636,7 +2636,7 @@ public CompletableFuture asyncGetStats(GetStatsOptions }); replicators.forEach((cluster, replicator) -> { - ReplicatorStatsImpl replicatorStats = replicator.getStats(); + ReplicatorStatsImpl replicatorStats = replicator.computeStats(); // Add incoming msg rates PublisherStatsImpl pubStats = remotePublishersStats.get(replicator.getRemoteCluster()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index 85e837ff1879a..25591857aa1b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -92,6 +92,8 @@ protected boolean replicateEntries(List entries) { dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.consumeDispatchQuota(1, entry.getLength())); msgOut.recordEvent(headersAndPayload.readableBytes()); + stats.incrementMsgOutCounter(); + stats.incrementBytesOutCounter(headersAndPayload.readableBytes()); msg.setReplicatedFrom(localCluster); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatorStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatorStats.java new file mode 100644 index 0000000000000..04bc805a64bbf --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatorStats.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.AbstractReplicator; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; +import org.apache.pulsar.broker.service.persistent.PersistentReplicator; +import org.apache.pulsar.common.stats.MetricsUtil; + +public class OpenTelemetryReplicatorStats implements AutoCloseable { + + // Replaces pulsar_replication_rate_in + public static final String MESSAGE_IN_COUNTER = "pulsar.broker.replication.message.incoming.count"; + private final ObservableLongMeasurement messageInCounter; + + // Replaces pulsar_replication_rate_out + public static final String MESSAGE_OUT_COUNTER = "pulsar.broker.replication.message.outgoing.count"; + private final ObservableLongMeasurement messageOutCounter; + + // Replaces pulsar_replication_throughput_in + public static final String BYTES_IN_COUNTER = "pulsar.broker.replication.message.incoming.size"; + private final ObservableLongMeasurement bytesInCounter; + + // Replaces pulsar_replication_throughput_out + public static final String BYTES_OUT_COUNTER = "pulsar.broker.replication.message.outgoing.size"; + private final ObservableLongMeasurement bytesOutCounter; + + // Replaces pulsar_replication_backlog + public static final String BACKLOG_COUNTER = "pulsar.broker.replication.message.backlog.count"; + private final ObservableLongMeasurement backlogCounter; + + // Replaces pulsar_replication_delay_in_seconds + public static final String DELAY_GAUGE = "pulsar.broker.replication.message.backlog.age"; + private final ObservableDoubleMeasurement delayGauge; + + // Replaces pulsar_replication_rate_expired + public static final String EXPIRED_COUNTER = "pulsar.broker.replication.message.expired.count"; + private final ObservableLongMeasurement expiredCounter; + + public static final String DROPPED_COUNTER = "pulsar.broker.replication.message.dropped.count"; + private final ObservableLongMeasurement droppedCounter; + + private final BatchCallback batchCallback; + + public OpenTelemetryReplicatorStats(PulsarService pulsar) { + var meter = pulsar.getOpenTelemetry().getMeter(); + + messageInCounter = meter + .upDownCounterBuilder(MESSAGE_IN_COUNTER) + .setUnit("{message}") + .setDescription( + "The total number of messages received from the remote cluster through this replicator.") + .buildObserver(); + + messageOutCounter = meter + .upDownCounterBuilder(MESSAGE_OUT_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages sent to the remote cluster through this replicator.") + .buildObserver(); + + bytesInCounter = meter + .upDownCounterBuilder(BYTES_IN_COUNTER) + .setUnit("{By}") + .setDescription( + "The total number of messages bytes received from the remote cluster through this replicator.") + .buildObserver(); + + bytesOutCounter = meter + .upDownCounterBuilder(BYTES_OUT_COUNTER) + .setUnit("{By}") + .setDescription( + "The total number of messages bytes sent to the remote cluster through this replicator.") + .buildObserver(); + + backlogCounter = meter + .upDownCounterBuilder(BACKLOG_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages in the backlog for this replicator.") + .buildObserver(); + + delayGauge = meter + .gaugeBuilder(DELAY_GAUGE) + .setUnit("s") + .setDescription("The age of the oldest message in the replicator backlog.") + .buildObserver(); + + expiredCounter = meter + .upDownCounterBuilder(EXPIRED_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages that expired for this replicator.") + .buildObserver(); + + droppedCounter = meter + .upDownCounterBuilder(DROPPED_COUNTER) + .setUnit("{message}") + .setDescription("The total number of messages dropped by this replicator.") + .buildObserver(); + + batchCallback = meter.batchCallback(() -> pulsar.getBrokerService() + .getTopics() + .values() + .stream() + .filter(topicFuture -> topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) + .map(CompletableFuture::join) + .filter(Optional::isPresent) + .map(Optional::get) + .flatMap(topic -> topic.getReplicators().values().stream()) + .map(AbstractReplicator.class::cast) + .forEach(this::recordMetricsForReplicator), + messageInCounter, + messageOutCounter, + bytesInCounter, + bytesOutCounter, + backlogCounter, + delayGauge, + expiredCounter, + droppedCounter); + } + + @Override + public void close() { + batchCallback.close(); + } + + private void recordMetricsForReplicator(AbstractReplicator replicator) { + var attributes = replicator.getAttributes(); + var stats = replicator.getStats(); + + messageInCounter.record(stats.getMsgInCount(), attributes); + messageOutCounter.record(stats.getMsgOutCount(), attributes); + bytesInCounter.record(stats.getBytesInCount(), attributes); + bytesOutCounter.record(stats.getBytesOutCount(), attributes); + var delaySeconds = MetricsUtil.convertToSeconds(replicator.getReplicationDelayMs(), TimeUnit.MILLISECONDS); + delayGauge.record(delaySeconds, attributes); + + if (replicator instanceof PersistentReplicator persistentReplicator) { + expiredCounter.record(persistentReplicator.getMessageExpiredCount(), attributes); + backlogCounter.record(persistentReplicator.getNumberOfEntriesInBacklog(), attributes); + } else if (replicator instanceof NonPersistentReplicator nonPersistentReplicator) { + droppedCounter.record(nonPersistentReplicator.getStats().getMsgDropCount(), attributes); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 3bbc9100b364f..a229ef54c795d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -290,7 +290,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include } topic.getReplicators().forEach((cluster, replicator) -> { - ReplicatorStatsImpl replStats = replicator.getStats(); + ReplicatorStatsImpl replStats = replicator.computeStats(); AggregatedReplicationStats aggReplStats = stats.replicationStats.get(replicator.getRemoteCluster()); if (aggReplStats == null) { aggReplStats = new AggregatedReplicationStats(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index d20f5f0d520e9..64d3088b20622 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -139,6 +139,11 @@ protected Position getReplicatorReadPosition() { return PositionFactory.EARLIEST; } + @Override + public ReplicatorStatsImpl computeStats() { + return null; + } + @Override public ReplicatorStatsImpl getStats() { return null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index d83b2ed4ee6c9..1c47abab775b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -18,11 +18,15 @@ */ package org.apache.pulsar.broker.service; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleGaugeValue; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -31,6 +35,7 @@ import com.google.common.collect.Sets; import com.scurrilous.circe.checksum.Crc32cIntChecksum; import io.netty.buffer.ByteBuf; +import io.opentelemetry.api.common.Attributes; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; @@ -58,10 +63,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; -import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; @@ -69,6 +74,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -104,6 +110,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.schema.Schemas; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -667,7 +674,7 @@ public void testReplicatorClearBacklog() throws Exception { Thread.sleep(100); replicator.updateRates(); // for code-coverage replicator.expireMessages(1); // for code-coverage - ReplicatorStats status = replicator.getStats(); + ReplicatorStats status = replicator.computeStats(); assertEquals(status.getReplicationBacklog(), 0); } @@ -697,7 +704,7 @@ public void testResetReplicatorSubscriptionPosition() throws Exception { replicator.updateRates(); - ReplicatorStats status = replicator.getStats(); + ReplicatorStats status = replicator.computeStats(); assertEquals(status.getReplicationBacklog(), 0); } @@ -997,14 +1004,28 @@ public void testResumptionAfterBacklogRelaxed() throws Exception { Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); - assertEquals(replicator.getStats().replicationBacklog, 0); + assertEquals(replicator.computeStats().replicationBacklog, 0); + var attributes = Attributes.of( + OpenTelemetryAttributes.PULSAR_DOMAIN, dest.getDomain().value(), + OpenTelemetryAttributes.PULSAR_TENANT, dest.getTenant(), + OpenTelemetryAttributes.PULSAR_NAMESPACE, dest.getNamespace(), + OpenTelemetryAttributes.PULSAR_TOPIC, dest.getPartitionedTopicName(), + OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster2 + ); + var metrics = metricReader1.collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0); + assertMetricDoubleGaugeValue(metrics, OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0); // Next message will not be replicated, because r2 has reached the quota producer1.produce(1); Thread.sleep(500); - assertEquals(replicator.getStats().replicationBacklog, 1); + assertEquals(replicator.computeStats().replicationBacklog, 1); + metrics = metricReader1.collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 1); + assertMetricDoubleGaugeValue(metrics, OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, + aDouble -> assertThat(aDouble).isPositive()); // Consumer will now drain 1 message and the replication backlog will be cleared consumer2.receive(1); @@ -1013,13 +1034,16 @@ public void testResumptionAfterBacklogRelaxed() throws Exception { consumer2.receive(1); int retry = 10; - for (int i = 0; i < retry && replicator.getStats().replicationBacklog > 0; i++) { + for (int i = 0; i < retry && replicator.computeStats().replicationBacklog > 0; i++) { if (i != retry - 1) { Thread.sleep(100); } } - assertEquals(replicator.getStats().replicationBacklog, 0); + assertEquals(replicator.computeStats().replicationBacklog, 0); + metrics = metricReader1.collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0); + assertMetricDoubleGaugeValue(metrics, OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0); } } @@ -1813,6 +1837,72 @@ public void testReplicatorWithTTL() throws Exception { assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4")); } + @Test + public void testReplicationMetrics() throws Exception { + var destTopicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/replicationMetrics")); + + @Cleanup + var producer1 = new MessageProducer(url1, destTopicName); + + @Cleanup + var consumer1 = new MessageConsumer(url1, destTopicName); + + @Cleanup + var consumer2 = new MessageConsumer(url2, destTopicName); + + // Produce from cluster 1 and consume from the 1 and 2. + producer1.produce(3); + consumer1.receive(2); + consumer2.receive(1); + + { + // Validate replicator metrics on cluster 1 from cluster 2 + var attributes = Attributes.of( + OpenTelemetryAttributes.PULSAR_DOMAIN, destTopicName.getDomain().value(), + OpenTelemetryAttributes.PULSAR_TENANT, destTopicName.getTenant(), + OpenTelemetryAttributes.PULSAR_NAMESPACE, destTopicName.getNamespace(), + OpenTelemetryAttributes.PULSAR_TOPIC, destTopicName.getPartitionedTopicName(), + OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster2 + ); + var metrics = metricReader1.collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.MESSAGE_OUT_COUNTER, attributes, 3); + assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BYTES_OUT_COUNTER, attributes, + aLong -> assertThat(aLong).isPositive()); + + var topicOpt = pulsar1.getBrokerService().getTopicReference(destTopicName.toString()); + assertThat(topicOpt).isPresent(); + var topic = topicOpt.get(); + var persistentReplicators = topic.getReplicators() + .values() + .stream() + .map(PersistentReplicator.class::cast) + .toList(); + persistentReplicators.forEach(this::pauseReplicator); + producer1.produce(5); + Awaitility.await().untilAsserted(() -> { + persistentReplicators.forEach(repl -> repl.expireMessages(1)); + assertMetricLongSumValue(metricReader1.collectAllMetrics(), + OpenTelemetryReplicatorStats.EXPIRED_COUNTER, + attributes, 5); + }); + } + + { + // Validate replicator metrics on cluster 2 from cluster 1 + var attributes = Attributes.of( + OpenTelemetryAttributes.PULSAR_DOMAIN, destTopicName.getDomain().value(), + OpenTelemetryAttributes.PULSAR_TENANT, destTopicName.getTenant(), + OpenTelemetryAttributes.PULSAR_NAMESPACE, destTopicName.getNamespace(), + OpenTelemetryAttributes.PULSAR_TOPIC, destTopicName.getPartitionedTopicName(), + OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster1 + ); + var metrics = metricReader2.collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.MESSAGE_IN_COUNTER, attributes, 3); + assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BYTES_IN_COUNTER, attributes, + aLong -> assertThat(aLong).isPositive()); + } + } + @Test public void testEnableReplicationWithNamespaceAllowedClustersPolices() throws Exception { log.info("--- testEnableReplicationWithNamespaceAllowedClustersPolices ---"); @@ -1873,5 +1963,8 @@ private void pauseReplicator(PersistentReplicator replicator) { assertTrue(replicator.isConnected()); }); replicator.closeProducerAsync(true); + Awaitility.await().untilAsserted(() -> { + assertFalse(replicator.isConnected()); + }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 838632febd889..33877b681184f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -21,12 +21,10 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; - -import com.google.common.io.Resources; import com.google.common.collect.Sets; - +import com.google.common.io.Resources; import io.netty.util.concurrent.DefaultThreadFactory; - +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.net.URL; import java.util.Optional; import java.util.Set; @@ -35,12 +33,9 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.TopicType; -import org.apache.pulsar.tests.TestRetrySupport; +import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -51,7 +46,11 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.functions.worker.WorkerConfig; +import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.apache.pulsar.zookeeper.ZookeeperServerTest; import org.slf4j.Logger; @@ -63,6 +62,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport { ServiceConfiguration config1 = new ServiceConfiguration(); PulsarService pulsar1; BrokerService ns1; + protected InMemoryMetricReader metricReader1; PulsarAdmin admin1; LocalBookkeeperEnsemble bkEnsemble1; @@ -74,6 +74,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport { BrokerService ns2; PulsarAdmin admin2; LocalBookkeeperEnsemble bkEnsemble2; + protected InMemoryMetricReader metricReader2; URL url3; URL urlTls3; @@ -82,6 +83,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport { BrokerService ns3; PulsarAdmin admin3; LocalBookkeeperEnsemble bkEnsemble3; + protected InMemoryMetricReader metricReader3; URL url4; URL urlTls4; @@ -89,6 +91,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport { PulsarService pulsar4; PulsarAdmin admin4; LocalBookkeeperEnsemble bkEnsemble4; + protected InMemoryMetricReader metricReader4; ZookeeperServerTest globalZkS; @@ -154,7 +157,8 @@ protected void setup() throws Exception { // completely // independent config objects instead of referring to the same properties object setConfig1DefaultValue(); - pulsar1 = new PulsarService(config1); + metricReader1 = InMemoryMetricReader.create(); + pulsar1 = buildPulsarService(config1, metricReader1); pulsar1.start(); ns1 = pulsar1.getBrokerService(); @@ -169,7 +173,8 @@ protected void setup() throws Exception { bkEnsemble2.start(); setConfig2DefaultValue(); - pulsar2 = new PulsarService(config2); + metricReader2 = InMemoryMetricReader.create(); + pulsar2 = buildPulsarService(config2, metricReader2); pulsar2.start(); ns2 = pulsar2.getBrokerService(); @@ -184,7 +189,8 @@ protected void setup() throws Exception { bkEnsemble3.start(); setConfig3DefaultValue(); - pulsar3 = new PulsarService(config3); + metricReader3 = InMemoryMetricReader.create(); + pulsar3 = buildPulsarService(config3, metricReader3); pulsar3.start(); ns3 = pulsar3.getBrokerService(); @@ -199,7 +205,8 @@ protected void setup() throws Exception { bkEnsemble4.start(); setConfig4DefaultValue(); - pulsar4 = new PulsarService(config4); + metricReader4 = InMemoryMetricReader.create(); + pulsar4 = buildPulsarService(config4, metricReader4); pulsar4.start(); url4 = new URL(pulsar4.getWebServiceAddress()); @@ -312,6 +319,14 @@ protected void setup() throws Exception { } + private PulsarService buildPulsarService(ServiceConfiguration config, InMemoryMetricReader metricReader) { + return new PulsarService(config, + new WorkerConfig(), + Optional.empty(), + exitCode -> log.info("Pulsar service finished with exit code {}", exitCode), + BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(metricReader)); + } + public void setConfig3DefaultValue() { setConfigDefaults(config3, cluster3, bkEnsemble3); config3.setTlsEnabled(true); @@ -409,6 +424,23 @@ protected void cleanup() throws Exception { admin4 = null; } + if (metricReader4 != null) { + metricReader4.close(); + metricReader4 = null; + } + if (metricReader3 != null) { + metricReader3.close(); + metricReader3 = null; + } + if (metricReader2 != null) { + metricReader2.close(); + metricReader2 = null; + } + if (metricReader1 != null) { + metricReader1.close(); + metricReader1 = null; + } + if (pulsar4 != null) { pulsar4.close(); pulsar4 = null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java index cb61677ab953d..d7ad0588201d4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java @@ -89,4 +89,22 @@ public static void assertMetricLongGaugeValue(Collection metrics, St valueConsumer.accept(point.getValue()); })))); } + + public static void assertMetricDoubleGaugeValue(Collection metrics, String metricName, + Attributes attributes, double expected) { + assertMetricDoubleGaugeValue(metrics, metricName, attributes, actual -> assertThat(actual).isEqualTo(expected)); + } + + public static void assertMetricDoubleGaugeValue(Collection metrics, String metricName, + Attributes attributes, Consumer valueConsumer) { + assertThat(metrics) + .anySatisfy(metric -> assertThat(metric) + .hasName(metricName) + .hasDoubleGaugeSatisfying(gauge -> gauge.satisfies( + pointData -> assertThat(pointData.getPoints()).anySatisfy( + point -> { + assertThat(point.getAttributes()).isEqualTo(attributes); + valueConsumer.accept(point.getValue()); + })))); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index e99802a5bc5c4..157df1185307a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -193,6 +193,7 @@ public void testMultipleBrokerLookup() throws Exception { pulsar2.getOpenTelemetryTopicStats().close(); pulsar2.getOpenTelemetryConsumerStats().close(); pulsar2.getOpenTelemetryProducerStats().close(); + pulsar2.getOpenTelemetryReplicatorStats().close(); var metricReader = pulsarTestContext.getOpenTelemetryMetricReader(); var lookupRequestSemaphoreField = BrokerService.class.getDeclaredField("lookupRequestSemaphore"); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java index 6c77de9195786..bfeeb6d037a78 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java @@ -27,4 +27,7 @@ public interface NonPersistentReplicatorStats extends ReplicatorStats { * for non-persistent topic: broker drops msg for replicator if replicator connection is not writable. **/ double getMsgDropRate(); + + /** Total number of messages dropped by the broker for the replicator. */ + long getMsgDropCount(); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java index 24be2f9380bb7..1790cc35f50c5 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java @@ -24,20 +24,40 @@ public interface ReplicatorStats { /** Total rate of messages received from the remote cluster (msg/s). */ + @Deprecated double getMsgRateIn(); + /** Total number of messages received from the remote cluster. */ + long getMsgInCount(); + /** Total throughput received from the remote cluster (bytes/s). */ + @Deprecated double getMsgThroughputIn(); + /** Total number of bytes received from the remote cluster. */ + long getBytesInCount(); + /** Total rate of messages delivered to the replication-subscriber (msg/s). */ + @Deprecated double getMsgRateOut(); + /** Total number of messages sent to the remote cluster. */ + long getMsgOutCount(); + /** Total throughput delivered to the replication-subscriber (bytes/s). */ + @Deprecated double getMsgThroughputOut(); + /** Total number of bytes sent to the remote cluster. */ + long getBytesOutCount(); + /** Total rate of messages expired (msg/s). */ + @Deprecated double getMsgRateExpired(); + /** Total number of messages expired. */ + long getMsgExpiredCount(); + /** Number of messages pending to be replicated to remote cluster. */ long getReplicationBacklog(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentReplicatorStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentReplicatorStatsImpl.java index 98f838a94493c..a09d03b21a03a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentReplicatorStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentReplicatorStatsImpl.java @@ -18,27 +18,43 @@ */ package org.apache.pulsar.common.policies.data.stats; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Objects; -import lombok.Getter; +import java.util.concurrent.atomic.LongAdder; +import lombok.Data; +import lombok.EqualsAndHashCode; import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats; /** * Statistics for a non-persistent replicator. */ -@SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS") +@Data +@EqualsAndHashCode(callSuper = true) public class NonPersistentReplicatorStatsImpl extends ReplicatorStatsImpl implements NonPersistentReplicatorStats { /** * for non-persistent topic: broker drops msg for replicator if replicator connection is not writable. **/ - @Getter public double msgDropRate; + @JsonIgnore + private final LongAdder msgDropCount = new LongAdder(); + public NonPersistentReplicatorStatsImpl add(NonPersistentReplicatorStatsImpl stats) { Objects.requireNonNull(stats); super.add(stats); this.msgDropRate += stats.msgDropRate; return this; } + + @Override + @JsonProperty + public long getMsgDropCount() { + return msgDropCount.sum(); + } + + public void incrementMsgDropCount() { + msgDropCount.increment(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java index 6933f5cc7ed76..c19169cbee57f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java @@ -18,7 +18,10 @@ */ package org.apache.pulsar.common.policies.data.stats; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Objects; +import java.util.concurrent.atomic.LongAdder; import lombok.Data; import org.apache.pulsar.common.policies.data.ReplicatorStats; @@ -31,15 +34,27 @@ public class ReplicatorStatsImpl implements ReplicatorStats { /** Total rate of messages received from the remote cluster (msg/s). */ public double msgRateIn; + @JsonIgnore + private final LongAdder msgInCount = new LongAdder(); + /** Total throughput received from the remote cluster (bytes/s). */ public double msgThroughputIn; + @JsonIgnore + private final LongAdder bytesInCount = new LongAdder(); + /** Total rate of messages delivered to the replication-subscriber (msg/s). */ public double msgRateOut; + @JsonIgnore + private final LongAdder msgOutCount = new LongAdder(); + /** Total throughput delivered to the replication-subscriber (bytes/s). */ public double msgThroughputOut; + @JsonIgnore + private final LongAdder bytesOutCount = new LongAdder(); + /** Total rate of messages expired (msg/s). */ public double msgRateExpired; @@ -72,10 +87,51 @@ public ReplicatorStatsImpl add(ReplicatorStatsImpl stats) { this.msgThroughputOut += stats.msgThroughputOut; this.msgRateExpired += stats.msgRateExpired; this.replicationBacklog += stats.replicationBacklog; - if (this.connected) { - this.connected &= stats.connected; - } + this.connected &= stats.connected; this.replicationDelayInSeconds = Math.max(this.replicationDelayInSeconds, stats.replicationDelayInSeconds); return this; } + + @Override + @JsonProperty + public long getMsgInCount() { + return msgInCount.sum(); + } + + @Override + @JsonProperty + public long getBytesInCount() { + return bytesInCount.sum(); + } + + public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { + msgInCount.add(numOfMessages); + bytesInCount.add(msgSizeInBytes); + } + + @Override + @JsonProperty + public long getMsgOutCount() { + return msgOutCount.sum(); + } + + public void incrementMsgOutCounter() { + msgOutCount.increment(); + } + + @Override + @JsonProperty + public long getBytesOutCount() { + return bytesOutCount.sum(); + } + + public void incrementBytesOutCounter(long bytes) { + bytesOutCount.add(bytes); + } + + @Override + @JsonProperty + public long getMsgExpiredCount() { + return 0; + } } diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 6639cd68b398e..31e527f02869e 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -143,6 +143,12 @@ enum BacklogQuotaType { public final Attributes attributes = Attributes.of(PULSAR_BACKLOG_QUOTA_TYPE, name().toLowerCase()); } + /** + * The name of the remote cluster for a Pulsar replicator. + */ + AttributeKey PULSAR_REPLICATION_REMOTE_CLUSTER_NAME = + AttributeKey.stringKey("pulsar.replication.remote.cluster.name"); + AttributeKey PULSAR_CONNECTION_STATUS = AttributeKey.stringKey("pulsar.connection.status"); enum ConnectionStatus { ACTIVE,