diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 8cf1376642b88..617afc6e5d154 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -31,6 +31,7 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; import java.io.IOException; import java.lang.reflect.Constructor; @@ -382,7 +383,8 @@ public PulsarService(ServiceConfiguration config, DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS), System::nanoTime); } - public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) + public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer, + OpenTelemetry openTelemetry) throws MetadataStoreException { return MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(), MetadataStoreConfig.builder() @@ -395,6 +397,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro .batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb()) .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE) .synchronizer(synchronizer) + .openTelemetry(openTelemetry) .build()); } @@ -845,7 +848,8 @@ public void start() throws PulsarServerException { localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) : null; - localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer); + localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer, + openTelemetry.getOpenTelemetryService().getOpenTelemetry()); localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent); coordinationService = new CoordinationServiceImpl(localMetadataStore); @@ -854,7 +858,8 @@ public void start() throws PulsarServerException { configMetadataSynchronizer = StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getConfigurationMetadataSyncEventTopic()) : null; - configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer); + configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer, + openTelemetry.getOpenTelemetryService().getOpenTelemetry()); shouldShutdownConfigurationMetadataStore = true; } else { configurationMetadataStore = localMetadataStore; @@ -1209,7 +1214,8 @@ private void handleDeleteCluster(Notification notification) { } } - public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer) + public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer, + OpenTelemetry openTelemetry) throws MetadataStoreException, PulsarServerException { return MetadataStoreExtended.create(config.getMetadataStoreUrl(), MetadataStoreConfig.builder() @@ -1222,6 +1228,7 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro .batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb()) .synchronizer(synchronizer) .metadataStoreName(MetadataStoreConfig.METADATA_STORE) + .openTelemetry(openTelemetry) .build()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java index 01ca65d2cc537..c1bcfadaf97f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java @@ -31,6 +31,8 @@ public class PulsarBrokerOpenTelemetry implements Closeable { public static final String SERVICE_NAME = "pulsar-broker"; + + @Getter private final OpenTelemetryService openTelemetryService; @Getter diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index 5fbda961c0e3d..fc2fec96294ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.Range; import com.google.common.collect.Sets; import com.google.common.hash.Hashing; +import io.opentelemetry.api.OpenTelemetry; import java.lang.reflect.Field; import java.util.Collections; import java.util.HashMap; @@ -136,8 +137,9 @@ protected void cleanup() throws Exception { protected void beforePulsarStart(PulsarService pulsar) throws Exception { if (resources == null) { - MetadataStoreExtended localStore = pulsar.createLocalMetadataStore(null); - MetadataStoreExtended configStore = (MetadataStoreExtended) pulsar.createConfigurationMetadataStore(null); + MetadataStoreExtended localStore = pulsar.createLocalMetadataStore(null, OpenTelemetry.noop()); + MetadataStoreExtended configStore = + (MetadataStoreExtended) pulsar.createConfigurationMetadataStore(null, OpenTelemetry.noop()); resources = new PulsarResources(localStore, configStore); } this.createNamespaceIfNotExists(resources, NamespaceName.SYSTEM_NAMESPACE.getTenant(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java new file mode 100644 index 0000000000000..15689fca5d7c0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.assertj.core.api.Assertions.assertThat; +import io.opentelemetry.api.common.Attributes; +import java.util.concurrent.ExecutorService; +import lombok.Cleanup; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats; +import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.baseSetup(); + setupDefaultTenantAndNamespace(); + + // In testing conditions, the metadata store gets initialized before Pulsar does, so the OpenTelemetry SDK is + // not yet initialized. Work around this issue by recreating the stats object once we have access to the SDK. + var localMetadataStore = (MetadataStore) NonClosingProxyHandler.getDelegate(pulsar.getLocalMetadataStore()); + var currentStats = (MetadataStoreStats) FieldUtils.readField(localMetadataStore, "metadataStoreStats", true); + var localMetadataStoreName = (String) FieldUtils.readField(currentStats, "metadataStoreName", true); + + currentStats.close(); + var newStats = new MetadataStoreStats( + localMetadataStoreName, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry()); + FieldUtils.writeField(localMetadataStore, "metadataStoreStats", newStats, true); + + var currentBatchedStats = (BatchMetadataStoreStats) FieldUtils.readField(localMetadataStore, "batchMetadataStoreStats", true); + currentBatchedStats.close(); + var currentExecutor = (ExecutorService) FieldUtils.readField(currentBatchedStats, "executor", true); + var newBatchedStats = new BatchMetadataStoreStats( + localMetadataStoreName, currentExecutor, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry()); + FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", newBatchedStats, true); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder); + pulsarTestContextBuilder.enableOpenTelemetry(true); + } + + @Test + public void testMetadataStoreStats() throws Exception { + var topicName = BrokerTestUtil.newUniqueName("persistent://public/default/test-metadata-store-stats"); + + @Cleanup + var producer = pulsarClient.newProducer().topic(topicName).create(); + + producer.newMessage().value("test".getBytes()).send(); + + var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, "metadata-store"); + + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(metrics, MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME, + attributes, value -> assertThat(value).isPositive()); + assertMetricLongSumValue(metrics, BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes, + value -> assertThat(value).isPositive()); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java index c459098f6850c..c67714484f442 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.testcontext; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; import java.io.IOException; import java.util.Optional; @@ -68,7 +69,8 @@ public AbstractTestPulsarService(SpyConfig spyConfig, ServiceConfiguration confi } @Override - public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) + public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer, + OpenTelemetry openTelemetry) throws MetadataStoreException { if (synchronizer != null) { synchronizer.registerSyncListener( @@ -78,7 +80,8 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro } @Override - public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer) + public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer, + OpenTelemetry openTelemetry) throws MetadataStoreException, PulsarServerException { if (synchronizer != null) { synchronizer.registerSyncListener( diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java index 5ddfe33c3912a..be29f843eea18 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.metadata.api; +import io.opentelemetry.api.OpenTelemetry; import lombok.Builder; import lombok.Getter; import lombok.ToString; @@ -92,4 +93,10 @@ public class MetadataStoreConfig { * separate clusters. */ private MetadataEventSynchronizer synchronizer; + + /** + * OpenTelemetry instance to monitor metadata store operations. + */ + @Builder.Default + private OpenTelemetry openTelemetry = OpenTelemetry.noop(); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 7315e6a04a230..f35f197463222 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -26,6 +26,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.DefaultThreadFactory; +import io.opentelemetry.api.OpenTelemetry; import java.time.Instant; import java.util.Collections; import java.util.EnumSet; @@ -88,7 +89,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co protected abstract CompletableFuture existsFromStore(String path); - protected AbstractMetadataStore(String metadataStoreName) { + protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry) { this.executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory( StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName())); @@ -137,7 +138,7 @@ public CompletableFuture asyncReload(String key, Boolean oldValue, }); this.metadataStoreName = metadataStoreName; - this.metadataStoreStats = new MetadataStoreStats(metadataStoreName); + this.metadataStoreStats = new MetadataStoreStats(metadataStoreName, openTelemetry); } @Override diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 3909a89cf5eb2..e95f1947740c8 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -78,7 +78,7 @@ private static class Value { public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { - super(metadataStoreConfig.getMetadataStoreName()); + super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry()); String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length()); // Local means a private data set // update synchronizer and register sync listener diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index 06f7b26053693..20e3c4c2b27b2 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java @@ -209,7 +209,7 @@ static long toLong(byte[] bytes) { */ private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { - super(metadataStoreConfig.getMetadataStoreName()); + super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry()); this.metadataUrl = metadataURL; try { RocksDB.loadLibrary(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index 4fa1c6aca0fee..4275920d7f954 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -56,7 +56,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore private final BatchMetadataStoreStats batchMetadataStoreStats; protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { - super(conf.getMetadataStoreName()); + super(conf.getMetadataStoreName(), conf.getOpenTelemetry()); this.enabled = conf.isBatchingEnabled(); this.maxDelayMillis = conf.getBatchingMaxDelayMillis(); @@ -77,7 +77,7 @@ protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { // update synchronizer and register sync listener updateMetadataEventSynchronizer(conf.getSynchronizer()); this.batchMetadataStoreStats = - new BatchMetadataStoreStats(metadataStoreName, executor); + new BatchMetadataStoreStats(metadataStoreName, executor, conf.getOpenTelemetry()); } @Override diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index 154a0ec0c4fd8..e9da7ec7c1ab5 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.metadata.impl.oxia; +import io.opentelemetry.api.OpenTelemetry; import io.streamnative.oxia.client.api.AsyncOxiaClient; import io.streamnative.oxia.client.api.DeleteOption; import io.streamnative.oxia.client.api.Notification; @@ -58,7 +59,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { private Optional synchronizer; public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) { - super("oxia-metadata"); + super("oxia-metadata", OpenTelemetry.noop()); this.client = oxia; this.identity = identity; this.synchronizer = Optional.empty(); @@ -68,10 +69,10 @@ public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) { public OxiaMetadataStore( @NonNull String serviceAddress, @NonNull String namespace, - @NonNull MetadataStoreConfig metadataStoreConfig, + MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher) throws Exception { - super("oxia-metadata"); + super("oxia-metadata", Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry()); var linger = metadataStoreConfig.getBatchingMaxDelayMillis(); if (!metadataStoreConfig.isBatchingEnabled()) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java index f87155b9259be..9549a8df8f9f1 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.metadata.impl.stats; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; import java.util.concurrent.ExecutorService; @@ -58,7 +61,10 @@ public final class BatchMetadataStoreStats implements AutoCloseable { private final Histogram.Child batchExecuteTimeChild; private final Histogram.Child opsPerBatchChild; - public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor) { + public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME = "pulsar.broker.metadata.store.executor.queue.size"; + private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter; + + public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor, OpenTelemetry openTelemetry) { if (executor instanceof ThreadPoolExecutor tx) { this.executor = tx; } else { @@ -69,8 +75,7 @@ public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executo EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() { @Override public double get() { - return BatchMetadataStoreStats.this.executor == null ? 0 : - BatchMetadataStoreStats.this.executor.getQueue().size(); + return getQueueSize(); } }, metadataStoreName); @@ -78,6 +83,17 @@ public double get() { this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName); this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName); + var meter = openTelemetry.getMeter("org.apache.pulsar"); + var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, metadataStoreName); + this.batchMetadataStoreSizeCounter = meter + .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME) + .setDescription("The number of batch operations in the metadata store executor queue") + .setUnit("{operation}") + .buildWithCallback(measurement -> measurement.record(getQueueSize(), attributes)); + } + + private int getQueueSize() { + return executor == null ? 0 : executor.getQueue().size(); } public void recordOpWaiting(long millis) { @@ -99,6 +115,7 @@ public void close() throws Exception { OPS_WAITING.remove(this.metadataStoreName); BATCH_EXECUTE_TIME.remove(this.metadataStoreName); OPS_PER_BATCH.remove(metadataStoreName); + batchMetadataStoreSizeCounter.close(); } } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java index 45024a68383bd..5f0383f9520a7 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.metadata.impl.stats; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; import io.prometheus.client.Counter; import io.prometheus.client.Histogram; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,6 +52,12 @@ public final class MetadataStoreStats implements AutoCloseable { .labelNames(METADATA_STORE_LABEL_NAME) .register(); + public static final AttributeKey METADATA_STORE_NAME = AttributeKey.stringKey("pulsar.metadata.store.name"); + public static final String METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME = + "pulsar.broker.metadata.store.outgoing.size"; + private final Attributes attributes; + private final LongCounter putBytesCounter; + private final Histogram.Child getOpsSucceedChild; private final Histogram.Child delOpsSucceedChild; private final Histogram.Child putOpsSucceedChild; @@ -58,7 +68,7 @@ public final class MetadataStoreStats implements AutoCloseable { private final String metadataStoreName; private final AtomicBoolean closed = new AtomicBoolean(false); - public MetadataStoreStats(String metadataStoreName) { + public MetadataStoreStats(String metadataStoreName, OpenTelemetry openTelemetry) { this.metadataStoreName = metadataStoreName; this.getOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_GET, STATUS_SUCCESS); @@ -68,6 +78,13 @@ public MetadataStoreStats(String metadataStoreName) { this.delOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_DEL, STATUS_FAIL); this.putOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_PUT, STATUS_FAIL); this.putBytesChild = PUT_BYTES.labels(metadataStoreName); + + attributes = Attributes.of(METADATA_STORE_NAME, metadataStoreName); + putBytesCounter = openTelemetry.getMeter("org.apache.pulsar") + .counterBuilder(METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME) + .setDescription("The total amount of data written to the metadata store") + .setUnit("{By}") + .build(); } public void recordGetOpsSucceeded(long millis) { @@ -81,6 +98,7 @@ public void recordDelOpsSucceeded(long millis) { public void recordPutOpsSucceeded(long millis, int bytes) { this.putOpsSucceedChild.observe(millis); this.putBytesChild.inc(bytes); + this.putBytesCounter.add(bytes, attributes); } public void recordGetOpsFailed(long millis) { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java index c0159be4303bc..6ede02b67136e 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java @@ -20,6 +20,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import io.opentelemetry.api.OpenTelemetry; import lombok.Cleanup; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStore; @@ -91,7 +92,7 @@ public MetadataStore create(String metadataURL, MetadataStoreConfig metadataStor public static class MyMetadataStore extends AbstractMetadataStore { protected MyMetadataStore() { - super("custom"); + super("custom", OpenTelemetry.noop()); } @Override