Skip to content

Commit

Permalink
feat(broker): introduce metric framework
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored and daniel-y committed Oct 11, 2023
1 parent 8bc251f commit 693787c
Show file tree
Hide file tree
Showing 9 changed files with 582 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class BrokerController implements Lifecycle {
private final StoreMetadataService storeMetadataService;
private final ProxyMetadataService proxyMetadataService;
private final MessagingProcessor messagingProcessor;
private final MetricsExporter metricsExporter;

public BrokerController(BrokerConfig brokerConfig) throws Exception {
this.brokerConfig = brokerConfig;
Expand All @@ -66,6 +67,8 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
// TODO: Split controller to a separate port
ControllerServiceImpl controllerService = MetadataStoreBuilder.build(metadataStore);
grpcServer = new GrpcProtocolServer(brokerConfig.proxy(), messagingProcessor, controllerService);

metricsExporter = new MetricsExporter(brokerConfig, messageStore, (ExtendMessagingProcessor) messagingProcessor);
}

@Override
Expand All @@ -74,6 +77,7 @@ public void start() throws Exception {
messagingProcessor.start();
grpcServer.start();
metadataStore.registerCurrentNode(brokerConfig.name(), brokerConfig.advertiseAddress(), brokerConfig.instanceId());
metricsExporter.start();
}

@Override
Expand All @@ -82,5 +86,6 @@ public void shutdown() throws Exception {
messagingProcessor.shutdown();
messageStore.shutdown();
metadataStore.close();
metricsExporter.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.broker;

public class MetricsConstant {
public static final String LABEL_AGGREGATION = "aggregation";
public static final String LABEL_NODE_NAME = "node_name";
public static final String LABEL_INSTANCE_ID = "instance_id";
public static final String AGGREGATION_DELTA = "delta";
public static final String AGGREGATION_CUMULATIVE = "cumulative";
}
247 changes: 247 additions & 0 deletions broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.broker;

import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.config.MetricsConfig;
import com.automq.rocketmq.common.util.Lifecycle;
import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager;
import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor;
import com.automq.rocketmq.store.api.MessageStore;
import com.google.common.base.Splitter;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.logging.LoggingMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;

import static com.automq.rocketmq.broker.MetricsConstant.AGGREGATION_CUMULATIVE;
import static com.automq.rocketmq.broker.MetricsConstant.AGGREGATION_DELTA;
import static com.automq.rocketmq.broker.MetricsConstant.LABEL_AGGREGATION;
import static com.automq.rocketmq.broker.MetricsConstant.LABEL_INSTANCE_ID;
import static com.automq.rocketmq.broker.MetricsConstant.LABEL_NODE_NAME;

public class MetricsExporter implements Lifecycle {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class);

private final BrokerConfig brokerConfig;
private final MetricsConfig metricsConfig;
private final MessageStore messageStore;
private final ExtendMessagingProcessor messagingProcessor;
private final static Map<String, String> LABEL_MAP = new HashMap<>();
private OtlpGrpcMetricExporter metricExporter;
private PeriodicMetricReader periodicMetricReader;
private PrometheusHttpServer prometheusHttpServer;
private LoggingMetricExporter loggingMetricExporter;
private Meter brokerMeter;

public static Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;

public MetricsExporter(BrokerConfig brokerConfig, MessageStore messageStore,
ExtendMessagingProcessor messagingProcessor) {
this.brokerConfig = brokerConfig;
this.metricsConfig = brokerConfig.metrics();
this.messageStore = messageStore;
this.messagingProcessor = messagingProcessor;
}

public static AttributesBuilder newAttributesBuilder() {
AttributesBuilder attributesBuilder;
if (attributesBuilderSupplier == null) {
attributesBuilderSupplier = Attributes::builder;
}
attributesBuilder = attributesBuilderSupplier.get();
LABEL_MAP.forEach(attributesBuilder::put);
return attributesBuilder;
}

private boolean checkConfig() {
if (metricsConfig == null) {
return false;
}
MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (!exporterType.isEnable()) {
return false;
}

switch (exporterType) {
case OTLP_GRPC:
return StringUtils.isNotBlank(metricsConfig.grpcExporterHeader());
case PROM:
return true;
case LOG:
return true;
}
return false;
}

@Override
public void start() {
MetricsExporterType metricsExporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (metricsExporterType == MetricsExporterType.DISABLE) {
return;
}

if (!checkConfig()) {
LOGGER.error("check metrics config failed, will not export metrics");
return;
}

String labels = metricsConfig.labels();
if (StringUtils.isNotBlank(labels)) {
List<String> kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(labels);
for (String item : kvPairs) {
String[] split = item.split(":");
if (split.length != 2) {
LOGGER.warn("metricsLabel is not valid: {}", labels);
continue;
}
LABEL_MAP.put(split[0], split[1]);
}
}
if (metricsConfig.exportInDelta()) {
LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
} else {
LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_CUMULATIVE);
}

LABEL_MAP.put(LABEL_NODE_NAME, brokerConfig.name());
LABEL_MAP.put(LABEL_INSTANCE_ID, brokerConfig.instanceId());

SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
.setResource(Resource.empty());

if (metricsExporterType == MetricsExporterType.OTLP_GRPC) {
String endpoint = metricsConfig.grpcExporterTarget();
if (!endpoint.startsWith("http")) {
endpoint = "https://" + endpoint;
}
OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder()
.setEndpoint(endpoint)
.setTimeout(metricsConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
.setAggregationTemporalitySelector(type -> {
if (metricsConfig.exportInDelta() &&
(type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_COUNTER || type == InstrumentType.HISTOGRAM)) {
return AggregationTemporality.DELTA;
}
return AggregationTemporality.CUMULATIVE;
});

String headers = metricsConfig.grpcExporterHeader();
if (StringUtils.isNotBlank(headers)) {
Map<String, String> headerMap = new HashMap<>();
List<String> kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(headers);
for (String item : kvPairs) {
String[] split = item.split(":");
if (split.length != 2) {
LOGGER.warn("metricsGrpcExporterHeader is not valid: {}", headers);
continue;
}
headerMap.put(split[0], split[1]);
}
headerMap.forEach(metricExporterBuilder::addHeader);
}

metricExporter = metricExporterBuilder.build();

periodicMetricReader = PeriodicMetricReader.builder(metricExporter)
.setInterval(metricsConfig.grpcExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.build();

providerBuilder.registerMetricReader(periodicMetricReader);
} else if (metricsExporterType == MetricsExporterType.PROM) {
String promExporterHost = metricsConfig.promExporterHost();
if (StringUtils.isBlank(promExporterHost)) {
throw new IllegalArgumentException("Config item promExporterHost is blank");
}
prometheusHttpServer = PrometheusHttpServer.builder()
.setHost(promExporterHost)
.setPort(metricsConfig.promExporterPort())
.build();
providerBuilder.registerMetricReader(prometheusHttpServer);
} else if (metricsExporterType == MetricsExporterType.LOG) {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
loggingMetricExporter = LoggingMetricExporter.create(metricsConfig.exportInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter)
.setInterval(metricsConfig.loggingExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.build();
providerBuilder.registerMetricReader(periodicMetricReader);
}

registerMetricsView(providerBuilder);

brokerMeter = OpenTelemetrySdk.builder()
.setMeterProvider(providerBuilder.build())
.build()
.getMeter("rocketmq-meter");

initMetrics();
}

private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
for (Pair<InstrumentSelector, View> selectorViewPair : ProxyMetricsManager.getMetricsView()) {
providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
}
}

private void initMetrics() {
ProxyMetricsManager.initMetrics(brokerMeter, MetricsExporter::newAttributesBuilder);
}

@Override
public void shutdown() {
MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (exporterType == MetricsExporterType.OTLP_GRPC) {
periodicMetricReader.forceFlush();
periodicMetricReader.shutdown();
metricExporter.shutdown();
} else if (exporterType == MetricsExporterType.PROM) {
prometheusHttpServer.forceFlush();
prometheusHttpServer.shutdown();
} else if (exporterType == MetricsExporterType.LOG) {
periodicMetricReader.forceFlush();
periodicMetricReader.shutdown();
loggingMetricExporter.shutdown();
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ public class BrokerConfig implements ControllerConfig {
*/
private String advertiseAddress;

private final MetricsConfig metrics;
private final ProxyConfig proxy;
private final StoreConfig store;
private final S3StreamConfig s3Stream;

private final DatabaseConfig db;

public BrokerConfig() {
this.metrics = new MetricsConfig();
this.proxy = new ProxyConfig();
this.store = new StoreConfig();
this.s3Stream = new S3StreamConfig();
Expand Down Expand Up @@ -115,6 +117,10 @@ public String dbPassword() {
return this.db.getPassword();
}

public MetricsConfig metrics() {
return metrics;
}

public ProxyConfig proxy() {
return proxy;
}
Expand All @@ -127,7 +133,7 @@ public S3StreamConfig s3Stream() {
return s3Stream;
}

public DatabaseConfig getDb() {
public DatabaseConfig db() {
return db;
}

Expand Down
Loading

0 comments on commit 693787c

Please sign in to comment.