Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-255: Add topic metrics for the rate of produce requests on a topic and the average number of messages per request. #6

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class Producer {
private final String appId;
private final BrokerInterceptor brokerInterceptor;
private Rate msgIn;
private Rate requestIn;
private Rate chunkedMessageRate;
// it records msg-drop rate only for non-persistent topic
private final Rate msgDrop;
Expand Down Expand Up @@ -117,6 +118,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.closeFuture = new CompletableFuture<>();
this.appId = appId;
this.msgIn = new Rate();
this.requestIn = new Rate();
this.chunkedMessageRate = new Rate();
this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
Expand Down Expand Up @@ -272,7 +274,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
boolean isMarker, Position position) {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(),
MessagePublishContext.get(this, sequenceId, msgIn, requestIn, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position);
if (brokerInterceptor != null) {
brokerInterceptor
Expand All @@ -284,7 +286,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, l
private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
long batchSize, boolean isChunked, boolean isMarker, Position position) {
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
highestSequenceId, msgIn, requestIn, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position);
if (brokerInterceptor != null) {
brokerInterceptor
Expand Down Expand Up @@ -377,6 +379,7 @@ private static final class MessagePublishContext implements PublishContext, Runn
private long ledgerId;
private long entryId;
private Rate rateIn;
private Rate requestIn;
private int msgSize;
private long batchSize;
private boolean chunked;
Expand Down Expand Up @@ -537,6 +540,7 @@ public void run() {

// stats
rateIn.recordMultipleEvents(batchSize, msgSize);
requestIn.recordMultipleEvents(1L, batchSize);
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
ledgerId, entryId);
Expand All @@ -552,12 +556,13 @@ public void run() {
recycle();
}

static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, Rate requestIn, int msgSize,
long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
callback.rateIn = rateIn;
callback.requestIn = requestIn;
callback.msgSize = msgSize;
callback.batchSize = batchSize;
callback.chunked = chunked;
Expand All @@ -574,12 +579,14 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn
}

static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
Rate requestIn, int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker,
Position position) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
callback.highestSequenceId = highestSequenceId;
callback.rateIn = rateIn;
callback.requestIn = requestIn;
callback.msgSize = msgSize;
callback.batchSize = batchSize;
callback.originalProducerName = null;
Expand Down Expand Up @@ -629,6 +636,7 @@ public void recycle() {
originalSequenceId = -1L;
originalHighestSequenceId = -1L;
rateIn = null;
requestIn = null;
msgSize = 0;
ledgerId = -1L;
entryId = -1L;
Expand Down Expand Up @@ -730,10 +738,13 @@ public void topicMigrated(Optional<ClusterUrl> clusterUrl) {

public void updateRates() {
msgIn.calculateRate();
requestIn.calculateRate();
chunkedMessageRate.calculateRate();
stats.msgRateIn = msgIn.getRate();
stats.msgThroughputIn = msgIn.getValueRate();
stats.averageMsgSize = msgIn.getAverageValue();
stats.requestRateIn = requestIn.getRate();
stats.averageMsgPerRequest = requestIn.getAverageValue();
stats.chunkedMessageRate = chunkedMessageRate.getRate();
if (chunkedMessageRate.getCount() > 0 && this.topic instanceof PersistentTopic) {
((PersistentTopic) this.topic).msgChunkPublished = true;
Expand Down Expand Up @@ -813,7 +824,7 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
return;
}
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, requestIn,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
if (brokerInterceptor != null) {
brokerInterceptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2168,6 +2168,7 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog
PublisherStatsImpl publisherStats = producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
stats.msgThroughputIn += publisherStats.msgThroughputIn;
stats.requestRateIn += publisherStats.requestRateIn;

if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
Expand All @@ -2176,6 +2177,7 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog
}
});

stats.averageMsgPerRequest = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgRateIn / stats.requestRateIn);
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ public class AggregatedProducerStats {

public double averageMsgSize;

public double requestRateIn;

public double averageMsgPerRequest;

}
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.producersCount++;
stats.rateIn += producer.getStats().msgRateIn;
stats.throughputIn += producer.getStats().msgThroughputIn;
stats.requestRateIn += producer.getStats().requestRateIn;

if (includeProducerMetrics) {
AggregatedProducerStats producerStats = stats.producerStats.computeIfAbsent(
Expand All @@ -225,9 +226,12 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
producerStats.msgRateIn = producer.getStats().msgRateIn;
producerStats.msgThroughputIn = producer.getStats().msgThroughputIn;
producerStats.averageMsgSize = producer.getStats().averageMsgSize;
producerStats.requestRateIn = producer.getStats().requestRateIn;
producerStats.averageMsgPerRequest = producer.getStats().averageMsgPerRequest;
}
}
});
stats.averageMsgPerRequest = stats.rateIn / stats.requestRateIn;

if (topic instanceof PersistentTopic) {
tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class TopicStats {
double rateOut;
double throughputIn;
double throughputOut;
double requestRateIn;
double averageMsgPerRequest;
long msgInCounter;
long bytesInCounter;
long msgOutCounter;
Expand Down Expand Up @@ -81,6 +83,8 @@ public void reset() {
rateOut = 0;
throughputIn = 0;
throughputOut = 0;
requestRateIn = 0;
averageMsgPerRequest = 0;
bytesInCounter = 0;
msgInCounter = 0;
bytesOutCounter = 0;
Expand Down Expand Up @@ -134,6 +138,11 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st
writeMetric(stream, "pulsar_average_msg_size", stats.averageMsgSize,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);

writeMetric(stream, "pulsar_request_rate_in", stats.requestRateIn,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_average_msg_per_request", stats.averageMsgPerRequest,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);

writeMetric(stream, "pulsar_txn_tb_active_total", stats.ongoingTxnCount,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_txn_tb_aborted_total", stats.abortedTxnCount,
Expand Down Expand Up @@ -258,6 +267,10 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st
cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
writeProducerMetric(stream, "pulsar_producer_msg_average_Size", producerStats.averageMsgSize,
cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
writeProducerMetric(stream, "pulsar_producer_request_in", producerStats.requestRateIn,
cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
writeProducerMetric(stream, "pulsar_producer_avg_msg_per_request", producerStats.averageMsgPerRequest,
cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
});

stats.subscriptionStats.forEach((sub, subsStats) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public class PublisherStatsImpl implements PublisherStats {
/** Average message size published by this publisher. */
public double averageMsgSize;

/** Total rate of publish requests sent by this publisher (request/s). */
public double requestRateIn;

/** Average number of messages per publish request sent by this publisher (msg/request). */
public double averageMsgPerRequest;

/** The total rate of chunked messages published by this publisher. **/
public double chunkedMessageRate;

Expand Down Expand Up @@ -95,6 +101,10 @@ public PublisherStatsImpl add(PublisherStatsImpl stats) {
this.msgThroughputIn += stats.msgThroughputIn;
double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count;
this.averageMsgSize = newAverageMsgSize;
double newAverageMsgPerRequest = (this.averageMsgPerRequest * (this.count - 1) + stats.averageMsgPerRequest)
/ this.count;
this.averageMsgPerRequest = newAverageMsgPerRequest;
this.requestRateIn += stats.requestRateIn;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public class TopicStatsImpl implements TopicStats {
/** Total rate of messages published on the topic (msg/s). */
public double msgRateIn;

/** Total rate of publish requests received on the topic (request/s). */
public double requestRateIn;

/** Average number of messages per publish request on the topic (msg/request). */
public double averageMsgPerRequest;

/** Total throughput of messages published on the topic (byte/s). */
public double msgThroughputIn;

Expand Down Expand Up @@ -191,6 +197,8 @@ public TopicStatsImpl() {
public void reset() {
this.count = 0;
this.msgRateIn = 0;
this.requestRateIn = 0;
this.averageMsgPerRequest = 0;
this.msgThroughputIn = 0;
this.msgRateOut = 0;
this.msgThroughputOut = 0;
Expand Down Expand Up @@ -238,6 +246,10 @@ public TopicStatsImpl add(TopicStats ts) {
this.waitingPublishers += stats.waitingPublishers;
double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count;
this.averageMsgSize = newAverageMsgSize;
this.requestRateIn = stats.requestRateIn;
double newAverageMsgPerRequest = (this.averageMsgPerRequest * (this.count - 1) + stats.averageMsgPerRequest)
/ this.count;
this.averageMsgPerRequest = newAverageMsgPerRequest;
this.storageSize += stats.storageSize;
this.backlogSize += stats.backlogSize;
this.publishRateLimitedTimes += stats.publishRateLimitedTimes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public void testPublisherStats() throws Exception {
"msgRateIn",
"msgThroughputIn",
"averageMsgSize",
"requestRateIn",
"averageMsgPerRequest",
"chunkedMessageRate",
"producerId",
"metadata",
Expand Down