From fb43567098a330b22557a6f6450341a2add1bca2 Mon Sep 17 00:00:00 2001 From: brharrington Date: Wed, 4 Oct 2023 15:14:01 -0500 Subject: [PATCH] switch from synchronized to ReentrantLock (#1083) The ReentrantLock is supposed to work better with virtual threads in JDK 21. --- build.gradle | 2 +- codequality/findbugs-exclude.xml | 6 ++ .../spectator/api/CompositeRegistry.java | 67 ++++++++++++------- .../com/netflix/spectator/impl/Scheduler.java | 44 +++++++----- .../com/netflix/spectator/gc/GcLogger.java | 62 ++++++++++------- .../ipc/http/HttpRequestBuilder.java | 11 ++- .../spectator/atlas/impl/Evaluator.java | 60 ++++++++++------- .../spectator/atlas/impl/PrefixTree.java | 15 ++++- 8 files changed, 172 insertions(+), 95 deletions(-) diff --git a/build.gradle b/build.gradle index 9730f77ca..797c986eb 100755 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,6 @@ plugins { id 'com.github.ben-manes.versions' version '0.47.0' - id 'com.github.spotbugs' version '5.1.0' apply false + id 'com.github.spotbugs' version '5.1.3' apply false id 'me.champeau.jmh' version '0.7.1' id 'com.netflix.nebula.dependency-recommender' version '12.2.0' id 'com.netflix.nebula.netflixoss' version '11.3.2' diff --git a/codequality/findbugs-exclude.xml b/codequality/findbugs-exclude.xml index e5cd84993..c482b1a78 100644 --- a/codequality/findbugs-exclude.xml +++ b/codequality/findbugs-exclude.xml @@ -48,6 +48,12 @@ + + + + + + diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/CompositeRegistry.java b/spectator-api/src/main/java/com/netflix/spectator/api/CompositeRegistry.java index 279f3c203..98a3a610e 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/CompositeRegistry.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/CompositeRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2021 Netflix, Inc. + * Copyright 2014-2023 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.LongSupplier; @@ -37,6 +39,8 @@ */ public final class CompositeRegistry implements Registry { + private final Lock lock = new ReentrantLock(); + private final Clock clock; private final AtomicReference registries; @@ -78,37 +82,52 @@ private int indexOf(Registry[] rs, Registry registry) { } /** Add a registry to the composite. */ - public synchronized void add(Registry registry) { - Registry[] rs = registries.get(); - int pos = indexOf(rs, registry); - if (pos == -1) { - Registry[] tmp = new Registry[rs.length + 1]; - System.arraycopy(rs, 0, tmp, 0, rs.length); - tmp[rs.length] = registry; - registries.set(tmp); - version.incrementAndGet(); + public void add(Registry registry) { + lock.lock(); + try { + Registry[] rs = registries.get(); + int pos = indexOf(rs, registry); + if (pos == -1) { + Registry[] tmp = new Registry[rs.length + 1]; + System.arraycopy(rs, 0, tmp, 0, rs.length); + tmp[rs.length] = registry; + registries.set(tmp); + version.incrementAndGet(); + } + } finally { + lock.unlock(); } } /** Remove a registry from the composite. */ - public synchronized void remove(Registry registry) { - Registry[] rs = registries.get(); - int pos = indexOf(rs, registry); - if (pos >= 0) { - Registry[] tmp = new Registry[rs.length - 1]; - if (pos > 0) - System.arraycopy(rs, 0, tmp, 0, pos); - if (pos < tmp.length) - System.arraycopy(rs, pos + 1, tmp, pos, rs.length - pos - 1); - registries.set(tmp); - version.incrementAndGet(); + public void remove(Registry registry) { + lock.lock(); + try { + Registry[] rs = registries.get(); + int pos = indexOf(rs, registry); + if (pos >= 0) { + Registry[] tmp = new Registry[rs.length - 1]; + if (pos > 0) + System.arraycopy(rs, 0, tmp, 0, pos); + if (pos < tmp.length) + System.arraycopy(rs, pos + 1, tmp, pos, rs.length - pos - 1); + registries.set(tmp); + version.incrementAndGet(); + } + } finally { + lock.unlock(); } } /** Remove all registries from the composite. */ - public synchronized void removeAll() { - registries.set(new Registry[0]); - state.clear(); + public void removeAll() { + lock.lock(); + try { + registries.set(new Registry[0]); + state.clear(); + } finally { + lock.unlock(); + } } @Override public Clock clock() { diff --git a/spectator-api/src/main/java/com/netflix/spectator/impl/Scheduler.java b/spectator-api/src/main/java/com/netflix/spectator/impl/Scheduler.java index 90cb451c5..4716e90df 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/impl/Scheduler.java +++ b/spectator-api/src/main/java/com/netflix/spectator/impl/Scheduler.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2023 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** *

This class is an internal implementation detail only intended for use within spectator. @@ -113,6 +115,8 @@ private static Id newId(Registry registry, String id, String name) { private final ThreadFactory factory; private final Thread[] threads; + private final Lock lock = new ReentrantLock(); + private volatile boolean started = false; private volatile boolean shutdown = false; @@ -168,26 +172,36 @@ public ScheduledFuture schedule(Options options, Runnable task) { * Shutdown and cleanup resources associated with the scheduler. All threads will be * interrupted, but this method does not block for them to all finish execution. */ - public synchronized void shutdown() { - shutdown = true; - for (int i = 0; i < threads.length; ++i) { - if (threads[i] != null && threads[i].isAlive()) { - threads[i].interrupt(); - threads[i] = null; + public void shutdown() { + lock.lock(); + try { + shutdown = true; + for (int i = 0; i < threads.length; ++i) { + if (threads[i] != null && threads[i].isAlive()) { + threads[i].interrupt(); + threads[i] = null; + } } + } finally { + lock.unlock(); } } - private synchronized void startThreads() { - if (!shutdown) { - started = true; - for (int i = 0; i < threads.length; ++i) { - if (threads[i] == null || !threads[i].isAlive() || threads[i].isInterrupted()) { - threads[i] = factory.newThread(new Worker()); - threads[i].start(); - LOGGER.debug("started thread {}", threads[i].getName()); + private void startThreads() { + lock.lock(); + try { + if (!shutdown) { + started = true; + for (int i = 0; i < threads.length; ++i) { + if (threads[i] == null || !threads[i].isAlive() || threads[i].isInterrupted()) { + threads[i] = factory.newThread(new Worker()); + threads[i].start(); + LOGGER.debug("started thread {}", threads[i].getName()); + } } } + } finally { + lock.unlock(); } } diff --git a/spectator-ext-gc/src/main/java/com/netflix/spectator/gc/GcLogger.java b/spectator-ext-gc/src/main/java/com/netflix/spectator/gc/GcLogger.java index 53cda09dd..b3f8637af 100644 --- a/spectator-ext-gc/src/main/java/com/netflix/spectator/gc/GcLogger.java +++ b/spectator-ext-gc/src/main/java/com/netflix/spectator/gc/GcLogger.java @@ -40,6 +40,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Logger to collect GC notifcation events. @@ -82,6 +84,8 @@ public final class GcLogger { private static final Id CONCURRENT_PHASE_TIME = Spectator.globalRegistry().createId("jvm.gc.concurrentPhaseTime"); + private final Lock lock = new ReentrantLock(); + private final long jvmStartTime; private final ConcurrentHashMap> gcLogs = new ConcurrentHashMap<>(); @@ -129,38 +133,48 @@ public GcLogger() { * If not null, the listener will be called with the event objects after metrics and the * log buffer is updated. */ - public synchronized void start(GcEventListener listener) { - // TODO: this class has a bad mix of static fields used from an instance of the class. For now - // this has been changed not to throw to make the dependency injection use-cases work. A - // more general refactor of the GcLogger class is needed. - if (notifListener != null) { - LOGGER.warn("logger already started"); - return; - } - eventListener = listener; - notifListener = new GcNotificationListener(); - for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) { - if (mbean instanceof NotificationEmitter) { - final NotificationEmitter emitter = (NotificationEmitter) mbean; - emitter.addNotificationListener(notifListener, null, null); + public void start(GcEventListener listener) { + lock.lock(); + try { + // TODO: this class has a bad mix of static fields used from an instance of the class. For now + // this has been changed not to throw to make the dependency injection use-cases work. A + // more general refactor of the GcLogger class is needed. + if (notifListener != null) { + LOGGER.warn("logger already started"); + return; } + eventListener = listener; + notifListener = new GcNotificationListener(); + for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) { + if (mbean instanceof NotificationEmitter) { + final NotificationEmitter emitter = (NotificationEmitter) mbean; + emitter.addNotificationListener(notifListener, null, null); + } + } + } finally { + lock.unlock(); } } /** Stop collecting GC events. */ - public synchronized void stop() { - Preconditions.checkState(notifListener != null, "logger has not been started"); - for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) { - if (mbean instanceof NotificationEmitter) { - final NotificationEmitter emitter = (NotificationEmitter) mbean; - try { - emitter.removeNotificationListener(notifListener); - } catch (ListenerNotFoundException e) { - LOGGER.warn("could not remove gc listener", e); + public void stop() { + lock.lock(); + try { + Preconditions.checkState(notifListener != null, "logger has not been started"); + for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) { + if (mbean instanceof NotificationEmitter) { + final NotificationEmitter emitter = (NotificationEmitter) mbean; + try { + emitter.removeNotificationListener(notifListener); + } catch (ListenerNotFoundException e) { + LOGGER.warn("could not remove gc listener", e); + } } } + notifListener = null; + } finally { + lock.unlock(); } - notifListener = null; } /** Return the current set of GC events in the in-memory log. */ diff --git a/spectator-ext-ipc/src/main/java/com/netflix/spectator/ipc/http/HttpRequestBuilder.java b/spectator-ext-ipc/src/main/java/com/netflix/spectator/ipc/http/HttpRequestBuilder.java index 67443300e..c0dafce92 100644 --- a/spectator-ext-ipc/src/main/java/com/netflix/spectator/ipc/http/HttpRequestBuilder.java +++ b/spectator-ext-ipc/src/main/java/com/netflix/spectator/ipc/http/HttpRequestBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2020 Netflix, Inc. + * Copyright 2014-2023 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +42,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.zip.Deflater; @@ -60,6 +62,8 @@ public class HttpRequestBuilder { private static final StreamHelper STREAM_HELPER = new StreamHelper(); + private static final Lock LOCK = new ReentrantLock(); + // Should not be used directly, use the method of the same name that will create the // executor if needed on the first access. private static volatile ExecutorService defaultExecutor; @@ -82,10 +86,13 @@ private static ExecutorService defaultExecutor() { if (executor != null) { return executor; } - synchronized (LOGGER) { + LOCK.lock(); + try { defaultExecutor = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), newThreadFactory()); return defaultExecutor; + } finally { + LOCK.unlock(); } } diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java index d84088263..19918a10a 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java @@ -31,6 +31,8 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Function; @@ -44,6 +46,7 @@ public class Evaluator { private static final Logger LOGGER = LoggerFactory.getLogger(Evaluator.class); + private final Lock lock = new ReentrantLock(); private final Map commonTags; private final Function> idMapper; private final long step; @@ -71,36 +74,41 @@ public Evaluator(EvaluatorConfig config) { /** * Synchronize the set of subscriptions for this evaluator with the provided set. */ - public synchronized void sync(List subs) { - Set removed = new HashSet<>(subscriptions.keySet()); - for (Subscription sub : subs) { - boolean alreadyPresent = removed.remove(sub); - if (!alreadyPresent) { - try { - // Parse and simplify query - Query q = sub.dataExpr().query().simplify(commonTags); - LOGGER.trace("query pre-eval: original [{}], simplified [{}], common tags {}", - sub.dataExpr().query(), q, commonTags); + public void sync(List subs) { + lock.lock(); + try { + Set removed = new HashSet<>(subscriptions.keySet()); + for (Subscription sub : subs) { + boolean alreadyPresent = removed.remove(sub); + if (!alreadyPresent) { + try { + // Parse and simplify query + Query q = sub.dataExpr().query().simplify(commonTags); + LOGGER.trace("query pre-eval: original [{}], simplified [{}], common tags {}", + sub.dataExpr().query(), q, commonTags); - // Update index - int multiple = (int) (sub.getFrequency() / step); - SubscriptionEntry entry = new SubscriptionEntry(sub, multiple); - subscriptions.put(sub, entry); - index.add(q, entry); - LOGGER.debug("subscription added: {}", sub); - } catch (Exception e) { - LOGGER.warn("failed to add subscription: {}", sub, e); + // Update index + int multiple = (int) (sub.getFrequency() / step); + SubscriptionEntry entry = new SubscriptionEntry(sub, multiple); + subscriptions.put(sub, entry); + index.add(q, entry); + LOGGER.debug("subscription added: {}", sub); + } catch (Exception e) { + LOGGER.warn("failed to add subscription: {}", sub, e); + } + } else { + LOGGER.trace("subscription already present: {}", sub); } - } else { - LOGGER.trace("subscription already present: {}", sub); } - } - for (Subscription sub : removed) { - SubscriptionEntry entry = subscriptions.remove(sub); - Query q = sub.dataExpr().query().simplify(commonTags); - index.remove(q, entry); - LOGGER.debug("subscription removed: {}", sub); + for (Subscription sub : removed) { + SubscriptionEntry entry = subscriptions.remove(sub); + Query q = sub.dataExpr().query().simplify(commonTags); + index.remove(q, entry); + LOGGER.debug("subscription removed: {}", sub); + } + } finally { + lock.unlock(); } } diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/PrefixTree.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/PrefixTree.java index f5fcae033..8c8e5c49e 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/PrefixTree.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/PrefixTree.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 Netflix, Inc. + * Copyright 2014-2023 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; /** @@ -39,6 +41,7 @@ private static int indexOf(char c) { return (i >= TABLE_SIZE) ? -1 : i; } + private final Lock lock = new ReentrantLock(); private final AtomicReferenceArray> children; private final Set values; @@ -51,12 +54,15 @@ private static int indexOf(char c) { private PrefixTree computeIfAbsent(int i) { PrefixTree child = children.get(i); if (child == null) { - synchronized (this) { + lock.lock(); + try { child = children.get(i); if (child == null) { child = new PrefixTree<>(); children.set(i, child); } + } finally { + lock.unlock(); } } return child; @@ -122,13 +128,16 @@ private boolean remove(String prefix, int pos, T value) { } else { boolean result = child.remove(prefix, pos + 1, value); if (result && child.isEmpty()) { - synchronized (this) { + lock.lock(); + try { // Check that the children array still has the reference to the // same child object. The entry may have been replaced by another // thread. if (child == children.get(i) && child.isEmpty()) { children.set(i, null); } + } finally { + lock.unlock(); } } return result;