Skip to content

Commit

Permalink
switch from synchronized to ReentrantLock
Browse files Browse the repository at this point in the history
The ReentrantLock is supposed to work better with virtual
threads in JDK 21.
  • Loading branch information
brharrington committed Oct 4, 2023
1 parent c2d2304 commit e0c4c0d
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 94 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 Netflix, Inc.
* Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;

Expand All @@ -37,6 +39,8 @@
*/
public final class CompositeRegistry implements Registry {

private final Lock lock = new ReentrantLock();

private final Clock clock;

private final AtomicReference<Registry[]> registries;
Expand Down Expand Up @@ -78,37 +82,52 @@ private int indexOf(Registry[] rs, Registry registry) {
}

/** Add a registry to the composite. */
public synchronized void add(Registry registry) {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos == -1) {
Registry[] tmp = new Registry[rs.length + 1];
System.arraycopy(rs, 0, tmp, 0, rs.length);
tmp[rs.length] = registry;
registries.set(tmp);
version.incrementAndGet();
public void add(Registry registry) {
lock.lock();
try {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos == -1) {
Registry[] tmp = new Registry[rs.length + 1];
System.arraycopy(rs, 0, tmp, 0, rs.length);
tmp[rs.length] = registry;
registries.set(tmp);
version.incrementAndGet();
}
} finally {
lock.unlock();
}
}

/** Remove a registry from the composite. */
public synchronized void remove(Registry registry) {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos >= 0) {
Registry[] tmp = new Registry[rs.length - 1];
if (pos > 0)
System.arraycopy(rs, 0, tmp, 0, pos);
if (pos < tmp.length)
System.arraycopy(rs, pos + 1, tmp, pos, rs.length - pos - 1);
registries.set(tmp);
version.incrementAndGet();
public void remove(Registry registry) {
lock.lock();
try {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos >= 0) {
Registry[] tmp = new Registry[rs.length - 1];
if (pos > 0)
System.arraycopy(rs, 0, tmp, 0, pos);
if (pos < tmp.length)
System.arraycopy(rs, pos + 1, tmp, pos, rs.length - pos - 1);
registries.set(tmp);
version.incrementAndGet();
}
} finally {
lock.unlock();
}
}

/** Remove all registries from the composite. */
public synchronized void removeAll() {
registries.set(new Registry[0]);
state.clear();
public void removeAll() {
lock.lock();
try {
registries.set(new Registry[0]);
state.clear();
} finally {
lock.unlock();
}
}

@Override public Clock clock() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* <p><b>This class is an internal implementation detail only intended for use within spectator.
Expand Down Expand Up @@ -113,6 +115,8 @@ private static Id newId(Registry registry, String id, String name) {
private final ThreadFactory factory;
private final Thread[] threads;

private final Lock lock = new ReentrantLock();

private volatile boolean started = false;
private volatile boolean shutdown = false;

Expand Down Expand Up @@ -168,26 +172,36 @@ public ScheduledFuture<?> schedule(Options options, Runnable task) {
* Shutdown and cleanup resources associated with the scheduler. All threads will be
* interrupted, but this method does not block for them to all finish execution.
*/
public synchronized void shutdown() {
shutdown = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] != null && threads[i].isAlive()) {
threads[i].interrupt();
threads[i] = null;
public void shutdown() {
lock.lock();
try {
shutdown = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] != null && threads[i].isAlive()) {
threads[i].interrupt();
threads[i] = null;
}
}
} finally {
lock.unlock();
}
}

private synchronized void startThreads() {
if (!shutdown) {
started = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] == null || !threads[i].isAlive() || threads[i].isInterrupted()) {
threads[i] = factory.newThread(new Worker());
threads[i].start();
LOGGER.debug("started thread {}", threads[i].getName());
private void startThreads() {
lock.lock();
try {
if (!shutdown) {
started = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] == null || !threads[i].isAlive() || threads[i].isInterrupted()) {
threads[i] = factory.newThread(new Worker());
threads[i].start();
LOGGER.debug("started thread {}", threads[i].getName());
}
}
}
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Logger to collect GC notifcation events.
Expand Down Expand Up @@ -82,6 +84,8 @@ public final class GcLogger {
private static final Id CONCURRENT_PHASE_TIME =
Spectator.globalRegistry().createId("jvm.gc.concurrentPhaseTime");

private final Lock lock = new ReentrantLock();

private final long jvmStartTime;

private final ConcurrentHashMap<String, CircularBuffer<GcEvent>> gcLogs = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -129,38 +133,48 @@ public GcLogger() {
* If not null, the listener will be called with the event objects after metrics and the
* log buffer is updated.
*/
public synchronized void start(GcEventListener listener) {
// TODO: this class has a bad mix of static fields used from an instance of the class. For now
// this has been changed not to throw to make the dependency injection use-cases work. A
// more general refactor of the GcLogger class is needed.
if (notifListener != null) {
LOGGER.warn("logger already started");
return;
}
eventListener = listener;
notifListener = new GcNotificationListener();
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
emitter.addNotificationListener(notifListener, null, null);
public void start(GcEventListener listener) {
lock.lock();
try {
// TODO: this class has a bad mix of static fields used from an instance of the class. For now
// this has been changed not to throw to make the dependency injection use-cases work. A
// more general refactor of the GcLogger class is needed.
if (notifListener != null) {
LOGGER.warn("logger already started");
return;
}
eventListener = listener;
notifListener = new GcNotificationListener();
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
emitter.addNotificationListener(notifListener, null, null);
}
}
} finally {
lock.unlock();
}
}

/** Stop collecting GC events. */
public synchronized void stop() {
Preconditions.checkState(notifListener != null, "logger has not been started");
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
try {
emitter.removeNotificationListener(notifListener);
} catch (ListenerNotFoundException e) {
LOGGER.warn("could not remove gc listener", e);
public void stop() {
lock.lock();
try {
Preconditions.checkState(notifListener != null, "logger has not been started");
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
try {
emitter.removeNotificationListener(notifListener);
} catch (ListenerNotFoundException e) {
LOGGER.warn("could not remove gc listener", e);
}
}
}
notifListener = null;
} finally {
lock.unlock();
}
notifListener = null;
}

/** Return the current set of GC events in the in-memory log. */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 Netflix, Inc.
* Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.zip.Deflater;
Expand All @@ -60,6 +62,8 @@ public class HttpRequestBuilder {

private static final StreamHelper STREAM_HELPER = new StreamHelper();

private static final Lock LOCK = new ReentrantLock();

// Should not be used directly, use the method of the same name that will create the
// executor if needed on the first access.
private static volatile ExecutorService defaultExecutor;
Expand All @@ -82,10 +86,13 @@ private static ExecutorService defaultExecutor() {
if (executor != null) {
return executor;
}
synchronized (LOGGER) {
LOCK.lock();
try {
defaultExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(), newThreadFactory());
return defaultExecutor;
} finally {
LOCK.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -44,6 +46,7 @@ public class Evaluator {

private static final Logger LOGGER = LoggerFactory.getLogger(Evaluator.class);

private final Lock lock = new ReentrantLock();
private final Map<String, String> commonTags;
private final Function<Id, Map<String, String>> idMapper;
private final long step;
Expand Down Expand Up @@ -71,36 +74,41 @@ public Evaluator(EvaluatorConfig config) {
/**
* Synchronize the set of subscriptions for this evaluator with the provided set.
*/
public synchronized void sync(List<Subscription> subs) {
Set<Subscription> removed = new HashSet<>(subscriptions.keySet());
for (Subscription sub : subs) {
boolean alreadyPresent = removed.remove(sub);
if (!alreadyPresent) {
try {
// Parse and simplify query
Query q = sub.dataExpr().query().simplify(commonTags);
LOGGER.trace("query pre-eval: original [{}], simplified [{}], common tags {}",
sub.dataExpr().query(), q, commonTags);
public void sync(List<Subscription> subs) {
lock.lock();
try {
Set<Subscription> removed = new HashSet<>(subscriptions.keySet());
for (Subscription sub : subs) {
boolean alreadyPresent = removed.remove(sub);
if (!alreadyPresent) {
try {
// Parse and simplify query
Query q = sub.dataExpr().query().simplify(commonTags);
LOGGER.trace("query pre-eval: original [{}], simplified [{}], common tags {}",
sub.dataExpr().query(), q, commonTags);

// Update index
int multiple = (int) (sub.getFrequency() / step);
SubscriptionEntry entry = new SubscriptionEntry(sub, multiple);
subscriptions.put(sub, entry);
index.add(q, entry);
LOGGER.debug("subscription added: {}", sub);
} catch (Exception e) {
LOGGER.warn("failed to add subscription: {}", sub, e);
// Update index
int multiple = (int) (sub.getFrequency() / step);
SubscriptionEntry entry = new SubscriptionEntry(sub, multiple);
subscriptions.put(sub, entry);
index.add(q, entry);
LOGGER.debug("subscription added: {}", sub);
} catch (Exception e) {
LOGGER.warn("failed to add subscription: {}", sub, e);
}
} else {
LOGGER.trace("subscription already present: {}", sub);
}
} else {
LOGGER.trace("subscription already present: {}", sub);
}
}

for (Subscription sub : removed) {
SubscriptionEntry entry = subscriptions.remove(sub);
Query q = sub.dataExpr().query().simplify(commonTags);
index.remove(q, entry);
LOGGER.debug("subscription removed: {}", sub);
for (Subscription sub : removed) {
SubscriptionEntry entry = subscriptions.remove(sub);
Query q = sub.dataExpr().query().simplify(commonTags);
index.remove(q, entry);
LOGGER.debug("subscription removed: {}", sub);
}
} finally {
lock.unlock();
}
}

Expand Down
Loading

0 comments on commit e0c4c0d

Please sign in to comment.