diff --git a/docs/content.zh/docs/deployment/metric_reporters.md b/docs/content.zh/docs/deployment/metric_reporters.md index 23f1d4b674ee2..795317720e8dc 100644 --- a/docs/content.zh/docs/deployment/metric_reporters.md +++ b/docs/content.zh/docs/deployment/metric_reporters.md @@ -225,6 +225,7 @@ metrics.reporter.promgateway.randomJobNameSuffix: true metrics.reporter.promgateway.deleteOnShutdown: false metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2 metrics.reporter.promgateway.interval: 60 SECONDS +metrics.reporter.promgateway.allowList: metricA,metricB ``` PrometheusPushGatewayReporter 发送器将运行指标发送给 [Pushgateway](https://github.com/prometheus/pushgateway),Prometheus 再从 Pushgateway 拉取、解析运行指标。 diff --git a/docs/content/docs/deployment/metric_reporters.md b/docs/content/docs/deployment/metric_reporters.md index 70d3aee87fb58..cab94d07530b1 100644 --- a/docs/content/docs/deployment/metric_reporters.md +++ b/docs/content/docs/deployment/metric_reporters.md @@ -213,6 +213,7 @@ metrics.reporter.promgateway.randomJobNameSuffix: true metrics.reporter.promgateway.deleteOnShutdown: false metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2 metrics.reporter.promgateway.interval: 60 SECONDS +metrics.reporter.promgateway.allowList: metricA,metricB ``` The PrometheusPushGatewayReporter pushes metrics to a [Pushgateway](https://github.com/prometheus/pushgateway), which can be scraped by Prometheus. diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java index 7e2dab464dcd1..12dd185874e3a 100644 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java @@ -47,6 +47,7 @@ import java.util.Map; import java.util.regex.Pattern; +import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.ALLOW_LIST; import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.FILTER_LABEL_VALUE_CHARACTER; /** base prometheus reporter for prometheus metrics. */ @@ -61,10 +62,13 @@ public abstract class AbstractPrometheusReporter implements MetricReporter { @VisibleForTesting static final char SCOPE_SEPARATOR = '_'; @VisibleForTesting static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR; + @VisibleForTesting static final char DEFAULT_SCOPE_SEPARATOR = '.'; private final Map> collectorsWithCountByMetricName = new HashMap<>(); + private final List allowLists = new ArrayList<>(); + @VisibleForTesting static String replaceInvalidChars(final String input) { // https://prometheus.io/docs/instrumenting/writing_exporters/ @@ -87,6 +91,13 @@ public void open(MetricConfig config) { if (!filterLabelValueCharacters) { labelValueCharactersFilter = input -> input; } + + String allowList = config.getString(ALLOW_LIST.key(), ALLOW_LIST.defaultValue()); + for (String ele : allowList.split(",")) { + if (!ele.trim().isEmpty()) { + allowLists.add(ele.trim()); + } + } } @Override @@ -97,6 +108,11 @@ public void close() { @Override public void notifyOfAddedMetric( final Metric metric, final String metricName, final MetricGroup group) { + String metricScope = LogicalScopeProvider.castFrom(group).getLogicalScope(CHARACTER_FILTER); + if (!allowLists.isEmpty() + && !isInAllowList(metricScope + DEFAULT_SCOPE_SEPARATOR + metricName, allowLists)) { + return; + } List dimensionKeys = new LinkedList<>(); List dimensionValues = new LinkedList<>(); @@ -390,4 +406,13 @@ private static List addToList(List list, String element) { private static String[] toArray(List list) { return list.toArray(new String[list.size()]); } + + public static boolean isInAllowList(String metricScopeName, List patternList) { + for (String pattern : patternList) { + if (metricScopeName.contains(pattern)) { + return true; + } + } + return false; + } } diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java index 84404b5ef7b3d..2059242157dc9 100644 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java @@ -98,4 +98,11 @@ public class PrometheusPushGatewayReporterOptions { "https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels", "Prometheus requirements")) .build()); + + public static final ConfigOption ALLOW_LIST = + ConfigOptions.key("allowList") + .stringType() + .defaultValue("") + .withDescription( + "The allow-list of metric name. The default is to report all metrics"); } diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java index 8491dc0b834bc..8759ab0db071b 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java @@ -23,6 +23,7 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.util.TestHistogram; @@ -43,6 +44,8 @@ import java.util.Map; import java.util.NoSuchElementException; +import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.ALLOW_LIST; +import static org.apache.flink.metrics.prometheus.PrometheusReporterFactory.ARG_PORT; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -243,6 +246,33 @@ void canStartTwoReportersWhenUsingPortRange() { new PrometheusReporter(portRange).close(); } + @Test + public void testMetricFilter() throws Exception { + MetricConfig metricConfig = new MetricConfig(); + metricConfig.setProperty(ALLOW_LIST.key(), LOGICAL_SCOPE + ".metricName1"); + metricConfig.setProperty(ARG_PORT, portRangeProvider.nextRange()); + + PrometheusReporterFactory prometheusReporterFactory = new PrometheusReporterFactory(); + PrometheusReporter metricReporter = + prometheusReporterFactory.createMetricReporter(metricConfig); + metricReporter.open(metricConfig); + + final Map variables = new HashMap<>(metricGroup.getAllVariables()); + final MetricGroup metricGroup2 = TestUtils.createTestMetricGroup(LOGICAL_SCOPE, variables); + + Counter metric1 = new SimpleCounter(); + metric1.inc(7); + Counter metric2 = new SimpleCounter(); + metric2.inc(2); + metricReporter.notifyOfAddedMetric(metric1, "metricName1", metricGroup2); + metricReporter.notifyOfAddedMetric(metric2, "metricName2", metricGroup2); + + String response = pollMetrics(metricReporter.getPort()).body(); + + assertThat(response).contains("metricName1"); + assertThat(response).doesNotContain("metricName2"); + } + private String addMetricAndPollResponse(Metric metric, String metricName) throws IOException, InterruptedException { reporter.notifyOfAddedMetric(metric, metricName, metricGroup);