Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] PIP-264: Add OpenTelemetry consumer metrics #22693

Merged
merged 22 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
Expand Down Expand Up @@ -254,6 +255,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private MetricsGenerator metricsGenerator;
private final PulsarBrokerOpenTelemetry openTelemetry;
private OpenTelemetryTopicStats openTelemetryTopicStats;
private OpenTelemetryConsumerStats openTelemetryConsumerStats;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -631,8 +633,13 @@ public CompletableFuture<Void> closeAsync() {
brokerClientSharedTimer.stop();
monotonicSnapshotClock.close();

if (openTelemetryConsumerStats != null) {
openTelemetryConsumerStats.close();
openTelemetryConsumerStats = null;
}
if (openTelemetryTopicStats != null) {
openTelemetryTopicStats.close();
openTelemetryTopicStats = null;
}

asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));
Expand Down Expand Up @@ -776,6 +783,7 @@ public void start() throws PulsarServerException {
}

openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);

localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.AtomicDouble;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
Expand Down Expand Up @@ -90,7 +91,9 @@ public class Consumer {
private final Rate msgOut;
private final Rate msgRedeliver;
private final LongAdder msgOutCounter;
private final LongAdder msgRedeliverCounter;
private final LongAdder bytesOutCounter;
private final LongAdder messageAckCounter;
private final Rate messageAckRate;

private volatile long lastConsumedTimestamp;
Expand Down Expand Up @@ -152,6 +155,9 @@ public class Consumer {
@Getter
private final SchemaType schemaType;

@Getter
private final Instant connectedSince = Instant.now();

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
Expand Down Expand Up @@ -182,8 +188,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.msgOut = new Rate();
this.chunkedMessageRate = new Rate();
this.msgRedeliver = new Rate();
this.msgRedeliverCounter = new LongAdder();
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();
this.messageAckCounter = new LongAdder();
this.messageAckRate = new Rate();
this.appId = appId;

Expand All @@ -200,7 +208,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
stats = new ConsumerStatsImpl();
stats.setAddress(cnx.clientSourceAddressAndPort());
stats.consumerName = consumerName;
stats.setConnectedSince(DateFormatter.now());
stats.setConnectedSince(DateFormatter.format(connectedSince));
stats.setClientVersion(cnx.getClientVersion());
stats.metadata = this.metadata;

Expand Down Expand Up @@ -238,8 +246,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.consumerName = consumerName;
this.msgOut = null;
this.msgRedeliver = null;
this.msgRedeliverCounter = null;
this.msgOutCounter = null;
this.bytesOutCounter = null;
this.messageAckCounter = null;
this.messageAckRate = null;
this.pendingAcks = null;
this.stats = null;
Expand Down Expand Up @@ -502,6 +512,7 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
return future
.thenApply(v -> {
this.messageAckRate.recordEvent(v);
this.messageAckCounter.add(v);
return null;
});
}
Expand Down Expand Up @@ -922,6 +933,14 @@ public long getBytesOutCounter() {
return bytesOutCounter.longValue();
}

/**
 * Reports the running total accumulated in {@code messageAckCounter}, which is
 * incremented each time an acknowledgment command completes.
 *
 * @return the current value of the acknowledgment counter
 */
public long getMessageAckCounter() {
    return messageAckCounter.longValue();
}

/**
 * Reports the running total accumulated in {@code msgRedeliverCounter}, which is
 * incremented whenever unacknowledged messages are scheduled for redelivery.
 *
 * @return the current value of the redelivery counter
 */
public long getMessageRedeliverCounter() {
    return msgRedeliverCounter.longValue();
}

/**
 * Exposes the current value of the {@code unackedMessages} field.
 *
 * @return the number of unacknowledged messages tracked for this consumer
 */
public int getUnackedMessages() {
    return this.unackedMessages;
}
Expand Down Expand Up @@ -1059,6 +1078,8 @@ public void redeliverUnacknowledgedMessages(long consumerEpoch) {
}

msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), totalRedeliveryMessages.intValue());
msgRedeliverCounter.add(totalRedeliveryMessages.intValue());

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
} else {
subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
Expand Down Expand Up @@ -1091,6 +1112,7 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
msgRedeliverCounter.add(totalRedeliveryMessages);

int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);

Expand Down Expand Up @@ -1153,6 +1175,14 @@ public String getClientAddress() {
return clientAddress;
}

/**
 * Asks the underlying connection for the client's source address and port.
 *
 * @return whatever {@code cnx.clientSourceAddressAndPort()} returns
 *         (NOTE(review): may be null, confirm against the connection implementation)
 */
public String getClientAddressAndPort() {
    return this.cnx.clientSourceAddressAndPort();
}

/**
 * Asks the underlying connection for the client version string.
 *
 * @return whatever {@code cnx.getClientVersion()} returns
 *         (NOTE(review): may be null, confirm against the connection implementation)
 */
public String getClientVersion() {
    return this.cnx.getClientVersion();
}

/**
 * Exposes the value of the {@code startMessageId} field.
 *
 * @return the start message id held by this consumer
 */
public MessageId getStartMessageId() {
    return this.startMessageId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.util.Collection;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;

public class OpenTelemetryConsumerStats implements AutoCloseable {

// Instrument name for the count of messages dispatched to a consumer.
// Replaces pulsar_consumer_msg_rate_out
public static final String MESSAGE_OUT_COUNTER = "pulsar.broker.consumer.message.outgoing.count";
private final ObservableLongMeasurement messageOutCounter;

// Instrument name for the size, in bytes, of messages dispatched to a consumer.
// Replaces pulsar_consumer_msg_throughput_out
public static final String BYTES_OUT_COUNTER = "pulsar.broker.consumer.message.outgoing.size";
private final ObservableLongMeasurement bytesOutCounter;

// Instrument name for the count of acknowledgments received from a consumer.
// Replaces pulsar_consumer_msg_ack_rate
public static final String MESSAGE_ACK_COUNTER = "pulsar.broker.consumer.message.ack.count";
private final ObservableLongMeasurement messageAckCounter;

// Instrument name for the count of messages redelivered to a consumer.
// Replaces pulsar_consumer_msg_rate_redeliver
public static final String MESSAGE_REDELIVER_COUNTER = "pulsar.broker.consumer.message.redeliver.count";
private final ObservableLongMeasurement messageRedeliverCounter;

// Instrument name for the number of messages a consumer has not yet acknowledged.
// Replaces pulsar_consumer_unacked_messages
public static final String MESSAGE_UNACKNOWLEDGED_COUNTER = "pulsar.broker.consumer.message.unack.count";
private final ObservableLongMeasurement messageUnacknowledgedCounter;

// Instrument name for the number of permits currently available to a consumer.
// Replaces pulsar_consumer_available_permits
public static final String MESSAGE_PERMITS_COUNTER = "pulsar.broker.consumer.permit.count";
private final ObservableLongMeasurement messagePermitsCounter;

// Handle for the batch callback registered in the constructor; released in close().
private final BatchCallback batchCallback;

/**
 * Registers the per-consumer instruments on the broker's OpenTelemetry meter and installs a
 * batch callback that reports them for every consumer of every successfully loaded topic.
 *
 * @param pulsar the broker instance supplying the meter and the set of topics to walk
 */
public OpenTelemetryConsumerStats(PulsarService pulsar) {
    var meter = pulsar.getOpenTelemetry().getMeter();

    messageOutCounter = meter
            .counterBuilder(MESSAGE_OUT_COUNTER)
            .setUnit("{message}")
            .setDescription("The total number of messages dispatched to this consumer.")
            .buildObserver();

    bytesOutCounter = meter
            .counterBuilder(BYTES_OUT_COUNTER)
            .setUnit("By")
            .setDescription("The total number of messages bytes dispatched to this consumer.")
            .buildObserver();

    messageAckCounter = meter
            .counterBuilder(MESSAGE_ACK_COUNTER)
            .setUnit("{ack}")
            .setDescription("The total number of message acknowledgments received from this consumer.")
            .buildObserver();

    messageRedeliverCounter = meter
            .counterBuilder(MESSAGE_REDELIVER_COUNTER)
            .setUnit("{message}")
            .setDescription("The total number of messages that have been redelivered to this consumer.")
            .buildObserver();

    messageUnacknowledgedCounter = meter
            .upDownCounterBuilder(MESSAGE_UNACKNOWLEDGED_COUNTER)
            .setUnit("{message}")
            .setDescription("The total number of messages unacknowledged by this consumer.")
            .buildObserver();

    messagePermitsCounter = meter
            .upDownCounterBuilder(MESSAGE_PERMITS_COUNTER)
            .setUnit("{permit}")
            .setDescription("The number of permits currently available for this consumer.")
            .buildObserver();

    batchCallback = meter.batchCallback(() -> pulsar.getBrokerService()
                    .getTopics()
                    .values()
                    .stream()
                    // Only read futures that have finished successfully. Reading the result of a
                    // future that failed or was cancelled throws, which would abort this whole
                    // collection cycle and drop the metrics of every other consumer. Futures still
                    // in flight are skipped, exactly as before.
                    .filter(topicFuture -> topicFuture.isDone() && !topicFuture.isCompletedExceptionally())
                    .map(topicFuture -> topicFuture.join())
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .map(Topic::getSubscriptions)
                    .flatMap(s -> s.values().stream())
                    .map(Subscription::getConsumers)
                    .flatMap(Collection::stream)
                    .forEach(this::recordMetricsForConsumer),
            messageOutCounter,
            bytesOutCounter,
            messageAckCounter,
            messageRedeliverCounter,
            messageUnacknowledgedCounter,
            messagePermitsCounter);
}

/**
 * Releases the batch callback that was registered by the constructor.
 */
@Override
public void close() {
    this.batchCallback.close();
}

/**
 * Records one data point on each of the six consumer instruments for the given consumer.
 *
 * <p>A single attribute set is built from the consumer, its subscription and its topic name and
 * is shared by all measurements; the unacknowledged-message measurement additionally carries
 * the consumer's blocked flag.
 *
 * @param consumer the consumer whose counters are read and reported
 */
private void recordMetricsForConsumer(Consumer consumer) {
var subscription = consumer.getSubscription();
var topicName = TopicName.get(subscription.getTopic().getName());

// Attributes that are always present for a consumer.
var builder = Attributes.builder()
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumer.consumerName())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, consumer.consumerId())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat @dragosvictor This and other attributes here are super high cardinality - why are we recording with them? This will "kill" any TSDB and be very costly. Even when compared with pulsar_consumer_msg_rate_out - it didn't have all those attributes.

consumer.getConnectedSince().getEpochSecond())
.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscription.getName())
.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, consumer.subType().toString())
.put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString())
.put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant())
.put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace())
.put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName());
// The partition index is attached only for partitioned topic names.
if (topicName.isPartitioned()) {
builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex());
}
// Client address and client version are attached only when the consumer reports a non-null value.
var clientAddress = consumer.getClientAddressAndPort();
if (clientAddress != null) {
builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, clientAddress);
}
var clientVersion = consumer.getClientVersion();
if (clientVersion != null) {
builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, clientVersion);
}
// Each consumer metadata entry is rendered as a "key:value" string.
var metadataList = consumer.getMetadata()
.entrySet()
.stream()
.map(e -> String.format("%s:%s", e.getKey(), e.getValue()))
.toList();
builder.put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, metadataList);
var attributes = builder.build();

messageOutCounter.record(consumer.getMsgOutCounter(), attributes);
bytesOutCounter.record(consumer.getBytesOutCounter(), attributes);
messageAckCounter.record(consumer.getMessageAckCounter(), attributes);
messageRedeliverCounter.record(consumer.getMessageRedeliverCounter(), attributes);
// The unacknowledged-message point extends the shared attributes with the blocked flag.
messageUnacknowledgedCounter.record(consumer.getUnackedMessages(),
Attributes.builder()
.putAll(attributes)
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, consumer.isBlocked())
.build());
messagePermitsCounter.record(consumer.getAvailablePermits(), attributes);
}
}
Loading
Loading