Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch from synchronized to ReentrantLock #1083

Merged
merged 2 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id 'com.github.ben-manes.versions' version '0.47.0'
id 'com.github.spotbugs' version '5.1.0' apply false
id 'com.github.spotbugs' version '5.1.3' apply false
id 'me.champeau.jmh' version '0.7.1'
id 'com.netflix.nebula.dependency-recommender' version '12.2.0'
id 'com.netflix.nebula.netflixoss' version '11.3.2'
Expand Down
6 changes: 6 additions & 0 deletions codequality/findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
<Bug pattern="IC_SUPERCLASS_USES_SUBCLASS_DURING_INITIALIZATION"/>
</And>
</Match>
<Match>
<And>
<Class name="com.netflix.spectator.atlas.impl.PrefixTree"/>
<Bug pattern="UL_UNRELEASED_LOCK"/>
</And>
</Match>
<Match>
<And>
<Class name="com.netflix.spectator.perf.ManyTags"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 Netflix, Inc.
* Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;

Expand All @@ -37,6 +39,8 @@
*/
public final class CompositeRegistry implements Registry {

private final Lock lock = new ReentrantLock();

private final Clock clock;

private final AtomicReference<Registry[]> registries;
Expand Down Expand Up @@ -78,37 +82,52 @@ private int indexOf(Registry[] rs, Registry registry) {
}

/** Add a registry to the composite. */
public synchronized void add(Registry registry) {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos == -1) {
Registry[] tmp = new Registry[rs.length + 1];
System.arraycopy(rs, 0, tmp, 0, rs.length);
tmp[rs.length] = registry;
registries.set(tmp);
version.incrementAndGet();
public void add(Registry registry) {
lock.lock();
try {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos == -1) {
Registry[] tmp = new Registry[rs.length + 1];
System.arraycopy(rs, 0, tmp, 0, rs.length);
tmp[rs.length] = registry;
registries.set(tmp);
version.incrementAndGet();
}
} finally {
lock.unlock();
}
}

/** Remove a registry from the composite. */
public synchronized void remove(Registry registry) {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos >= 0) {
Registry[] tmp = new Registry[rs.length - 1];
if (pos > 0)
System.arraycopy(rs, 0, tmp, 0, pos);
if (pos < tmp.length)
System.arraycopy(rs, pos + 1, tmp, pos, rs.length - pos - 1);
registries.set(tmp);
version.incrementAndGet();
public void remove(Registry registry) {
lock.lock();
try {
Registry[] rs = registries.get();
int pos = indexOf(rs, registry);
if (pos >= 0) {
Registry[] tmp = new Registry[rs.length - 1];
if (pos > 0)
System.arraycopy(rs, 0, tmp, 0, pos);
if (pos < tmp.length)
System.arraycopy(rs, pos + 1, tmp, pos, rs.length - pos - 1);
registries.set(tmp);
version.incrementAndGet();
}
} finally {
lock.unlock();
}
}

/** Remove all registries from the composite. */
public synchronized void removeAll() {
registries.set(new Registry[0]);
state.clear();
public void removeAll() {
lock.lock();
try {
registries.set(new Registry[0]);
state.clear();
} finally {
lock.unlock();
}
}

@Override public Clock clock() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* <p><b>This class is an internal implementation detail only intended for use within spectator.
Expand Down Expand Up @@ -113,6 +115,8 @@ private static Id newId(Registry registry, String id, String name) {
private final ThreadFactory factory;
private final Thread[] threads;

private final Lock lock = new ReentrantLock();

private volatile boolean started = false;
private volatile boolean shutdown = false;

Expand Down Expand Up @@ -168,26 +172,36 @@ public ScheduledFuture<?> schedule(Options options, Runnable task) {
* Shutdown and cleanup resources associated with the scheduler. All threads will be
* interrupted, but this method does not block for them to all finish execution.
*/
public synchronized void shutdown() {
shutdown = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] != null && threads[i].isAlive()) {
threads[i].interrupt();
threads[i] = null;
public void shutdown() {
lock.lock();
try {
shutdown = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] != null && threads[i].isAlive()) {
threads[i].interrupt();
threads[i] = null;
}
}
} finally {
lock.unlock();
}
}

private synchronized void startThreads() {
if (!shutdown) {
started = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] == null || !threads[i].isAlive() || threads[i].isInterrupted()) {
threads[i] = factory.newThread(new Worker());
threads[i].start();
LOGGER.debug("started thread {}", threads[i].getName());
private void startThreads() {
lock.lock();
try {
if (!shutdown) {
started = true;
for (int i = 0; i < threads.length; ++i) {
if (threads[i] == null || !threads[i].isAlive() || threads[i].isInterrupted()) {
threads[i] = factory.newThread(new Worker());
threads[i].start();
LOGGER.debug("started thread {}", threads[i].getName());
}
}
}
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Logger to collect GC notifcation events.
Expand Down Expand Up @@ -82,6 +84,8 @@ public final class GcLogger {
private static final Id CONCURRENT_PHASE_TIME =
Spectator.globalRegistry().createId("jvm.gc.concurrentPhaseTime");

private final Lock lock = new ReentrantLock();

private final long jvmStartTime;

private final ConcurrentHashMap<String, CircularBuffer<GcEvent>> gcLogs = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -129,38 +133,48 @@ public GcLogger() {
* If not null, the listener will be called with the event objects after metrics and the
* log buffer is updated.
*/
public synchronized void start(GcEventListener listener) {
// TODO: this class has a bad mix of static fields used from an instance of the class. For now
// this has been changed not to throw to make the dependency injection use-cases work. A
// more general refactor of the GcLogger class is needed.
if (notifListener != null) {
LOGGER.warn("logger already started");
return;
}
eventListener = listener;
notifListener = new GcNotificationListener();
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
emitter.addNotificationListener(notifListener, null, null);
public void start(GcEventListener listener) {
lock.lock();
try {
// TODO: this class has a bad mix of static fields used from an instance of the class. For now
// this has been changed not to throw to make the dependency injection use-cases work. A
// more general refactor of the GcLogger class is needed.
if (notifListener != null) {
LOGGER.warn("logger already started");
return;
}
eventListener = listener;
notifListener = new GcNotificationListener();
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
emitter.addNotificationListener(notifListener, null, null);
}
}
} finally {
lock.unlock();
}
}

/** Stop collecting GC events. */
public synchronized void stop() {
Preconditions.checkState(notifListener != null, "logger has not been started");
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
try {
emitter.removeNotificationListener(notifListener);
} catch (ListenerNotFoundException e) {
LOGGER.warn("could not remove gc listener", e);
public void stop() {
lock.lock();
try {
Preconditions.checkState(notifListener != null, "logger has not been started");
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mbean instanceof NotificationEmitter) {
final NotificationEmitter emitter = (NotificationEmitter) mbean;
try {
emitter.removeNotificationListener(notifListener);
} catch (ListenerNotFoundException e) {
LOGGER.warn("could not remove gc listener", e);
}
}
}
notifListener = null;
} finally {
lock.unlock();
}
notifListener = null;
}

/** Return the current set of GC events in the in-memory log. */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 Netflix, Inc.
* Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.zip.Deflater;
Expand All @@ -60,6 +62,8 @@ public class HttpRequestBuilder {

private static final StreamHelper STREAM_HELPER = new StreamHelper();

private static final Lock LOCK = new ReentrantLock();

// Should not be used directly, use the method of the same name that will create the
// executor if needed on the first access.
private static volatile ExecutorService defaultExecutor;
Expand All @@ -82,10 +86,13 @@ private static ExecutorService defaultExecutor() {
if (executor != null) {
return executor;
}
synchronized (LOGGER) {
LOCK.lock();
try {
defaultExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(), newThreadFactory());
return defaultExecutor;
} finally {
LOCK.unlock();
}
}

Expand Down
Loading