Skip to content

Commit

Permalink
[fix][broker] Optimize /metrics, fix unbounded request queue issue an…
Browse files Browse the repository at this point in the history
…d fix race conditions in metricsBufferResponse mode (apache#22494)

(cherry picked from commit 7009071)
  • Loading branch information
lhotari committed Apr 15, 2024
1 parent ef20f2b commit 734c3f8
Show file tree
Hide file tree
Showing 25 changed files with 726 additions and 517 deletions.
6 changes: 4 additions & 2 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -361,5 +361,7 @@ zooKeeperCacheExpirySeconds=-1
enableProxyStatsEndpoints=true
# Whether the '/metrics' endpoint requires authentication. Defaults to true
authenticateMetricsEndpoint=true
# Enable cache metrics data, default value is false
metricsBufferResponse=false
# Time in milliseconds that metrics endpoint would time out. Default is 30s.
# Set it to 0 to disable timeout.
metricsServletTimeoutMs=30000

Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static void generateSystemMetrics(SimpleTextOutputStream stream, String c
}
for (int j = 0; j < sample.labelNames.size(); j++) {
String labelValue = sample.labelValues.get(j);
if (labelValue != null) {
if (labelValue != null && labelValue.indexOf('"') > -1) {
labelValue = labelValue.replace("\"", "\\\"");
}
if (j > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,85 +18,153 @@
*/
package org.apache.pulsar.broker.stats.prometheus;

import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.EOFException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrometheusMetricsServlet extends HttpServlet {

private static final long serialVersionUID = 1L;
private static final int HTTP_STATUS_OK_200 = 200;
private static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;

private final long metricsServletTimeoutMs;
private final String cluster;
static final int HTTP_STATUS_OK_200 = 200;
static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;
protected final long metricsServletTimeoutMs;
protected final String cluster;
protected List<PrometheusRawMetricsProvider> metricsProviders;

private ExecutorService executor = null;
protected ExecutorService executor = null;
protected final int executorMaxThreads;

public PrometheusMetricsServlet(long metricsServletTimeoutMs, String cluster) {
this(metricsServletTimeoutMs, cluster, 1);
}

public PrometheusMetricsServlet(long metricsServletTimeoutMs, String cluster, int executorMaxThreads) {
this.metricsServletTimeoutMs = metricsServletTimeoutMs;
this.cluster = cluster;
this.executorMaxThreads = executorMaxThreads;
}

@Override
public void init() throws ServletException {
executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("prometheus-stats"));
if (executorMaxThreads > 0) {
executor =
Executors.newScheduledThreadPool(executorMaxThreads, new DefaultThreadFactory("prometheus-stats"));
}
}

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) {
AsyncContext context = request.startAsync();
context.setTimeout(metricsServletTimeoutMs);
executor.execute(safeRun(() -> {
long start = System.currentTimeMillis();
HttpServletResponse res = (HttpServletResponse) context.getResponse();
try {
res.setStatus(HTTP_STATUS_OK_200);
res.setContentType("text/plain;charset=utf-8");
generateMetrics(cluster, res.getOutputStream());
} catch (Exception e) {
long end = System.currentTimeMillis();
long time = end - start;
if (e instanceof EOFException) {
// NO STACKTRACE
log.error("Failed to send metrics, "
+ "likely the client or this server closed "
+ "the connection due to a timeout ({} ms elapsed): {}", time, e + "");
} else {
log.error("Failed to generate prometheus stats, {} ms elapsed", time, e);
// set hard timeout to 2 * timeout
if (metricsServletTimeoutMs > 0) {
context.setTimeout(metricsServletTimeoutMs * 2);
}
long startNanos = System.nanoTime();
AtomicBoolean taskStarted = new AtomicBoolean(false);
Future<?> future = executor.submit(() -> {
taskStarted.set(true);
long elapsedNanos = System.nanoTime() - startNanos;
// check if the request has been timed out, implement a soft timeout
// so that response writing can continue to up to 2 * timeout
if (metricsServletTimeoutMs > 0 && elapsedNanos > TimeUnit.MILLISECONDS.toNanos(metricsServletTimeoutMs)) {
log.warn("Prometheus metrics request was too long in queue ({}ms). Skipping sending metrics.",
TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
if (!response.isCommitted()) {
response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
}
res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
} finally {
long end = System.currentTimeMillis();
long time = end - start;
try {
context.complete();
} catch (IllegalStateException e) {
// this happens when metricsServletTimeoutMs expires
// java.lang.IllegalStateException: AsyncContext completed and/or Request lifecycle recycled
log.error("Failed to generate prometheus stats, "
+ "this is likely due to metricsServletTimeoutMs: {} ms elapsed: {}", time, e + "");
context.complete();
return;
}
handleAsyncMetricsRequest(context);
});
context.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
if (!taskStarted.get()) {
future.cancel(false);
}
}

@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
if (!taskStarted.get()) {
future.cancel(false);
}
log.warn("Prometheus metrics request timed out");
HttpServletResponse res = (HttpServletResponse) context.getResponse();
if (!res.isCommitted()) {
res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
}
context.complete();
}

@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
if (!taskStarted.get()) {
future.cancel(false);
}
}
}));

@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {

}
});

}

private void handleAsyncMetricsRequest(AsyncContext context) {
long start = System.currentTimeMillis();
HttpServletResponse res = (HttpServletResponse) context.getResponse();
try {
generateMetricsSynchronously(res);
} catch (Exception e) {
long end = System.currentTimeMillis();
long time = end - start;
if (e instanceof EOFException) {
// NO STACKTRACE
log.error("Failed to send metrics, "
+ "likely the client or this server closed "
+ "the connection due to a timeout ({} ms elapsed): {}", time, e + "");
} else {
log.error("Failed to generate prometheus stats, {} ms elapsed", time, e);
}
if (!res.isCommitted()) {
res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
}
} finally {
long end = System.currentTimeMillis();
long time = end - start;
try {
context.complete();
} catch (IllegalStateException e) {
// this happens when metricsServletTimeoutMs expires
// java.lang.IllegalStateException: AsyncContext completed and/or Request lifecycle recycled
log.error("Failed to generate prometheus stats, "
+ "this is likely due to metricsServletTimeoutMs: {} ms elapsed: {}", time, e + "");
}
}
}

protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException {
PrometheusMetricsGeneratorUtils.generate(cluster, outputStream, metricsProviders);
private void generateMetricsSynchronously(HttpServletResponse res) throws IOException {
res.setStatus(HTTP_STATUS_OK_200);
res.setContentType("text/plain;charset=utf-8");
PrometheusMetricsGeneratorUtils.generate(cluster, res.getOutputStream(), metricsProviders);
}

@Override
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 734c3f8

Please sign in to comment.