Skip to content

Commit

Permalink
[feat][broker] PIP-264: Add OpenTelemetry metadata store stats (#22952)
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor committed Jun 26, 2024
1 parent 2c6fcc7 commit 243ad5a
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.IOException;
import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -382,7 +383,8 @@ public PulsarService(ServiceConfiguration config,
DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS), System::nanoTime);
}

public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
OpenTelemetry openTelemetry)
throws MetadataStoreException {
return MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
MetadataStoreConfig.builder()
Expand All @@ -395,6 +397,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.synchronizer(synchronizer)
.openTelemetry(openTelemetry)
.build());
}

Expand Down Expand Up @@ -845,7 +848,8 @@ public void start() throws PulsarServerException {
localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
: null;
localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer);
localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer,
openTelemetry.getOpenTelemetryService().getOpenTelemetry());
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);

coordinationService = new CoordinationServiceImpl(localMetadataStore);
Expand All @@ -854,7 +858,8 @@ public void start() throws PulsarServerException {
configMetadataSynchronizer = StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getConfigurationMetadataSyncEventTopic())
: null;
configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer);
configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer,
openTelemetry.getOpenTelemetryService().getOpenTelemetry());
shouldShutdownConfigurationMetadataStore = true;
} else {
configurationMetadataStore = localMetadataStore;
Expand Down Expand Up @@ -1209,7 +1214,8 @@ private void handleDeleteCluster(Notification notification) {
}
}

public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
OpenTelemetry openTelemetry)
throws MetadataStoreException, PulsarServerException {
return MetadataStoreExtended.create(config.getMetadataStoreUrl(),
MetadataStoreConfig.builder()
Expand All @@ -1222,6 +1228,7 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.synchronizer(synchronizer)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.openTelemetry(openTelemetry)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
public class PulsarBrokerOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-broker";

@Getter
private final OpenTelemetryService openTelemetryService;

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.OpenTelemetry;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -136,8 +137,9 @@ protected void cleanup() throws Exception {

protected void beforePulsarStart(PulsarService pulsar) throws Exception {
if (resources == null) {
MetadataStoreExtended localStore = pulsar.createLocalMetadataStore(null);
MetadataStoreExtended configStore = (MetadataStoreExtended) pulsar.createConfigurationMetadataStore(null);
MetadataStoreExtended localStore = pulsar.createLocalMetadataStore(null, OpenTelemetry.noop());
MetadataStoreExtended configStore =
(MetadataStoreExtended) pulsar.createConfigurationMetadataStore(null, OpenTelemetry.noop());
resources = new PulsarResources(localStore, configStore);
}
this.createNamespaceIfNotExists(resources, NamespaceName.SYSTEM_NAMESPACE.getTenant(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;

import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import java.util.concurrent.ExecutorService;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase {

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.baseSetup();
setupDefaultTenantAndNamespace();

// In testing conditions, the metadata store gets initialized before Pulsar does, so the OpenTelemetry SDK is
// not yet initialized. Work around this issue by recreating the stats object once we have access to the SDK.
var localMetadataStore = (MetadataStore) NonClosingProxyHandler.getDelegate(pulsar.getLocalMetadataStore());
var currentStats = (MetadataStoreStats) FieldUtils.readField(localMetadataStore, "metadataStoreStats", true);
var localMetadataStoreName = (String) FieldUtils.readField(currentStats, "metadataStoreName", true);

currentStats.close();
var newStats = new MetadataStoreStats(
localMetadataStoreName, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
FieldUtils.writeField(localMetadataStore, "metadataStoreStats", newStats, true);

var currentBatchedStats = (BatchMetadataStoreStats) FieldUtils.readField(localMetadataStore, "batchMetadataStoreStats", true);
currentBatchedStats.close();
var currentExecutor = (ExecutorService) FieldUtils.readField(currentBatchedStats, "executor", true);
var newBatchedStats = new BatchMetadataStoreStats(
localMetadataStoreName, currentExecutor, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", newBatchedStats, true);
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) {
super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
pulsarTestContextBuilder.enableOpenTelemetry(true);
}

@Test
public void testMetadataStoreStats() throws Exception {
var topicName = BrokerTestUtil.newUniqueName("persistent://public/default/test-metadata-store-stats");

@Cleanup
var producer = pulsarClient.newProducer().topic(topicName).create();

producer.newMessage().value("test".getBytes()).send();

var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, "metadata-store");

var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
assertMetricLongSumValue(metrics, MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
attributes, value -> assertThat(value).isPositive());
assertMetricLongSumValue(metrics, BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
value -> assertThat(value).isPositive());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.pulsar.broker.testcontext;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.IOException;
import java.util.Optional;
Expand Down Expand Up @@ -68,7 +69,8 @@ public AbstractTestPulsarService(SpyConfig spyConfig, ServiceConfiguration confi
}

@Override
public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
OpenTelemetry openTelemetry)
throws MetadataStoreException {
if (synchronizer != null) {
synchronizer.registerSyncListener(
Expand All @@ -78,7 +80,8 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
}

@Override
public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
OpenTelemetry openTelemetry)
throws MetadataStoreException, PulsarServerException {
if (synchronizer != null) {
synchronizer.registerSyncListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.api;

import io.opentelemetry.api.OpenTelemetry;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
Expand Down Expand Up @@ -92,4 +93,10 @@ public class MetadataStoreConfig {
* separate clusters.
*/
private MetadataEventSynchronizer synchronizer;

/**
* OpenTelemetry instance to monitor metadata store operations.
*/
@Builder.Default
private OpenTelemetry openTelemetry = OpenTelemetry.noop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.time.Instant;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -88,7 +89,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co

protected abstract CompletableFuture<Boolean> existsFromStore(String path);

protected AbstractMetadataStore(String metadataStoreName) {
protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry) {
this.executor = new ScheduledThreadPoolExecutor(1,
new DefaultThreadFactory(
StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName()));
Expand Down Expand Up @@ -137,7 +138,7 @@ public CompletableFuture<Boolean> asyncReload(String key, Boolean oldValue,
});

this.metadataStoreName = metadataStoreName;
this.metadataStoreStats = new MetadataStoreStats(metadataStoreName);
this.metadataStoreStats = new MetadataStoreStats(metadataStoreName, openTelemetry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private static class Value {

public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName());
super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry());
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
// update synchronizer and register sync listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ static long toLong(byte[] bytes) {
*/
private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName());
super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry());
this.metadataUrl = metadataURL;
try {
RocksDB.loadLibrary();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
private final BatchMetadataStoreStats batchMetadataStoreStats;

protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
super(conf.getMetadataStoreName());
super(conf.getMetadataStoreName(), conf.getOpenTelemetry());

this.enabled = conf.isBatchingEnabled();
this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
Expand All @@ -77,7 +77,7 @@ protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
// update synchronizer and register sync listener
updateMetadataEventSynchronizer(conf.getSynchronizer());
this.batchMetadataStoreStats =
new BatchMetadataStoreStats(metadataStoreName, executor);
new BatchMetadataStoreStats(metadataStoreName, executor, conf.getOpenTelemetry());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.impl.oxia;

import io.opentelemetry.api.OpenTelemetry;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.Notification;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
private Optional<MetadataEventSynchronizer> synchronizer;

public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
super("oxia-metadata");
super("oxia-metadata", OpenTelemetry.noop());
this.client = oxia;
this.identity = identity;
this.synchronizer = Optional.empty();
Expand All @@ -68,10 +69,10 @@ public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
public OxiaMetadataStore(
@NonNull String serviceAddress,
@NonNull String namespace,
@NonNull MetadataStoreConfig metadataStoreConfig,
MetadataStoreConfig metadataStoreConfig,
boolean enableSessionWatcher)
throws Exception {
super("oxia-metadata");
super("oxia-metadata", Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry());

var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
if (!metadataStoreConfig.isBatchingEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.metadata.impl.stats;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -58,7 +61,10 @@ public final class BatchMetadataStoreStats implements AutoCloseable {
private final Histogram.Child batchExecuteTimeChild;
private final Histogram.Child opsPerBatchChild;

public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor) {
public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME = "pulsar.broker.metadata.store.executor.queue.size";
private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter;

public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor, OpenTelemetry openTelemetry) {
if (executor instanceof ThreadPoolExecutor tx) {
this.executor = tx;
} else {
Expand All @@ -69,15 +75,25 @@ public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executo
EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
@Override
public double get() {
return BatchMetadataStoreStats.this.executor == null ? 0 :
BatchMetadataStoreStats.this.executor.getQueue().size();
return getQueueSize();
}
}, metadataStoreName);

this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName);
this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);

var meter = openTelemetry.getMeter("org.apache.pulsar");
var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, metadataStoreName);
this.batchMetadataStoreSizeCounter = meter
.upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME)
.setDescription("The number of batch operations in the metadata store executor queue")
.setUnit("{operation}")
.buildWithCallback(measurement -> measurement.record(getQueueSize(), attributes));
}

private int getQueueSize() {
return executor == null ? 0 : executor.getQueue().size();
}

public void recordOpWaiting(long millis) {
Expand All @@ -99,6 +115,7 @@ public void close() throws Exception {
OPS_WAITING.remove(this.metadataStoreName);
BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
OPS_PER_BATCH.remove(metadataStoreName);
batchMetadataStoreSizeCounter.close();
}
}
}
Loading

0 comments on commit 243ad5a

Please sign in to comment.