Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] PIP-264: Add topic messaging metrics #22467

Merged
merged 59 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
42c4020
Add dummy topic stats class
dragosvictor Mar 21, 2024
52ccbd2
Draft
dragosvictor Mar 21, 2024
fb18d7c
Merge remote-tracking branch 'origin/master' into pip-264-messaging-t…
dragosvictor Mar 25, 2024
a92ba91
Plumb metric source
dragosvictor Mar 26, 2024
95ecd25
Add more metrics
dragosvictor Mar 26, 2024
6cc4c25
Update metric definitions
dragosvictor Apr 3, 2024
8a5a61b
Update metric definitions
dragosvictor Apr 3, 2024
911aa75
Merge remote-tracking branch 'origin/master' into pip-264-messaging-t…
dragosvictor Apr 3, 2024
eca99d0
Build fix
dragosvictor Apr 3, 2024
8b17a54
Add field AbstractTopic.totalPublishRateLimitedCounter
dragosvictor Apr 3, 2024
d87ea79
Add pulsar.transaction.status attribute
dragosvictor Apr 3, 2024
b237145
Add pulsar.backlog.quota.type attribute
dragosvictor Apr 3, 2024
d1cb2e9
Update otel TopicStats recordings
dragosvictor Apr 3, 2024
c7806b2
Remove redundant CompactionRecord.reset method
dragosvictor Apr 4, 2024
96570a2
Add total count getters for ManagedLedgerMBean
dragosvictor Apr 4, 2024
53dc110
Update topic metric sources
dragosvictor Apr 4, 2024
77c8a45
Add PulsarDeprecatedMetric annotations
dragosvictor Apr 4, 2024
cf5d4de
Add more compaction metrics
dragosvictor Apr 4, 2024
5a67b47
Update Topic Messaging metrics
dragosvictor Apr 4, 2024
f32624c
Update compaction bytes in/out counters
dragosvictor Apr 4, 2024
10ad983
Update descriptions
dragosvictor Apr 4, 2024
10aea2e
Update backlog quota metrics
dragosvictor Apr 4, 2024
0722ebc
Add test skeleton
dragosvictor Apr 5, 2024
fa69891
Move assertLongSumValue to MockedPulsarServiceBaseTest.assertOtelMetr…
dragosvictor Apr 5, 2024
bcfb1da
Add method TopicName.getPartitionedTopicLocalName
dragosvictor Apr 6, 2024
ea2129c
Ignore null dispatchers
dragosvictor Apr 6, 2024
7f8dcf0
Split topic name into different attributes
dragosvictor Apr 6, 2024
deedcb6
Test draft
dragosvictor Apr 6, 2024
dd78f44
Update tests
dragosvictor Apr 8, 2024
d76ca77
Update backlog size tests
dragosvictor Apr 8, 2024
9e56efc
Update quota age tests
dragosvictor Apr 8, 2024
636c4fb
Update compactor tests
dragosvictor Apr 9, 2024
8259ecf
Update transaction test
dragosvictor Apr 9, 2024
e176fb1
Checkstyle fixes
dragosvictor Apr 9, 2024
813d5a9
Add delayed subscription meter test
dragosvictor Apr 9, 2024
034b9a9
Update testMessagingMetrics
dragosvictor Apr 9, 2024
143dc5e
Merge remote-tracking branch 'origin/master' into pip-264-topic-messa…
dragosvictor Apr 9, 2024
8d56ecd
Test storage offload metric
dragosvictor Apr 9, 2024
9ce943b
Verify publish rate limit metric
dragosvictor Apr 9, 2024
aec00e8
Fix typo in metric name
dragosvictor Apr 9, 2024
f805c21
Cosmetic fixes
dragosvictor Apr 9, 2024
5cae0c5
Remove redundant attribute PULSAR_STORAGE_TYPE
dragosvictor Apr 9, 2024
6bec0d4
Tidy up
dragosvictor Apr 9, 2024
94f559e
Merge remote-tracking branch 'origin/master' into pip-264-topic-messa…
dragosvictor Apr 9, 2024
8cf57a4
Fix BrokerServiceLookupTest#testMultipleBrokerLookup
dragosvictor Apr 10, 2024
f570bfa
Merge remote-tracking branch 'origin/master' into pip-264-topic-messa…
dragosvictor Apr 22, 2024
e20760f
Use pulsar.namespace and pulsar.topic with same semantics as client
dragosvictor Apr 22, 2024
16fd52e
Fix tests
dragosvictor Apr 23, 2024
b8f7ba8
Revert "Add method TopicName.getPartitionedTopicLocalName"
dragosvictor Apr 23, 2024
5279821
Remove consumer.msg.ack counter
dragosvictor Apr 23, 2024
1e1d4d1
Clarify semantics of pulsar.broker.topic.compaction.removed.message.c…
dragosvictor Apr 23, 2024
41569be
Clarify storage metric semantics
dragosvictor Apr 23, 2024
80bb3f2
Refactor compaction operation counter metric
dragosvictor Apr 24, 2024
a510ee3
Rename metrics
dragosvictor Apr 24, 2024
3686d5b
Fix metric units for IS measurements
dragosvictor Apr 24, 2024
ec49622
Merge remote-tracking branch 'origin/master' into pip-264-topic-messa…
dragosvictor Apr 24, 2024
f3e8bfa
Fix counter types
dragosvictor Apr 24, 2024
265f0fc
Use metric type Gauge for time based backlog quota metrics
dragosvictor Apr 24, 2024
f8f2e9a
Fix metric type expectation in test BacklogQuotaManagerTest#testConsu…
dragosvictor Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public interface ManagedLedgerMXBean {
*/
long getAddEntrySucceed();

/**
* @return the total number of addEntry requests that succeeded
*/
long getAddEntrySucceedTotal();

/**
* @return the number of addEntry requests that failed
*/
Expand All @@ -100,6 +105,11 @@ public interface ManagedLedgerMXBean {
*/
long getReadEntriesSucceeded();

/**
* @return the total number of readEntries requests that succeeded
*/
long getReadEntriesSucceededTotal();

/**
* @return the number of readEntries requests that failed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ public long getAddEntrySucceed() {
return addEntryOps.getCount();
}

@Override
public long getAddEntrySucceedTotal() {
return addEntryOps.getTotalCount();
}

@Override
public long getAddEntryErrors() {
return addEntryOpsFailed.getCount();
Expand All @@ -240,6 +245,11 @@ public long getReadEntriesSucceeded() {
return readEntriesOps.getCount();
}

@Override
public long getReadEntriesSucceededTotal() {
return readEntriesOps.getTotalCount();
}

@Override
public long getReadEntriesErrors() {
return readEntriesOpsFailed.getCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,12 @@ public void simple() throws Exception {
assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 0.0);
assertEquals(mbean.getAddEntryMessagesRate(), 0.0);
assertEquals(mbean.getAddEntrySucceed(), 0);
assertEquals(mbean.getAddEntrySucceedTotal(), 0);
assertEquals(mbean.getAddEntryErrors(), 0);
assertEquals(mbean.getReadEntriesBytesRate(), 0.0);
assertEquals(mbean.getReadEntriesRate(), 0.0);
assertEquals(mbean.getReadEntriesSucceeded(), 0);
assertEquals(mbean.getReadEntriesSucceededTotal(), 0);
assertEquals(mbean.getReadEntriesErrors(), 0);
assertEquals(mbean.getMarkDeleteRate(), 0.0);

Expand All @@ -105,10 +107,12 @@ public void simple() throws Exception {
assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 1600.0);
assertEquals(mbean.getAddEntryMessagesRate(), 2.0);
assertEquals(mbean.getAddEntrySucceed(), 2);
assertEquals(mbean.getAddEntrySucceedTotal(), 2);
assertEquals(mbean.getAddEntryErrors(), 0);
assertEquals(mbean.getReadEntriesBytesRate(), 0.0);
assertEquals(mbean.getReadEntriesRate(), 0.0);
assertEquals(mbean.getReadEntriesSucceeded(), 0);
assertEquals(mbean.getReadEntriesSucceededTotal(), 0);
assertEquals(mbean.getReadEntriesErrors(), 0);
assertTrue(mbean.getMarkDeleteRate() > 0.0);

Expand All @@ -134,10 +138,14 @@ public void simple() throws Exception {
assertEquals(mbean.getReadEntriesBytesRate(), 600.0);
assertEquals(mbean.getReadEntriesRate(), 1.0);
assertEquals(mbean.getReadEntriesSucceeded(), 1);
assertEquals(mbean.getReadEntriesSucceededTotal(), 1);
assertEquals(mbean.getReadEntriesErrors(), 0);
assertEquals(mbean.getNumberOfMessagesInBacklog(), 1);
assertEquals(mbean.getMarkDeleteRate(), 0.0);

assertEquals(mbean.getAddEntrySucceed(), 0);
assertEquals(mbean.getAddEntrySucceedTotal(), 2);

factory.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
Expand Down Expand Up @@ -251,6 +252,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {

private MetricsGenerator metricsGenerator;
private final PulsarBrokerOpenTelemetry openTelemetry;
private OpenTelemetryTopicStats openTelemetryTopicStats;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -622,6 +624,10 @@ public CompletableFuture<Void> closeAsync() {
brokerClientSharedTimer.stop();
monotonicSnapshotClock.close();

if (openTelemetryTopicStats != null) {
openTelemetryTopicStats.close();
}

asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));


Expand Down Expand Up @@ -762,6 +768,8 @@ public void start() throws PulsarServerException {
config.getDefaultRetentionTimeInMinutes() * 60));
}

openTelemetryTopicStats = new OpenTelemetryTopicStats(this);

localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
: null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes");
protected volatile long publishRateLimitedTimes = 0L;
private static final AtomicLongFieldUpdater<AbstractTopic> TOTAL_RATE_LIMITED_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "totalPublishRateLimitedCounter");
protected volatile long totalPublishRateLimitedCounter = 0L;

private static final AtomicIntegerFieldUpdater<AbstractTopic> USER_CREATED_PRODUCER_COUNTER_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, "userCreatedProducerCount");
Expand Down Expand Up @@ -887,6 +890,7 @@ public void recordAddLatency(long latency, TimeUnit unit) {

@Override
public long increasePublishLimitedTimes() {
TOTAL_RATE_LIMITED_UPDATER.incrementAndGet(this);
return RATE_LIMITED_UPDATER.incrementAndGet(this);
}

Expand Down Expand Up @@ -1174,6 +1178,10 @@ public long getBytesOutCounter() {
+ sumSubscriptions(AbstractSubscription::getBytesOutCounter);
}

public long getTotalPublishRateLimitCounter() {
return TOTAL_RATE_LIMITED_UPDATER.get(this);
}

private long sumSubscriptions(ToLongFunction<AbstractSubscription> toCounter) {
return getSubscriptions().values().stream()
.map(AbstractSubscription.class::cast)
Expand Down
Loading
Loading