Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.regex.Pattern;

import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.FILTER_LABEL_VALUE_CHARACTER;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.WHITE_LIST;

/** base prometheus reporter for prometheus metrics. */
@PublicEvolving
Expand All @@ -61,10 +62,13 @@ public abstract class AbstractPrometheusReporter implements MetricReporter {

@VisibleForTesting static final char SCOPE_SEPARATOR = '_';
@VisibleForTesting static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
@VisibleForTesting static final char DEFAULT_SCOPE_SEPARATOR = '.';

private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, Integer>>
collectorsWithCountByMetricName = new HashMap<>();

private final List<String> whiteLists = new ArrayList<>();
Copy link
Contributor

@davidradl davidradl Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid the use of whitelists. google results shows The terms "allow list" or "approved list" are widely considered acceptable alternatives to "whitelist," which can carry pejorative or exclusionary connotations.


@VisibleForTesting
static String replaceInvalidChars(final String input) {
// https://prometheus.io/docs/instrumenting/writing_exporters/
Expand All @@ -87,6 +91,13 @@ public void open(MetricConfig config) {
if (!filterLabelValueCharacters) {
labelValueCharactersFilter = input -> input;
}

String whiteList = config.getString(WHITE_LIST.key(), WHITE_LIST.defaultValue());
for (String ele : whiteList.split(",")) {
if (!ele.trim().isEmpty()) {
whiteLists.add(ele.trim());
}
}
}

@Override
Expand All @@ -97,6 +108,11 @@ public void close() {
@Override
public void notifyOfAddedMetric(
final Metric metric, final String metricName, final MetricGroup group) {
String metricScope = LogicalScopeProvider.castFrom(group).getLogicalScope(CHARACTER_FILTER);
if (!whiteLists.isEmpty()
&& !isInWhiteList(metricScope + DEFAULT_SCOPE_SEPARATOR + metricName, whiteLists)) {
return;
}

List<String> dimensionKeys = new LinkedList<>();
List<String> dimensionValues = new LinkedList<>();
Expand Down Expand Up @@ -390,4 +406,13 @@ private static List<String> addToList(List<String> list, String element) {
private static String[] toArray(List<String> list) {
return list.toArray(new String[list.size()]);
}

public static boolean isInWhiteList(String metricScopeName, List<String> patternList) {
for (String pattern : patternList) {
if (metricScopeName.contains(pattern)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,11 @@ public class PrometheusPushGatewayReporterOptions {
"https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels",
"Prometheus requirements"))
.build());

public static final ConfigOption<String> WHITE_LIST =
ConfigOptions.key("whiteList")
.stringType()
.defaultValue("")
.withDescription(
"The white-list of metric name. The default is to report all metrics");
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.util.TestHistogram;
Expand All @@ -43,6 +44,8 @@
import java.util.Map;
import java.util.NoSuchElementException;

import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.WHITE_LIST;
import static org.apache.flink.metrics.prometheus.PrometheusReporterFactory.ARG_PORT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -243,6 +246,33 @@ void canStartTwoReportersWhenUsingPortRange() {
new PrometheusReporter(portRange).close();
}

@Test
public void testMetricFilter() throws Exception {
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty(WHITE_LIST.key(), LOGICAL_SCOPE + ".metricName1");
metricConfig.setProperty(ARG_PORT, portRangeProvider.nextRange());

PrometheusReporterFactory prometheusReporterFactory = new PrometheusReporterFactory();
PrometheusReporter metricReporter =
prometheusReporterFactory.createMetricReporter(metricConfig);
metricReporter.open(metricConfig);

final Map<String, String> variables = new HashMap<>(metricGroup.getAllVariables());
final MetricGroup metricGroup2 = TestUtils.createTestMetricGroup(LOGICAL_SCOPE, variables);

Counter metric1 = new SimpleCounter();
metric1.inc(7);
Counter metric2 = new SimpleCounter();
metric2.inc(2);
metricReporter.notifyOfAddedMetric(metric1, "metricName1", metricGroup2);
metricReporter.notifyOfAddedMetric(metric2, "metricName2", metricGroup2);

String response = pollMetrics(metricReporter.getPort()).body();

assertThat(response).contains("metricName1");
assertThat(response).doesNotContain("metricName2");
}

private String addMetricAndPollResponse(Metric metric, String metricName)
throws IOException, InterruptedException {
reporter.notifyOfAddedMetric(metric, metricName, metricGroup);
Expand Down