Skip to content

Commit

Permalink
PIP-223
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun committed Dec 20, 2023
1 parent 69a45a1 commit 453762f
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 0 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1533,6 +1533,9 @@ authenticateMetricsEndpoint=false
# Enable topic level metrics
exposeTopicLevelMetricsInPrometheus=true

# Enable expose per rest endpoint metrics of the broker.
exposePerRestEndpointMetricsInPrometheus=false

# Enable consumer level metrics. default is false
exposeConsumerLevelMetricsInPrometheus=false

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,9 @@ webSocketMaxTextFrameSize=1048576
# Enable topic level metrics
exposeTopicLevelMetricsInPrometheus=true

# Enable expose per rest endpoint metrics of the broker.
exposePerRestEndpointMetricsInPrometheus=false

# Time in milliseconds that metrics endpoint would time out. Default is 30s.
# Increase it if there are a lot of topics to expose topic-level metrics.
# Set it to 0 to disable timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2962,6 +2962,11 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private boolean exposeBundlesMetricsInPrometheus = false;

@FieldContext(
category = CATEGORY_METRICS,
doc = "Enable expose per rest endpoint metrics of the broker.")
private boolean exposePerRestEndpointMetricsInPrometheus = false;

/**** --- Functions. --- ****/
@FieldContext(
category = CATEGORY_FUNCTIONS,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.io.IOException;
import java.util.Stack;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.core.Response;
import org.glassfish.jersey.server.internal.routing.UriRoutingContext;
import org.glassfish.jersey.server.model.Resource;
import org.glassfish.jersey.server.model.ResourceMethod;
import org.jetbrains.annotations.NotNull;

public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter {
private static final LoadingCache<ResourceMethod, String> CACHE = CacheBuilder
.newBuilder()
.maximumSize(100)
.build(new CacheLoader<>() {
@Override
public @NotNull String load(@NotNull ResourceMethod method) throws Exception {
return getRestPath(method);
}
});

private static final Histogram LATENCY = Histogram
.build("pulsar_broker_rest_endpoint_latency", "-")
.unit("ms")
.labelNames("path", "method")
.buckets(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D)
.register();
private static final Counter FAILED = Counter
.build("pulsar_broker_rest_endpoint_failed", "-")
.labelNames("path", "method", "code")
.register();

private static final String REQUEST_START_TIME = "requestStartTime";

@Override
public void filter(ContainerRequestContext req, ContainerResponseContext resp) throws IOException {
String path;
try {
UriRoutingContext info = (UriRoutingContext) req.getUriInfo();
ResourceMethod rm = info.getMatchedResourceMethod();
path = CACHE.get(rm);
} catch (Throwable ex) {
path = "UNKNOWN";
}

String method = req.getMethod();
Response.StatusType status = resp.getStatusInfo();
if (status.getStatusCode() < Response.Status.BAD_REQUEST.getStatusCode()) {
long start = req.getProperty(REQUEST_START_TIME) == null
? System.currentTimeMillis() : (long) req.getProperty(REQUEST_START_TIME);
LATENCY.labels(path, method).observe(System.currentTimeMillis() - start);
} else {
FAILED.labels(path, method, String.valueOf(status.getStatusCode())).inc();
}
}

@Override
public void filter(ContainerRequestContext req) throws IOException {
// Set the request start time into properties.
req.setProperty(REQUEST_START_TIME, System.currentTimeMillis());
}

private static String getRestPath(ResourceMethod method) {
try {
StringBuilder fullPath = new StringBuilder();
Stack<String> pathStack = new Stack<>();
Resource parent = method.getParent();

while (true) {
String path = parent.getPath();
parent = parent.getParent();
if (parent == null) {
if (!path.endsWith("/") && !pathStack.peek().startsWith("/")) {
pathStack.push("/");
}
pathStack.push(path);
break;
}
pathStack.push(path);

}
while (!pathStack.isEmpty()) {
fullPath.append(pathStack.pop().replace("{", ":").replace("}", ""));
}
return fullPath.toString();
} catch (Exception ex) {
return "UNKNOWN";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication,
config.register(JsonMapperProvider.class);
}
config.register(MultiPartFeature.class);
if (pulsar.getConfiguration().isExposePerRestEndpointMetricsInPrometheus()) {
config.register(RestEndpointMetricsFilter.class);
}
ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
servletHolder.setAsyncSupported(true);
addServlet(basePath, servletHolder, requiresAuthentication, attributeMap);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;

import com.google.common.collect.Multimap;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class BrokerRestEndpointMetricsTest extends BrokerTestBase {

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
conf.setExposePerRestEndpointMetricsInPrometheus(true);
baseSetup();
}

@BeforeMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}


@Test
public void testMetrics() throws Exception {
admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build());
admin.namespaces().createNamespace("test/test");
String topic = "persistent://test/test/test_" + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
admin.topics().getList("test/test");

// This request will be failed
try {
admin.topics().createNonPartitionedTopic("persistent://test1/test1/test1");
} catch (Exception e) {
// ignore
}

admin.topics().delete(topic, true);
admin.namespaces().deleteNamespace("test/test");
admin.tenants().deleteTenant("test");

ByteArrayOutputStream output = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);

String metricsStr = output.toString(StandardCharsets.UTF_8);
Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);

Collection<PrometheusMetricsTest.Metric> latency = metricsMap.get("pulsar_broker_rest_endpoint_latency_ms_sum");
Collection<PrometheusMetricsTest.Metric> failed = metricsMap.get("pulsar_broker_rest_endpoint_failed_total");

Assert.assertTrue(latency.size() > 0);
Assert.assertTrue(failed.size() > 0);

for (PrometheusMetricsTest.Metric m : latency) {
Assert.assertNotNull(m.tags.get("cluster"));
Assert.assertNotNull(m.tags.get("path"));
Assert.assertNotNull(m.tags.get("method"));
}

for (PrometheusMetricsTest.Metric m : failed) {
Assert.assertNotNull(m.tags.get("cluster"));
Assert.assertNotNull(m.tags.get("path"));
Assert.assertNotNull(m.tags.get("method"));
Assert.assertNotNull(m.tags.get("code"));
}
}
}

0 comments on commit 453762f

Please sign in to comment.