From 693787c7a47d0d7af8a336fa5873a86c38274d3f Mon Sep 17 00:00:00 2001 From: SSpirits Date: Wed, 11 Oct 2023 18:01:26 +0800 Subject: [PATCH] feat(broker): introduce metric framework Signed-off-by: SSpirits --- .../rocketmq/broker/BrokerController.java | 5 + .../rocketmq/broker/MetricsConstant.java | 26 ++ .../rocketmq/broker/MetricsExporter.java | 247 ++++++++++++++++++ .../rocketmq/common/config/BrokerConfig.java | 8 +- .../rocketmq/common/config/MetricsConfig.java | 77 ++++++ .../common/model/MetricsExporterType.java | 52 ++++ .../grpc/ExtendGrpcMessagingApplication.java | 48 ++++ .../proxy/metrics/ProxyMetricsManager.java | 91 +++++++ .../rocketmq/proxy/model/ProxyContextExt.java | 29 ++ 9 files changed, 582 insertions(+), 1 deletion(-) create mode 100644 broker/src/main/java/com/automq/rocketmq/broker/MetricsConstant.java create mode 100644 broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java create mode 100644 common/src/main/java/com/automq/rocketmq/common/config/MetricsConfig.java create mode 100644 common/src/main/java/com/automq/rocketmq/common/model/MetricsExporterType.java create mode 100644 proxy/src/main/java/com/automq/rocketmq/proxy/metrics/ProxyMetricsManager.java create mode 100644 proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java diff --git a/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java b/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java index 663474de2..262fc373e 100644 --- a/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java @@ -44,6 +44,7 @@ public class BrokerController implements Lifecycle { private final StoreMetadataService storeMetadataService; private final ProxyMetadataService proxyMetadataService; private final MessagingProcessor messagingProcessor; + private final MetricsExporter metricsExporter; public BrokerController(BrokerConfig brokerConfig) throws Exception { this.brokerConfig = brokerConfig; @@ -66,6 +67,8 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception { // TODO: Split controller to a separate port ControllerServiceImpl controllerService = MetadataStoreBuilder.build(metadataStore); grpcServer = new GrpcProtocolServer(brokerConfig.proxy(), messagingProcessor, controllerService); + + metricsExporter = new MetricsExporter(brokerConfig, messageStore, (ExtendMessagingProcessor) messagingProcessor); } @Override @@ -74,6 +77,7 @@ public void start() throws Exception { messagingProcessor.start(); grpcServer.start(); metadataStore.registerCurrentNode(brokerConfig.name(), brokerConfig.advertiseAddress(), brokerConfig.instanceId()); + metricsExporter.start(); } @Override @@ -82,5 +86,6 @@ public void shutdown() throws Exception { messagingProcessor.shutdown(); messageStore.shutdown(); metadataStore.close(); + metricsExporter.shutdown(); } } diff --git a/broker/src/main/java/com/automq/rocketmq/broker/MetricsConstant.java b/broker/src/main/java/com/automq/rocketmq/broker/MetricsConstant.java new file mode 100644 index 000000000..0d8a479ad --- /dev/null +++ b/broker/src/main/java/com/automq/rocketmq/broker/MetricsConstant.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.broker; + +public class MetricsConstant { + public static final String LABEL_AGGREGATION = "aggregation"; + public static final String LABEL_NODE_NAME = "node_name"; + public static final String LABEL_INSTANCE_ID = "instance_id"; + public static final String AGGREGATION_DELTA = "delta"; + public static final String AGGREGATION_CUMULATIVE = "cumulative"; +} diff --git a/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java b/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java new file mode 100644 index 000000000..f7af265d4 --- /dev/null +++ b/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.broker; + +import com.automq.rocketmq.common.config.BrokerConfig; +import com.automq.rocketmq.common.config.MetricsConfig; +import com.automq.rocketmq.common.util.Lifecycle; +import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager; +import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor; +import com.automq.rocketmq.store.api.MessageStore; +import com.google.common.base.Splitter; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.exporter.logging.LoggingMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; +import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.metrics.MetricsExporterType; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.slf4j.bridge.SLF4JBridgeHandler; + +import static com.automq.rocketmq.broker.MetricsConstant.AGGREGATION_CUMULATIVE; +import static com.automq.rocketmq.broker.MetricsConstant.AGGREGATION_DELTA; +import static com.automq.rocketmq.broker.MetricsConstant.LABEL_AGGREGATION; +import static com.automq.rocketmq.broker.MetricsConstant.LABEL_INSTANCE_ID; +import static com.automq.rocketmq.broker.MetricsConstant.LABEL_NODE_NAME; + +public class MetricsExporter implements Lifecycle { + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class); + + private final BrokerConfig brokerConfig; + private final MetricsConfig metricsConfig; + private final MessageStore messageStore; + private final ExtendMessagingProcessor messagingProcessor; + private final static Map LABEL_MAP = new HashMap<>(); + private OtlpGrpcMetricExporter metricExporter; + private PeriodicMetricReader periodicMetricReader; + private PrometheusHttpServer prometheusHttpServer; + private LoggingMetricExporter loggingMetricExporter; + private Meter brokerMeter; + + public static Supplier attributesBuilderSupplier = Attributes::builder; + + public MetricsExporter(BrokerConfig brokerConfig, MessageStore messageStore, + ExtendMessagingProcessor messagingProcessor) { + this.brokerConfig = brokerConfig; + this.metricsConfig = brokerConfig.metrics(); + this.messageStore = messageStore; + this.messagingProcessor = messagingProcessor; + } + + public static AttributesBuilder newAttributesBuilder() { + AttributesBuilder attributesBuilder; + if (attributesBuilderSupplier == null) { + attributesBuilderSupplier = Attributes::builder; + } + attributesBuilder = attributesBuilderSupplier.get(); + LABEL_MAP.forEach(attributesBuilder::put); + return attributesBuilder; + } + + private boolean checkConfig() { + if (metricsConfig == null) { + return false; + } + MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType()); + if (!exporterType.isEnable()) { + return false; + } + + switch (exporterType) { + case OTLP_GRPC: + return StringUtils.isNotBlank(metricsConfig.grpcExporterHeader()); + case PROM: + return true; + case LOG: + return true; + } + return false; + } + + @Override + public void start() { + MetricsExporterType metricsExporterType = MetricsExporterType.valueOf(metricsConfig.exporterType()); + if (metricsExporterType == MetricsExporterType.DISABLE) { + return; + } + + if (!checkConfig()) { + LOGGER.error("check metrics config failed, will not export metrics"); + return; + } + + String labels = metricsConfig.labels(); + if (StringUtils.isNotBlank(labels)) { + List kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(labels); + for (String item : kvPairs) { + String[] split = item.split(":"); + if (split.length != 2) { + LOGGER.warn("metricsLabel is not valid: {}", labels); + continue; + } + LABEL_MAP.put(split[0], split[1]); + } + } + if (metricsConfig.exportInDelta()) { + LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA); + } else { + LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_CUMULATIVE); + } + + LABEL_MAP.put(LABEL_NODE_NAME, brokerConfig.name()); + LABEL_MAP.put(LABEL_INSTANCE_ID, brokerConfig.instanceId()); + + SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder() + .setResource(Resource.empty()); + + if (metricsExporterType == MetricsExporterType.OTLP_GRPC) { + String endpoint = metricsConfig.grpcExporterTarget(); + if (!endpoint.startsWith("http")) { + endpoint = "https://" + endpoint; + } + OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder() + .setEndpoint(endpoint) + .setTimeout(metricsConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) + .setAggregationTemporalitySelector(type -> { + if (metricsConfig.exportInDelta() && + (type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_COUNTER || type == InstrumentType.HISTOGRAM)) { + return AggregationTemporality.DELTA; + } + return AggregationTemporality.CUMULATIVE; + }); + + String headers = metricsConfig.grpcExporterHeader(); + if (StringUtils.isNotBlank(headers)) { + Map headerMap = new HashMap<>(); + List kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(headers); + for (String item : kvPairs) { + String[] split = item.split(":"); + if (split.length != 2) { + LOGGER.warn("metricsGrpcExporterHeader is not valid: {}", headers); + continue; + } + headerMap.put(split[0], split[1]); + } + headerMap.forEach(metricExporterBuilder::addHeader); + } + + metricExporter = metricExporterBuilder.build(); + + periodicMetricReader = PeriodicMetricReader.builder(metricExporter) + .setInterval(metricsConfig.grpcExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .build(); + + providerBuilder.registerMetricReader(periodicMetricReader); + } else if (metricsExporterType == MetricsExporterType.PROM) { + String promExporterHost = metricsConfig.promExporterHost(); + if (StringUtils.isBlank(promExporterHost)) { + throw new IllegalArgumentException("Config item promExporterHost is blank"); + } + prometheusHttpServer = PrometheusHttpServer.builder() + .setHost(promExporterHost) + .setPort(metricsConfig.promExporterPort()) + .build(); + providerBuilder.registerMetricReader(prometheusHttpServer); + } else if (metricsExporterType == MetricsExporterType.LOG) { + SLF4JBridgeHandler.removeHandlersForRootLogger(); + SLF4JBridgeHandler.install(); + loggingMetricExporter = LoggingMetricExporter.create(metricsConfig.exportInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE); + java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST); + periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter) + .setInterval(metricsConfig.loggingExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .build(); + providerBuilder.registerMetricReader(periodicMetricReader); + } + + registerMetricsView(providerBuilder); + + brokerMeter = OpenTelemetrySdk.builder() + .setMeterProvider(providerBuilder.build()) + .build() + .getMeter("rocketmq-meter"); + + initMetrics(); + } + + private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { + for (Pair selectorViewPair : ProxyMetricsManager.getMetricsView()) { + providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2()); + } + } + + private void initMetrics() { + ProxyMetricsManager.initMetrics(brokerMeter, MetricsExporter::newAttributesBuilder); + } + + @Override + public void shutdown() { + MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType()); + if (exporterType == MetricsExporterType.OTLP_GRPC) { + periodicMetricReader.forceFlush(); + periodicMetricReader.shutdown(); + metricExporter.shutdown(); + } else if (exporterType == MetricsExporterType.PROM) { + prometheusHttpServer.forceFlush(); + prometheusHttpServer.shutdown(); + } else if (exporterType == MetricsExporterType.LOG) { + periodicMetricReader.forceFlush(); + periodicMetricReader.shutdown(); + loggingMetricExporter.shutdown(); + } + } +} + diff --git a/common/src/main/java/com/automq/rocketmq/common/config/BrokerConfig.java b/common/src/main/java/com/automq/rocketmq/common/config/BrokerConfig.java index cc47f62ac..17e6881ea 100644 --- a/common/src/main/java/com/automq/rocketmq/common/config/BrokerConfig.java +++ b/common/src/main/java/com/automq/rocketmq/common/config/BrokerConfig.java @@ -49,6 +49,7 @@ public class BrokerConfig implements ControllerConfig { */ private String advertiseAddress; + private final MetricsConfig metrics; private final ProxyConfig proxy; private final StoreConfig store; private final S3StreamConfig s3Stream; @@ -56,6 +57,7 @@ public class BrokerConfig implements ControllerConfig { private final DatabaseConfig db; public BrokerConfig() { + this.metrics = new MetricsConfig(); this.proxy = new ProxyConfig(); this.store = new StoreConfig(); this.s3Stream = new S3StreamConfig(); @@ -115,6 +117,10 @@ public String dbPassword() { return this.db.getPassword(); } + public MetricsConfig metrics() { + return metrics; + } + public ProxyConfig proxy() { return proxy; } @@ -127,7 +133,7 @@ public S3StreamConfig s3Stream() { return s3Stream; } - public DatabaseConfig getDb() { + public DatabaseConfig db() { return db; } diff --git a/common/src/main/java/com/automq/rocketmq/common/config/MetricsConfig.java b/common/src/main/java/com/automq/rocketmq/common/config/MetricsConfig.java new file mode 100644 index 000000000..bea01084d --- /dev/null +++ b/common/src/main/java/com/automq/rocketmq/common/config/MetricsConfig.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.common.config; + +@SuppressWarnings({"FieldMayBeFinal", "FieldCanBeLocal"}) +public class MetricsConfig { + private String exporterType = "DISABLE"; + + private String grpcExporterTarget = ""; + private String grpcExporterHeader = ""; + private long grpcExporterTimeOutInMills = 3 * 1000; + private long grpcExporterIntervalInMills = 60 * 1000; + private long loggingExporterIntervalInMills = 10 * 1000; + + private int promExporterPort = 5557; + private String promExporterHost = "localhost"; + + // Label pairs in CSV. Each label follows pattern of Key:Value. eg: instance_id:xxx,uid:xxx + private String labels = ""; + + private boolean exportInDelta = false; + + public String exporterType() { + return exporterType; + } + + public String grpcExporterTarget() { + return grpcExporterTarget; + } + + public String grpcExporterHeader() { + return grpcExporterHeader; + } + + public long grpcExporterTimeOutInMills() { + return grpcExporterTimeOutInMills; + } + + public long grpcExporterIntervalInMills() { + return grpcExporterIntervalInMills; + } + + public long loggingExporterIntervalInMills() { + return loggingExporterIntervalInMills; + } + + public int promExporterPort() { + return promExporterPort; + } + + public String promExporterHost() { + return promExporterHost; + } + + public String labels() { + return labels; + } + + public boolean exportInDelta() { + return exportInDelta; + } +} diff --git a/common/src/main/java/com/automq/rocketmq/common/model/MetricsExporterType.java b/common/src/main/java/com/automq/rocketmq/common/model/MetricsExporterType.java new file mode 100644 index 000000000..148e549eb --- /dev/null +++ b/common/src/main/java/com/automq/rocketmq/common/model/MetricsExporterType.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.common.model; + +public enum MetricsExporterType { + DISABLE(0), + OTLP_GRPC(1), + PROM(2), + LOG(3); + + private final int value; + + MetricsExporterType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static MetricsExporterType valueOf(int value) { + switch (value) { + case 1: + return OTLP_GRPC; + case 2: + return PROM; + case 3: + return LOG; + default: + return DISABLE; + } + } + + public boolean isEnable() { + return this.value > 0; + } +} diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ExtendGrpcMessagingApplication.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ExtendGrpcMessagingApplication.java index f018f50cf..571750ed3 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ExtendGrpcMessagingApplication.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ExtendGrpcMessagingApplication.java @@ -17,11 +17,59 @@ package com.automq.rocketmq.proxy.grpc; +import apache.rocketmq.v2.Status; +import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager; +import com.automq.rocketmq.proxy.model.ProxyContextExt; +import io.grpc.Context; +import io.grpc.Metadata; +import io.grpc.stub.StreamObserver; +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.grpc.interceptor.InterceptorConstants; import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication; import org.apache.rocketmq.proxy.grpc.v2.GrpcMessingActivity; +import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; public class ExtendGrpcMessagingApplication extends GrpcMessagingApplication { public ExtendGrpcMessagingApplication(GrpcMessingActivity grpcMessingActivity) { super(grpcMessingActivity); } + + @Override + protected ProxyContext createContext() { + Context ctx = Context.current(); + Metadata headers = InterceptorConstants.METADATA.get(ctx); + ProxyContext context = new ProxyContextExt() + .setLocalAddress(getDefaultStringMetadataInfo(headers, InterceptorConstants.LOCAL_ADDRESS)) + .setRemoteAddress(getDefaultStringMetadataInfo(headers, InterceptorConstants.REMOTE_ADDRESS)) + .setClientID(getDefaultStringMetadataInfo(headers, InterceptorConstants.CLIENT_ID)) + .setProtocolType(ChannelProtocolType.GRPC_V2.getName()) + .setLanguage(getDefaultStringMetadataInfo(headers, InterceptorConstants.LANGUAGE)) + .setClientVersion(getDefaultStringMetadataInfo(headers, InterceptorConstants.CLIENT_VERSION)) + .setAction(getDefaultStringMetadataInfo(headers, InterceptorConstants.SIMPLE_RPC_NAME)); + if (ctx.getDeadline() != null) { + context.setRemainingMs(ctx.getDeadline().timeRemaining(TimeUnit.MILLISECONDS)); + } + return context; + } + + private String getResponseStatus(T response) { + try { + Method getStatus = response.getClass().getDeclaredMethod("getStatus"); + Status status = (Status) getStatus.invoke(response); + return status.getMessage(); + } catch (Exception e) { + return "unknown"; + } + } + + @Override + protected void writeResponse(ProxyContext context, V request, T response, StreamObserver responseObserver, + Throwable t, Function errorResponseCreator) { + ProxyMetricsManager.recordRpcLatency(context.getProtocolType(), context.getAction(), + getResponseStatus(response), ((ProxyContextExt) context).getElapsedTimeNanos()); + super.writeResponse(context, request, response, responseObserver, t, errorResponseCreator); + } } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/metrics/ProxyMetricsManager.java b/proxy/src/main/java/com/automq/rocketmq/proxy/metrics/ProxyMetricsManager.java new file mode 100644 index 000000000..970bf6ce7 --- /dev/null +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/metrics/ProxyMetricsManager.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.proxy.metrics; + +import com.google.common.collect.Lists; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.View; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.metrics.NopLongHistogram; + +public class ProxyMetricsManager { + public static final String LABEL_PROTOCOL_TYPE = "protocol_type"; + public static final String LABEL_ACTION = "action"; + public static final String LABEL_RESULT = "result"; + + public static final String HISTOGRAM_RPC_LATENCY = "rocketmq_rpc_latency"; + private static LongHistogram rpcLatency = new NopLongHistogram(); + + private static Supplier attributesBuilderSupplier; + + public static AttributesBuilder newAttributesBuilder() { + if (attributesBuilderSupplier == null) { + return Attributes.builder(); + } + return attributesBuilderSupplier.get(); + } + + public static void initMetrics(Meter meter, Supplier attributesBuilderSupplier) { + ProxyMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier; + rpcLatency = meter.histogramBuilder(HISTOGRAM_RPC_LATENCY) + .setDescription("Rpc latency") + .setUnit("milliseconds") + .ofLongs() + .build(); + } + + public static List> getMetricsView() { + List rpcCostTimeBuckets = Arrays.asList( + (double) Duration.ofMillis(1).toMillis(), + (double) Duration.ofMillis(3).toMillis(), + (double) Duration.ofMillis(5).toMillis(), + (double) Duration.ofMillis(7).toMillis(), + (double) Duration.ofMillis(10).toMillis(), + (double) Duration.ofMillis(100).toMillis(), + (double) Duration.ofSeconds(1).toMillis(), + (double) Duration.ofSeconds(2).toMillis(), + (double) Duration.ofSeconds(3).toMillis() + ); + InstrumentSelector selector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_RPC_LATENCY) + .build(); + View view = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) + .build(); + return Lists.newArrayList(new Pair<>(selector, view)); + } + + public static void recordRpcLatency(String protocolType, String action, String result, long costTimeMillis) { + AttributesBuilder attributesBuilder = newAttributesBuilder() + .put(LABEL_PROTOCOL_TYPE, protocolType) + .put(LABEL_ACTION, action) + .put(LABEL_RESULT, result); + rpcLatency.record(costTimeMillis, attributesBuilder.build()); + } +} diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java b/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java new file mode 100644 index 000000000..510d90987 --- /dev/null +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.proxy.model; + +import com.google.common.base.Stopwatch; +import org.apache.rocketmq.proxy.common.ProxyContext; + +public class ProxyContextExt extends ProxyContext { + private final Stopwatch stopwatch = Stopwatch.createStarted(); + + public long getElapsedTimeNanos() { + return stopwatch.elapsed().toNanos(); + } +}