Skip to content

Commit

Permalink
switch from synchronized to ReentrantLock (#1083)
Browse files Browse the repository at this point in the history
The ReentrantLock is supposed to work better with virtual
threads in JDK 21.
  • Loading branch information
brharrington authored Oct 4, 2023
1 parent c2d2304 commit fb43567
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 95 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id 'com.github.ben-manes.versions' version '0.47.0'
id 'com.github.spotbugs' version '5.1.0' apply false
id 'com.github.spotbugs' version '5.1.3' apply false
id 'me.champeau.jmh' version '0.7.1'
id 'com.netflix.nebula.dependency-recommender' version '12.2.0'
id 'com.netflix.nebula.netflixoss' version '11.3.2'
Expand Down
6 changes: 6 additions & 0 deletions codequality/findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
<Bug pattern="IC_SUPERCLASS_USES_SUBCLASS_DURING_INITIALIZATION"/>
</And>
</Match>
<Match>
<And>
<Class name="com.netflix.spectator.atlas.impl.PrefixTree"/>
<Bug pattern="UL_UNRELEASED_LOCK"/>
</And>
</Match>
<Match>
<And>
<Class name="com.netflix.spectator.perf.ManyTags"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 Netflix, Inc.
* Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;

Expand All @@ -37,6 +39,8 @@
*/
public final class CompositeRegistry implements Registry {

private final Lock lock = new ReentrantLock();

private final Clock clock;

private final AtomicReference<Registry[]> registries;
Expand Down Expand Up @@ -78,37 +82,52 @@ private int indexOf(Registry[] rs, Registry registry) {
}

/** Add a registry to the composite. */
public synchronized void add(Registry registry) {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos == -1) {
Registry[] tmp = new Registry[rs.length + 1];
System.arraycopy(rs, 0, tmp, 0, rs.length);
tmp[rs.length] = registry;
registries.set(tmp);
version.incrementAndGet();
public void add(Registry registry) {
lock.lock();
try {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos == -1) {
Registry[] tmp = new Registry[rs.length + 1];
System.arraycopy(rs, 0, tmp, 0, rs.length);
tmp[rs.length] = registry;
registries.set(tmp);
version.incrementAndGet();
}
} finally {
lock.unlock();
}
}

/** Remove a registry from the composite. */
public synchronized void remove(Registry registry) {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos >= 0) {
Registry[] tmp = new Registry[rs.length - 1];
if (pos > 0)
System.arraycopy(rs, 0, tmp, 0, pos);
if (pos < tmp.length)
System.arraycopy(rs, pos + 1, tmp, pos, rs.length - pos - 1);
registries.set(tmp);
version.incrementAndGet();
public void remove(Registry registry) {
lock.lock();
try {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos >= 0) {
Registry[] tmp = new Registry[rs.length - 1];
if (pos > 0)
System.arraycopy(rs, 0, tmp, 0, pos);
if (pos < tmp.length)
System.arraycopy(rs, pos + 1, tmp, pos, rs.length - pos - 1);
registries.set(tmp);
version.incrementAndGet();
}
} finally {
lock.unlock();
}
}

/** Remove all registries from the composite. */
public synchronized void removeAll() {
registries.set(new Registry[0]);
state.clear();
public void removeAll() {
lock.lock();
try {
registries.set(new Registry[0]);
state.clear();
} finally {
lock.unlock();
}
}

@Override public Clock clock() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* <p><b>This class is an internal implementation detail only intended for use within spectator.
Expand Down Expand Up @@ -113,6 +115,8 @@ private static Id newId(Registry registry, String id, String name) {
private final ThreadFactory factory;
private final Thread[] threads;

private final Lock lock = new ReentrantLock();

private volatile boolean started = false;
private volatile boolean shutdown = false;

Expand Down Expand Up @@ -168,26 +172,36 @@ public ScheduledFuture<?> schedule(Options options, Runnable task) {
* Shutdown and cleanup resources associated with the scheduler. All threads will be
* interrupted, but this method does not block for them to all finish execution.
*/
public synchronized void shutdown() {
shutdown = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] != null && threads[i].isAlive()) {
threads[i].interrupt();
threads[i] = null;
public void shutdown() {
lock.lock();
try {
shutdown = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] != null && threads[i].isAlive()) {
threads[i].interrupt();
threads[i] = null;
}
}
} finally {
lock.unlock();
}
}

private synchronized void startThreads() {
if (!shutdown) {
started = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] == null || !threads[i].isAlive() || threads[i].isInterrupted()) {
threads[i] = factory.newThread(new Worker());
threads[i].start();
LOGGER.debug("started thread {}", threads[i].getName());
private void startThreads() {
lock.lock();
try {
if (!shutdown) {
started = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] == null || !threads[i].isAlive() || threads[i].isInterrupted()) {
threads[i] = factory.newThread(new Worker());
threads[i].start();
LOGGER.debug("started thread {}", threads[i].getName());
}
}
}
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Logger to collect GC notifcation events.
Expand Down Expand Up @@ -82,6 +84,8 @@ public final class GcLogger {
private static final Id CONCURRENT_PHASE_TIME =
Spectator.globalRegistry().createId("jvm.gc.concurrentPhaseTime");

private final Lock lock = new ReentrantLock();

private final long jvmStartTime;

private final ConcurrentHashMap<String, CircularBuffer<GcEvent>> gcLogs = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -129,38 +133,48 @@ public GcLogger() {
* If not null, the listener will be called with the event objects after metrics and the
* log buffer is updated.
*/
public synchronized void start(GcEventListener listener) {
// TODO: this class has a bad mix of static fields used from an instance of the class. For now
// this has been changed not to throw to make the dependency injection use-cases work. A
// more general refactor of the GcLogger class is needed.
if (notifListener != null) {
LOGGER.warn("logger already started");
return;
}
eventListener = listener;
notifListener = new GcNotificationListener();
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
emitter.addNotificationListener(notifListener, null, null);
public void start(GcEventListener listener) {
lock.lock();
try {
// TODO: this class has a bad mix of static fields used from an instance of the class. For now
// this has been changed not to throw to make the dependency injection use-cases work. A
// more general refactor of the GcLogger class is needed.
if (notifListener != null) {
LOGGER.warn("logger already started");
return;
}
eventListener = listener;
notifListener = new GcNotificationListener();
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
emitter.addNotificationListener(notifListener, null, null);
}
}
} finally {
lock.unlock();
}
}

/** Stop collecting GC events. */
public synchronized void stop() {
Preconditions.checkState(notifListener != null, "logger has not been started");
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
try {
emitter.removeNotificationListener(notifListener);
} catch (ListenerNotFoundException e) {
LOGGER.warn("could not remove gc listener", e);
public void stop() {
lock.lock();
try {
Preconditions.checkState(notifListener != null, "logger has not been started");
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
try {
emitter.removeNotificationListener(notifListener);
} catch (ListenerNotFoundException e) {
LOGGER.warn("could not remove gc listener", e);
}
}
}
notifListener = null;
} finally {
lock.unlock();
}
notifListener = null;
}

/** Return the current set of GC events in the in-memory log. */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 Netflix, Inc.
* Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.zip.Deflater;
Expand All @@ -60,6 +62,8 @@ public class HttpRequestBuilder {

private static final StreamHelper STREAM_HELPER = new StreamHelper();

private static final Lock LOCK = new ReentrantLock();

// Should not be used directly, use the method of the same name that will create the
// executor if needed on the first access.
private static volatile ExecutorService defaultExecutor;
Expand All @@ -82,10 +86,13 @@ private static ExecutorService defaultExecutor() {
if (executor != null) {
return executor;
}
synchronized (LOGGER) {
LOCK.lock();
try {
defaultExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(), newThreadFactory());
return defaultExecutor;
} finally {
LOCK.unlock();
}
}

Expand Down
Loading

0 comments on commit fb43567

Please sign in to comment.