Skip to content

Commit

Permalink
Avoid FastThreadLocal in MeterCache (#210)
Browse files Browse the repository at this point in the history
Related to eclipse-vertx/vert.x#5078

FastThreadLocal can cause a memory-leak if not used as a class constant.

Signed-off-by: Thomas Segismont <[email protected]>
  • Loading branch information
tsegismont authored Feb 14, 2024
1 parent 38b607c commit 1818a63
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 90 deletions.
76 changes: 12 additions & 64 deletions src/main/java/io/vertx/micrometer/impl/AbstractMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import java.util.function.Function;
import java.util.function.ToDoubleFunction;

import static io.micrometer.core.instrument.Meter.Type.*;

/**
* Abstract class for metrics container.
*
Expand All @@ -40,18 +38,16 @@ public abstract class AbstractMetrics implements MicrometerMetrics {

protected final MeterRegistry registry;
protected final MetricsNaming names;
protected final String category;
protected final LongGauges longGauges;
protected final EnumSet<Label> enabledLabels;
protected final MeterCache meterCache;
private final String category;
private final EnumSet<Label> enabledLabels;
private final MeterCache meterCache;

AbstractMetrics(MeterRegistry registry, MetricsNaming names, LongGauges longGauges, EnumSet<Label> enabledLabels, MeterCache meterCache) {
AbstractMetrics(MeterRegistry registry, MetricsNaming names, LongGauges longGauges, EnumSet<Label> enabledLabels, boolean meterCacheEnabled) {
this.registry = registry;
this.longGauges = longGauges;
this.category = null;
this.enabledLabels = enabledLabels;
this.names = names.withBaseName(baseName());
this.meterCache = meterCache;
this.names = names;
this.meterCache = new MeterCache(meterCacheEnabled, registry, longGauges);
}

AbstractMetrics(AbstractMetrics parent, MetricsDomain domain) {
Expand All @@ -60,7 +56,6 @@ public abstract class AbstractMetrics implements MicrometerMetrics {

AbstractMetrics(AbstractMetrics parent, String category) {
this.registry = parent.registry;
this.longGauges = parent.longGauges;
this.enabledLabels = parent.enabledLabels;
this.meterCache = parent.meterCache;
this.category = category;
Expand All @@ -75,77 +70,30 @@ public MeterRegistry registry() {
return registry;
}

// Method is final because it is invoked from the constructor
@Override
public String baseName() {
public final String baseName() {
return category == null ? null : "vertx." + category + ".";
}

Counter counter(String name, String description, Tags tags) {
Meter.Id id = new Meter.Id(name, tags, null, description, COUNTER);
Counter c;
if (meterCache != null) {
c = meterCache.get(id);
if (c != null) {
return c;
}
}
c = Counter.builder(name).description(description).tags(tags).register(registry);
if (meterCache != null) {
meterCache.put(id, c);
}
return c;
return meterCache.getOrCreateCounter(name, description, tags);
}

LongAdder longGauge(String name, String description, Tags tags) {
return longGauge(name, description, tags, LongAdder::doubleValue);
}

LongAdder longGauge(String name, String description, Tags tags, ToDoubleFunction<LongAdder> func) {
Meter.Id id = new Meter.Id(name, tags, null, description, GAUGE);
LongAdder la;
if (meterCache != null) {
la = meterCache.get(id);
if (la != null) {
return la;
}
}
la = longGauges.builder(name, func).description(description).tags(tags).register(registry);
if (meterCache != null) {
meterCache.put(id, la);
}
return la;
return meterCache.getOrCreateLongGauge(name, description, tags, func);
}

DistributionSummary distributionSummary(String name, String description, Tags tags) {
Meter.Id id = new Meter.Id(name, tags, null, description, DISTRIBUTION_SUMMARY);
DistributionSummary ds;
if (meterCache != null) {
ds = meterCache.get(id);
if (ds != null) {
return ds;
}
}
ds = DistributionSummary.builder(name).description(description).tags(tags).register(registry);
if (meterCache != null) {
meterCache.put(id, ds);
}
return ds;
return meterCache.getOrCreateDistributionSummary(name, description, tags);
}

Timer timer(String name, String description, Tags tags) {
Meter.Id id = new Meter.Id(name, tags, null, description, TIMER);
Timer t;
if (meterCache != null) {
t = meterCache.get(id);
if (t != null) {
return t;
}
}
t = Timer.builder(name).description(description).tags(tags).register(registry);
if (meterCache != null) {
meterCache.put(id, t);
}
return t;
return meterCache.getOrCreateTimer(name, description, tags);
}

<U> Tag toTag(Label label, Function<U, String> func, U u) {
Expand Down
107 changes: 88 additions & 19 deletions src/main/java/io/vertx/micrometer/impl/MeterCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,109 @@

package io.vertx.micrometer.impl;

import io.micrometer.core.instrument.Meter;
import io.netty.util.concurrent.FastThreadLocal;
import io.micrometer.core.instrument.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.micrometer.impl.meters.LongGauges;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.ToDoubleFunction;

import static io.micrometer.core.instrument.Meter.Type.*;

class MeterCache {

private final FastThreadLocal<Map<Meter.Id, Object>> threadLocal;
private static final Object COUNTERS = new Object();
private static final Object LONG_GAUGES = new Object();
private static final Object DISTRIBUTION_SUMMARIES = new Object();
private static final Object TIMERS = new Object();

private final boolean enabled;
private final MeterRegistry registry;
private final LongGauges longGauges;

MeterCache(boolean enabled, MeterRegistry registry, LongGauges longGauges) {
this.enabled = enabled;
this.registry = registry;
this.longGauges = longGauges;
}

MeterCache() {
threadLocal = new FastThreadLocal<Map<Meter.Id, Object>>() {
@Override
protected Map<Meter.Id, Object> initialValue() {
return new HashMap<>();
Counter getOrCreateCounter(String name, String description, Tags tags) {
Counter counter;
ContextInternal context;
if (enabled && (context = eventLoopContext()) != null) {
@SuppressWarnings("unchecked")
Map<Meter.Id, Counter> counterMap = (Map<Meter.Id, Counter>) context.contextData().computeIfAbsent(COUNTERS, k -> new HashMap<>());
Meter.Id id = new Meter.Id(name, tags, null, description, COUNTER);
counter = counterMap.get(id);
if (counter == null) {
counter = Counter.builder(name).description(description).tags(tags).register(registry);
counterMap.put(id, counter);
}
} else {
counter = Counter.builder(name).description(description).tags(tags).register(registry);
}
return counter;
}

@Override
protected void onRemoval(Map<Meter.Id, Object> value) {
value.clear();
LongAdder getOrCreateLongGauge(String name, String description, Tags tags, ToDoubleFunction<LongAdder> func) {
LongAdder longGauge;
ContextInternal context;
if (enabled && (context = eventLoopContext()) != null) {
@SuppressWarnings("unchecked")
Map<Meter.Id, LongAdder> longGaugeMap = (Map<Meter.Id, LongAdder>) context.contextData().computeIfAbsent(LONG_GAUGES, k -> new HashMap<>());
Meter.Id id = new Meter.Id(name, tags, null, description, GAUGE);
longGauge = longGaugeMap.get(id);
if (longGauge == null) {
longGauge = longGauges.builder(name, func).description(description).tags(tags).register(registry);
longGaugeMap.put(id, longGauge);
}
};
} else {
longGauge = longGauges.builder(name, func).description(description).tags(tags).register(registry);
}
return longGauge;
}

@SuppressWarnings("unchecked")
<T> T get(Meter.Id id) {
return (T) threadLocal.get().get(id);
DistributionSummary getOrCreateDistributionSummary(String name, String description, Tags tags) {
DistributionSummary distributionSummary;
ContextInternal context;
if (enabled && (context = eventLoopContext()) != null) {
@SuppressWarnings("unchecked")
Map<Meter.Id, DistributionSummary> distributionSummaryMap = (Map<Meter.Id, DistributionSummary>) context.contextData().computeIfAbsent(DISTRIBUTION_SUMMARIES, k -> new HashMap<>());
Meter.Id id = new Meter.Id(name, tags, null, description, DISTRIBUTION_SUMMARY);
distributionSummary = distributionSummaryMap.get(id);
if (distributionSummary == null) {
distributionSummary = DistributionSummary.builder(name).description(description).tags(tags).register(registry);
distributionSummaryMap.put(id, distributionSummary);
}
} else {
distributionSummary = DistributionSummary.builder(name).description(description).tags(tags).register(registry);
}
return distributionSummary;
}

void put(Meter.Id id, Object value) {
threadLocal.get().put(id, value);
Timer getOrCreateTimer(String name, String description, Tags tags) {
Timer timer;
ContextInternal context;
if (enabled && (context = eventLoopContext()) != null) {
@SuppressWarnings("unchecked")
Map<Meter.Id, Timer> timerMap = (Map<Meter.Id, Timer>) context.contextData().computeIfAbsent(TIMERS, k -> new HashMap<>());
Meter.Id id = new Meter.Id(name, tags, null, description, TIMER);
timer = timerMap.get(id);
if (timer == null) {
timer = Timer.builder(name).description(description).tags(tags).register(registry);
timerMap.put(id, timer);
}
} else {
timer = Timer.builder(name).description(description).tags(tags).register(registry);
}
return timer;
}

void close() {
threadLocal.remove();
private ContextInternal eventLoopContext() {
// Caching is only safe when running on an event-loop context since we store meters in a HashMap
ContextInternal current = ContextInternal.current();
return current != null && current.isEventLoopContext() && current.inThread() ? current : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public VertxMetrics metrics(VertxOptions vertxOptions) {
synchronized (longGaugesByRegistry) {
longGauges = longGaugesByRegistry.computeIfAbsent(backendRegistry.getMeterRegistry(), meterRegistry -> new ConcurrentHashMap<>());
}
MeterCache meterCache = options.isMeterCacheEnabled() ? new MeterCache() : null;
VertxMetricsImpl metrics = new VertxMetricsImpl(options, backendRegistry, new LongGauges(longGauges), meterCache);
VertxMetricsImpl metrics = new VertxMetricsImpl(options, backendRegistry, new LongGauges(longGauges));
metrics.init();

return metrics;
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/vertx/micrometer/impl/VertxMetricsImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public class VertxMetricsImpl extends AbstractMetrics implements VertxMetrics {
private final Function<HttpRequest, Iterable<Tag>> serverRequestTagsProvider;
private final Function<HttpRequest, Iterable<Tag>> clientRequestTagsProvider;

public VertxMetricsImpl(MicrometerMetricsOptions options, BackendRegistry backendRegistry, LongGauges longGauges, MeterCache meterCache) {
super(backendRegistry.getMeterRegistry(), options.getMetricsNaming(), longGauges, EnumSet.copyOf(options.getLabels()), meterCache);
public VertxMetricsImpl(MicrometerMetricsOptions options, BackendRegistry backendRegistry, LongGauges longGauges) {
super(backendRegistry.getMeterRegistry(), options.getMetricsNaming(), longGauges, EnumSet.copyOf(options.getLabels()), options.isMeterCacheEnabled());
this.backendRegistry = backendRegistry;
registryName = options.getRegistryName();
if (options.getDisabledMetricsCategories() != null) {
Expand Down Expand Up @@ -196,8 +196,5 @@ public void close() {
}
}
BackendRegistries.stop(registryName);
if (meterCache != null) {
meterCache.close();
}
}
}

0 comments on commit 1818a63

Please sign in to comment.