Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] PIP-264: Add OpenTelemetry metadata store stats #22952

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.IOException;
import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -380,7 +381,8 @@ public PulsarService(ServiceConfiguration config,
DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS), System::nanoTime);
}

public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
OpenTelemetry openTelemetry)
throws MetadataStoreException {
return MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
MetadataStoreConfig.builder()
Expand All @@ -393,6 +395,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.synchronizer(synchronizer)
.openTelemetry(openTelemetry)
.build());
}

Expand Down Expand Up @@ -838,7 +841,8 @@ public void start() throws PulsarServerException {
localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
: null;
localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer);
localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer,
openTelemetry.getOpenTelemetryService().getOpenTelemetry());
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);

coordinationService = new CoordinationServiceImpl(localMetadataStore);
Expand All @@ -847,7 +851,8 @@ public void start() throws PulsarServerException {
configMetadataSynchronizer = StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getConfigurationMetadataSyncEventTopic())
: null;
configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer);
configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer,
openTelemetry.getOpenTelemetryService().getOpenTelemetry());
shouldShutdownConfigurationMetadataStore = true;
} else {
configurationMetadataStore = localMetadataStore;
Expand Down Expand Up @@ -1202,7 +1207,8 @@ private void handleDeleteCluster(Notification notification) {
}
}

public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
OpenTelemetry openTelemetry)
throws MetadataStoreException, PulsarServerException {
return MetadataStoreExtended.create(config.getMetadataStoreUrl(),
MetadataStoreConfig.builder()
Expand All @@ -1215,6 +1221,7 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.synchronizer(synchronizer)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.openTelemetry(openTelemetry)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
public class PulsarBrokerOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-broker";

@Getter
private final OpenTelemetryService openTelemetryService;

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.OpenTelemetry;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -136,8 +137,9 @@ protected void cleanup() throws Exception {

protected void beforePulsarStart(PulsarService pulsar) throws Exception {
if (resources == null) {
MetadataStoreExtended localStore = pulsar.createLocalMetadataStore(null);
MetadataStoreExtended configStore = (MetadataStoreExtended) pulsar.createConfigurationMetadataStore(null);
MetadataStoreExtended localStore = pulsar.createLocalMetadataStore(null, OpenTelemetry.noop());
MetadataStoreExtended configStore =
(MetadataStoreExtended) pulsar.createConfigurationMetadataStore(null, OpenTelemetry.noop());
resources = new PulsarResources(localStore, configStore);
}
this.createNamespaceIfNotExists(resources, NamespaceName.SYSTEM_NAMESPACE.getTenant(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;

import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import java.util.concurrent.ExecutorService;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase {

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.baseSetup();
setupDefaultTenantAndNamespace();

// In testing conditions, the metadata store gets initialized before Pulsar does, so the OpenTelemetry SDK is
// not yet initialized. Work around this issue by recreating the stats object once we have access to the SDK.
var localMetadataStore = (MetadataStore) NonClosingProxyHandler.getDelegate(pulsar.getLocalMetadataStore());
var currentStats = (MetadataStoreStats) FieldUtils.readField(localMetadataStore, "metadataStoreStats", true);
var localMetadataStoreName = (String) FieldUtils.readField(currentStats, "metadataStoreName", true);

currentStats.close();
var newStats = new MetadataStoreStats(
localMetadataStoreName, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
FieldUtils.writeField(localMetadataStore, "metadataStoreStats", newStats, true);

var currentBatchedStats = (BatchMetadataStoreStats) FieldUtils.readField(localMetadataStore, "batchMetadataStoreStats", true);
currentBatchedStats.close();
var currentExecutor = (ExecutorService) FieldUtils.readField(currentBatchedStats, "executor", true);
var newBatchedStats = new BatchMetadataStoreStats(
localMetadataStoreName, currentExecutor, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", newBatchedStats, true);
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) {
super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
pulsarTestContextBuilder.enableOpenTelemetry(true);
}

@Test
public void testMetadataStoreStats() throws Exception {
var topicName = BrokerTestUtil.newUniqueName("persistent://public/default/test-metadata-store-stats");

@Cleanup
var producer = pulsarClient.newProducer().topic(topicName).create();

producer.newMessage().value("test".getBytes()).send();

var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, "metadata-store");

var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
assertMetricLongSumValue(metrics, MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
attributes, value -> assertThat(value).isPositive());
assertMetricLongSumValue(metrics, BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
value -> assertThat(value).isPositive());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.pulsar.broker.testcontext;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.IOException;
import java.util.Optional;
Expand Down Expand Up @@ -68,7 +69,8 @@ public AbstractTestPulsarService(SpyConfig spyConfig, ServiceConfiguration confi
}

@Override
public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
OpenTelemetry openTelemetry)
throws MetadataStoreException {
if (synchronizer != null) {
synchronizer.registerSyncListener(
Expand All @@ -78,7 +80,8 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
}

@Override
public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
OpenTelemetry openTelemetry)
throws MetadataStoreException, PulsarServerException {
if (synchronizer != null) {
synchronizer.registerSyncListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.api;

import io.opentelemetry.api.OpenTelemetry;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
Expand Down Expand Up @@ -92,4 +93,10 @@ public class MetadataStoreConfig {
* separate clusters.
*/
private MetadataEventSynchronizer synchronizer;

/**
* OpenTelemetry instance to monitor metadata store operations.
*/
@Builder.Default
private OpenTelemetry openTelemetry = OpenTelemetry.noop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.time.Instant;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -88,7 +89,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co

protected abstract CompletableFuture<Boolean> existsFromStore(String path);

protected AbstractMetadataStore(String metadataStoreName) {
protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry) {
this.executor = new ScheduledThreadPoolExecutor(1,
new DefaultThreadFactory(
StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName()));
Expand Down Expand Up @@ -137,7 +138,7 @@ public CompletableFuture<Boolean> asyncReload(String key, Boolean oldValue,
});

this.metadataStoreName = metadataStoreName;
this.metadataStoreStats = new MetadataStoreStats(metadataStoreName);
this.metadataStoreStats = new MetadataStoreStats(metadataStoreName, openTelemetry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private static class Value {

public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName());
super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry());
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
// update synchronizer and register sync listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ static long toLong(byte[] bytes) {
*/
private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName());
super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry());
this.metadataUrl = metadataURL;
try {
RocksDB.loadLibrary();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
private final BatchMetadataStoreStats batchMetadataStoreStats;

protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
super(conf.getMetadataStoreName());
super(conf.getMetadataStoreName(), conf.getOpenTelemetry());

this.enabled = conf.isBatchingEnabled();
this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
Expand All @@ -77,7 +77,7 @@ protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
// update synchronizer and register sync listener
updateMetadataEventSynchronizer(conf.getSynchronizer());
this.batchMetadataStoreStats =
new BatchMetadataStoreStats(metadataStoreName, executor);
new BatchMetadataStoreStats(metadataStoreName, executor, conf.getOpenTelemetry());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.impl.oxia;

import io.opentelemetry.api.OpenTelemetry;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.Notification;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
private Optional<MetadataEventSynchronizer> synchronizer;

public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
super("oxia-metadata");
super("oxia-metadata", OpenTelemetry.noop());
this.client = oxia;
this.identity = identity;
this.synchronizer = Optional.empty();
Expand All @@ -68,10 +69,10 @@ public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
public OxiaMetadataStore(
@NonNull String serviceAddress,
@NonNull String namespace,
@NonNull MetadataStoreConfig metadataStoreConfig,
MetadataStoreConfig metadataStoreConfig,
dragosvictor marked this conversation as resolved.
Show resolved Hide resolved
boolean enableSessionWatcher)
throws Exception {
super("oxia-metadata");
super("oxia-metadata", Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry());

var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
if (!metadataStoreConfig.isBatchingEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.metadata.impl.stats;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -58,7 +61,10 @@ public final class BatchMetadataStoreStats implements AutoCloseable {
private final Histogram.Child batchExecuteTimeChild;
private final Histogram.Child opsPerBatchChild;

public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor) {
public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME = "pulsar.broker.metadata.store.executor.queue.size";
private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter;

public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor, OpenTelemetry openTelemetry) {
if (executor instanceof ThreadPoolExecutor tx) {
this.executor = tx;
} else {
Expand All @@ -69,15 +75,25 @@ public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executo
EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
@Override
public double get() {
return BatchMetadataStoreStats.this.executor == null ? 0 :
BatchMetadataStoreStats.this.executor.getQueue().size();
return getQueueSize();
}
}, metadataStoreName);

this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName);
this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);

var meter = openTelemetry.getMeter("org.apache.pulsar");
var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, metadataStoreName);
this.batchMetadataStoreSizeCounter = meter
.upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME)
.setDescription("The number of batch operations in the metadata store executor queue")
.setUnit("{operation}")
.buildWithCallback(measurement -> measurement.record(getQueueSize(), attributes));
}

private int getQueueSize() {
return executor == null ? 0 : executor.getQueue().size();
}

public void recordOpWaiting(long millis) {
Expand All @@ -99,6 +115,7 @@ public void close() throws Exception {
OPS_WAITING.remove(this.metadataStoreName);
BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
OPS_PER_BATCH.remove(metadataStoreName);
batchMetadataStoreSizeCounter.close();
}
}
}
Loading
Loading