Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] PIP-264: Add OpenTelemetry broker replicator metrics #22972

Merged
merged 54 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
3834f50
Add Replicator stats draft
dragosvictor Apr 11, 2024
5297854
Dummy commit
dragosvictor Apr 15, 2024
f1ca793
Merge remote-tracking branch 'origin/master' into pip-264-replication…
dragosvictor May 22, 2024
7642b8b
Fix NPE bug in [Non]PersistentReplicator.getStats
dragosvictor May 22, 2024
37759a9
Add Replicator.getTopic method
dragosvictor May 22, 2024
8dc58e8
Checkstyle fix
dragosvictor May 22, 2024
a063034
Factor out AbstractReplicator.getReplicationDelayMs
dragosvictor May 23, 2024
012cb1f
Factor out AbstractReplicator.isConnected
dragosvictor May 23, 2024
5f056fa
Refactor ReplicatorStats
dragosvictor May 23, 2024
1132518
Fix possible NPEs in PersistentReplicator.updateCursorState
dragosvictor May 23, 2024
1fea1ac
Rename Replicator.getTopic to Replicator.getLocalTopic
dragosvictor May 23, 2024
4d85952
Instantiate OpenTelemetryReplicatorStats field in PulsarService
dragosvictor May 23, 2024
7540e9e
Enable OpenTelemetry in ReplicatorTest
dragosvictor May 23, 2024
b6a30fd
Update ReplicatorStats implementation
dragosvictor May 23, 2024
4ef7ee7
Update stat values
dragosvictor May 23, 2024
ebbe49e
Add draft OpenTelemetryReplicatorStatsTest
dragosvictor May 23, 2024
b505a28
Merge remote-tracking branch 'origin/master' into pip-264-replication…
dragosvictor May 23, 2024
154f0bc
Remove redundant cursor null pointer checks in PersistentReplicator
dragosvictor May 23, 2024
36d5d2c
Checkstyle fix
dragosvictor May 23, 2024
23e24a2
Update stats access
dragosvictor May 23, 2024
6173af8
Skip redundant null check
dragosvictor May 23, 2024
325ffca
Debug
dragosvictor May 24, 2024
b008ade
Debug
dragosvictor May 24, 2024
6af06f9
Aadd default implementations for ReplicatorStats methods
dragosvictor May 24, 2024
ed9be19
Fix test errors caused by stats serialization
dragosvictor May 24, 2024
2f71eb0
Spotbugs fixes
dragosvictor May 24, 2024
47c8617
Merge remote-tracking branch 'origin/master' into pip-264-replication…
dragosvictor May 28, 2024
5d20675
Fix BrokerServiceLookupTest.testMultipleBrokerLookup
dragosvictor May 28, 2024
2b2d18d
Add comments
dragosvictor May 28, 2024
1228a03
Add replicator stats attributes
dragosvictor May 28, 2024
410d31e
Draft ReplicatorTest#testReplicationMetrics
dragosvictor May 28, 2024
2b25472
Fix test
dragosvictor May 28, 2024
5abbae3
Test fix
dragosvictor May 28, 2024
e1b1d0e
Validate backlog counter
dragosvictor May 28, 2024
a5789a4
Implement dropped message counter
dragosvictor May 28, 2024
95aac51
Validate delay metric
dragosvictor May 28, 2024
1b8739e
Validate expired metric
dragosvictor May 29, 2024
ad49a82
Test cleanup
dragosvictor May 29, 2024
34e3fe8
Merge remote-tracking branch 'origin/master' into pip-264-replication…
dragosvictor May 29, 2024
03a7dd5
Merge remote-tracking branch 'origin/master' into pip-264-replication…
dragosvictor Jun 13, 2024
ee03759
Cosmetic fixes
dragosvictor Jun 13, 2024
8168cd2
Merge remote-tracking branch 'origin/master' into pip-264-replication…
dragosvictor Jun 13, 2024
c3fc519
Merge remote-tracking branch 'origin/master' into pip-264-replication…
dragosvictor Jun 14, 2024
92354e9
Merge remote-tracking branch 'origin/master' into pip-264-replication…
dragosvictor Jun 15, 2024
7469faf
Merge remote-tracking branch 'origin/master' into pip-264-replication…
dragosvictor Jun 17, 2024
92f7ce8
Fix merge conflicts
dragosvictor Jun 17, 2024
a08b692
Merge remote-tracking branch 'origin/master' into pip-264-replication…
dragosvictor Jun 24, 2024
7390e07
Rename Replicator.getStats -> Replicator.computeStats for improved cl…
dragosvictor Jun 24, 2024
94c40d9
Increment replicator message publish counters
dragosvictor Jun 24, 2024
58b0c81
Skip exceptional topic futures during otel collections
dragosvictor Jun 24, 2024
0b8c93a
Cache attributes object in AbstractReplicator field
dragosvictor Jun 24, 2024
bea5073
Update ReplicatorTest#testReplicationMetrics
dragosvictor Jun 24, 2024
9a9c35a
Remove hardcoded pulsar.broker.replication.connected.count metric
dragosvictor Jun 25, 2024
2a8534a
Cosmetic fixes
dragosvictor Jun 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
Expand Down Expand Up @@ -260,6 +261,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private OpenTelemetryTopicStats openTelemetryTopicStats;
private OpenTelemetryConsumerStats openTelemetryConsumerStats;
private OpenTelemetryProducerStats openTelemetryProducerStats;
private OpenTelemetryReplicatorStats openTelemetryReplicatorStats;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -678,6 +680,10 @@ public CompletableFuture<Void> closeAsync() {
brokerClientSharedTimer.stop();
monotonicSnapshotClock.close();

if (openTelemetryReplicatorStats != null) {
openTelemetryReplicatorStats.close();
openTelemetryReplicatorStats = null;
}
if (openTelemetryProducerStats != null) {
openTelemetryProducerStats.close();
openTelemetryProducerStats = null;
Expand Down Expand Up @@ -834,6 +840,7 @@ public void start() throws PulsarServerException {
openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
openTelemetryProducerStats = new OpenTelemetryProducerStats(this);
openTelemetryReplicatorStats = new OpenTelemetryReplicatorStats(this);

localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.common.Attributes;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +43,7 @@
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.StringInterner;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,6 +57,7 @@ public abstract class AbstractReplicator implements Replicator {
protected final PulsarClientImpl replicationClient;
protected final PulsarClientImpl client;
protected String replicatorId;
@Getter
protected final Topic localTopic;

protected volatile ProducerImpl producer;
Expand All @@ -74,6 +77,10 @@ public abstract class AbstractReplicator implements Replicator {
@Getter
protected volatile State state = State.Disconnected;

private volatile Attributes attributes = null;
private static final AtomicReferenceFieldUpdater<AbstractReplicator, Attributes> ATTRIBUTES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, Attributes.class, "attributes");

public enum State {
/**
* This enum has two mean meanings:
Expand Down Expand Up @@ -136,6 +143,17 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl

protected abstract void disableReplicatorRead();

@Override
public boolean isConnected() {
var producer = this.producer;
return producer != null && producer.isConnected();
}

public long getReplicationDelayMs() {
var producer = this.producer;
return producer == null ? 0 : producer.getDelayInMillis();
}

public String getRemoteCluster() {
return remoteCluster;
}
Expand Down Expand Up @@ -476,4 +494,26 @@ protected ImmutablePair<Boolean, State> compareSetAndGetState(State expect, Stat
public boolean isTerminated() {
return state == State.Terminating || state == State.Terminated;
}

public Attributes getAttributes() {
if (attributes != null) {
return attributes;
}
return ATTRIBUTES_UPDATER.updateAndGet(this, old -> {
if (old != null) {
return old;
}
var topicName = TopicName.get(getLocalTopic().getName());
var builder = Attributes.builder()
.put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString())
.put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant())
.put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace())
.put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName());
if (topicName.isPartitioned()) {
builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex());
}
builder.put(OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, getRemoteCluster());
return builder.build();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,14 @@ public void incrementPublishCount(Producer producer, int numOfMessages, long msg
if (isSystemTopic()) {
systemTopicBytesInCounter.add(msgSizeInBytes);
}

if (producer.isRemote()) {
var remoteClusterName = producer.getRemoteCluster();
var replicator = getReplicators().get(remoteClusterName);
if (replicator != null) {
replicator.getStats().incrementPublishCount(numOfMessages, msgSizeInBytes);
}
}
}

private void handlePublishThrottling(Producer producer, int numOfMessages, long msgSizeInBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public interface Replicator {

void startProducer();

ReplicatorStatsImpl getStats();
Topic getLocalTopic();

ReplicatorStatsImpl computeStats();

CompletableFuture<Void> terminate();

Expand All @@ -53,4 +55,6 @@ default Optional<DispatchRateLimiter> getRateLimiter() {
long getNumberOfEntriesInBacklog();

boolean isTerminated();

ReplicatorStatsImpl getStats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public void sendMessage(Entry entry) {
}

msgOut.recordEvent(headersAndPayload.readableBytes());
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());

msg.setReplicatedFrom(localCluster);

Expand All @@ -129,6 +131,7 @@ public void sendMessage(Entry entry) {
replicatorId);
}
msgDrop.recordEvent();
stats.incrementMsgDropCount();
entry.release();
}
}
Expand All @@ -143,11 +146,11 @@ public void updateRates() {
}

@Override
public NonPersistentReplicatorStatsImpl getStats() {
stats.connected = producer != null && producer.isConnected();
stats.replicationDelayInSeconds = getReplicationDelayInSeconds();

public NonPersistentReplicatorStatsImpl computeStats() {
ProducerImpl producer = this.producer;
stats.connected = isConnected();
stats.replicationDelayInSeconds = TimeUnit.MILLISECONDS.toSeconds(getReplicationDelayMs());

if (producer != null) {
stats.outboundConnection = producer.getConnectionId();
stats.outboundConnectedSince = producer.getConnectedSince();
Expand All @@ -159,11 +162,9 @@ public NonPersistentReplicatorStatsImpl getStats() {
return stats;
}

private long getReplicationDelayInSeconds() {
if (producer != null) {
return TimeUnit.MILLISECONDS.toSeconds(producer.getDelayInMillis());
}
return 0L;
@Override
public NonPersistentReplicatorStatsImpl getStats() {
return stats;
}

private static final class ProducerSendCallback implements SendCallback {
Expand Down Expand Up @@ -256,10 +257,4 @@ public long getNumberOfEntriesInBacklog() {
protected void disableReplicatorRead() {
// No-op
}

@Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
return producer != null && producer.isConnected();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
});

replicators.forEach((cluster, replicator) -> {
NonPersistentReplicatorStatsImpl replicatorStats = replicator.getStats();
NonPersistentReplicatorStatsImpl replicatorStats = replicator.computeStats();

// Add incoming msg rates
PublisherStatsImpl pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ protected boolean replicateEntries(List<Entry> entries) {
msg.getMessageBuilder().clearTxnidMostBits();
msg.getMessageBuilder().clearTxnidLeastBits();
msgOut.recordEvent(headersAndPayload.readableBytes());
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import lombok.Getter;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down Expand Up @@ -107,7 +109,8 @@ public abstract class PersistentReplicator extends AbstractReplicator
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;

private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
@Getter
protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();

protected volatile boolean fetchSchemaInProgress = false;

Expand All @@ -118,7 +121,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(),
brokerService, replicationClient);
this.topic = localTopic;
this.cursor = cursor;
this.cursor = Objects.requireNonNull(cursor);
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
Codec.decode(cursor.getName()), cursor, null);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
Expand Down Expand Up @@ -186,12 +189,14 @@ public long getNumberOfEntriesInBacklog() {
return cursor.getNumberOfEntriesInBacklog(true);
}

public long getMessageExpiredCount() {
return expiryMonitor.getTotalMessageExpired();
}

@Override
protected void disableReplicatorRead() {
if (this.cursor != null) {
// deactivate cursor after successfully close the producer
this.cursor.setInactive();
}
// deactivate cursor after successfully close the producer
this.cursor.setInactive();
}

/**
Expand Down Expand Up @@ -330,12 +335,10 @@ protected CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws Ex
}

public void updateCursorState() {
if (this.cursor != null) {
if (producer != null && producer.isConnected()) {
this.cursor.setActive();
} else {
this.cursor.setInactive();
}
if (isConnected()) {
cursor.setActive();
} else {
cursor.setInactive();
}
}

Expand Down Expand Up @@ -595,10 +598,10 @@ public void updateRates() {
stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
}

public ReplicatorStatsImpl getStats() {
stats.replicationBacklog = cursor != null ? cursor.getNumberOfEntriesInBacklog(false) : 0;
stats.connected = producer != null && producer.isConnected();
stats.replicationDelayInSeconds = getReplicationDelayInSeconds();
public ReplicatorStatsImpl computeStats() {
stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog(false);
stats.connected = isConnected();
stats.replicationDelayInSeconds = TimeUnit.MILLISECONDS.toSeconds(getReplicationDelayMs());

ProducerImpl producer = this.producer;
if (producer != null) {
Expand All @@ -616,13 +619,6 @@ public void updateMessageTTL(int messageTTLInSeconds) {
this.messageTTLInSeconds = messageTTLInSeconds;
}

private long getReplicationDelayInSeconds() {
if (producer != null) {
return TimeUnit.MILLISECONDS.toSeconds(producer.getDelayInMillis());
}
return 0L;
}

@Override
public boolean expireMessages(int messageTTLInSeconds) {
if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
Expand Down Expand Up @@ -691,12 +687,6 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl<
}
}

@Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
return producer != null && producer.isConnected();
}

@Override
protected void doReleaseResources() {
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2353,7 +2353,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
}

// Update replicator stats
ReplicatorStatsImpl rStat = replicator.getStats();
ReplicatorStatsImpl rStat = replicator.computeStats();

// Add incoming msg rates
PublisherStatsImpl pubStats = topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
Expand Down Expand Up @@ -2636,7 +2636,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
});

replicators.forEach((cluster, replicator) -> {
ReplicatorStatsImpl replicatorStats = replicator.getStats();
ReplicatorStatsImpl replicatorStats = replicator.computeStats();

// Add incoming msg rates
PublisherStatsImpl pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ protected boolean replicateEntries(List<Entry> entries) {
dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.consumeDispatchQuota(1, entry.getLength()));

msgOut.recordEvent(headersAndPayload.readableBytes());
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());

msg.setReplicatedFrom(localCluster);

Expand Down
Loading
Loading