registries;
@@ -78,37 +82,52 @@ private int indexOf(Registry[] rs, Registry registry) {
}
/** Add a registry to the composite. */
- public synchronized void add(Registry registry) {
- Registry[] rs = registries.get();
- int pos = indexOf(rs, registry);
- if (pos == -1) {
- Registry[] tmp = new Registry[rs.length + 1];
- System.arraycopy(rs, 0, tmp, 0, rs.length);
- tmp[rs.length] = registry;
- registries.set(tmp);
- version.incrementAndGet();
+ public void add(Registry registry) {
+ lock.lock();
+ try {
+ Registry[] rs = registries.get();
+ int pos = indexOf(rs, registry);
+ if (pos == -1) {
+ Registry[] tmp = new Registry[rs.length + 1];
+ System.arraycopy(rs, 0, tmp, 0, rs.length);
+ tmp[rs.length] = registry;
+ registries.set(tmp);
+ version.incrementAndGet();
+ }
+ } finally {
+ lock.unlock();
}
}
/** Remove a registry from the composite. */
- public synchronized void remove(Registry registry) {
- Registry[] rs = registries.get();
- int pos = indexOf(rs, registry);
- if (pos >= 0) {
- Registry[] tmp = new Registry[rs.length - 1];
- if (pos > 0)
- System.arraycopy(rs, 0, tmp, 0, pos);
- if (pos < tmp.length)
- System.arraycopy(rs, pos + 1, tmp, pos, rs.length - pos - 1);
- registries.set(tmp);
- version.incrementAndGet();
+ public void remove(Registry registry) {
+ lock.lock();
+ try {
+ Registry[] rs = registries.get();
+ int pos = indexOf(rs, registry);
+ if (pos >= 0) {
+ Registry[] tmp = new Registry[rs.length - 1];
+ if (pos > 0)
+ System.arraycopy(rs, 0, tmp, 0, pos);
+ if (pos < tmp.length)
+ System.arraycopy(rs, pos + 1, tmp, pos, rs.length - pos - 1);
+ registries.set(tmp);
+ version.incrementAndGet();
+ }
+ } finally {
+ lock.unlock();
}
}
/** Remove all registries from the composite. */
- public synchronized void removeAll() {
- registries.set(new Registry[0]);
- state.clear();
+ public void removeAll() {
+ lock.lock();
+ try {
+ registries.set(new Registry[0]);
+ state.clear();
+ } finally {
+ lock.unlock();
+ }
}
@Override public Clock clock() {
diff --git a/spectator-api/src/main/java/com/netflix/spectator/impl/Scheduler.java b/spectator-api/src/main/java/com/netflix/spectator/impl/Scheduler.java
index 90cb451c5..4716e90df 100644
--- a/spectator-api/src/main/java/com/netflix/spectator/impl/Scheduler.java
+++ b/spectator-api/src/main/java/com/netflix/spectator/impl/Scheduler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2019 Netflix, Inc.
+ * Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,6 +33,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* This class is an internal implementation detail only intended for use within spectator.
@@ -113,6 +115,8 @@ private static Id newId(Registry registry, String id, String name) {
private final ThreadFactory factory;
private final Thread[] threads;
+ private final Lock lock = new ReentrantLock();
+
private volatile boolean started = false;
private volatile boolean shutdown = false;
@@ -168,26 +172,36 @@ public ScheduledFuture> schedule(Options options, Runnable task) {
* Shutdown and cleanup resources associated with the scheduler. All threads will be
* interrupted, but this method does not block for them to all finish execution.
*/
- public synchronized void shutdown() {
- shutdown = true;
- for (int i = 0; i < threads.length; ++i) {
- if (threads[i] != null && threads[i].isAlive()) {
- threads[i].interrupt();
- threads[i] = null;
+ public void shutdown() {
+ lock.lock();
+ try {
+ shutdown = true;
+ for (int i = 0; i < threads.length; ++i) {
+ if (threads[i] != null && threads[i].isAlive()) {
+ threads[i].interrupt();
+ threads[i] = null;
+ }
}
+ } finally {
+ lock.unlock();
}
}
- private synchronized void startThreads() {
- if (!shutdown) {
- started = true;
- for (int i = 0; i < threads.length; ++i) {
- if (threads[i] == null || !threads[i].isAlive() || threads[i].isInterrupted()) {
- threads[i] = factory.newThread(new Worker());
- threads[i].start();
- LOGGER.debug("started thread {}", threads[i].getName());
+ private void startThreads() {
+ lock.lock();
+ try {
+ if (!shutdown) {
+ started = true;
+ for (int i = 0; i < threads.length; ++i) {
+ if (threads[i] == null || !threads[i].isAlive() || threads[i].isInterrupted()) {
+ threads[i] = factory.newThread(new Worker());
+ threads[i].start();
+ LOGGER.debug("started thread {}", threads[i].getName());
+ }
}
}
+ } finally {
+ lock.unlock();
}
}
diff --git a/spectator-ext-gc/src/main/java/com/netflix/spectator/gc/GcLogger.java b/spectator-ext-gc/src/main/java/com/netflix/spectator/gc/GcLogger.java
index 53cda09dd..b3f8637af 100644
--- a/spectator-ext-gc/src/main/java/com/netflix/spectator/gc/GcLogger.java
+++ b/spectator-ext-gc/src/main/java/com/netflix/spectator/gc/GcLogger.java
@@ -40,6 +40,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Logger to collect GC notifcation events.
@@ -82,6 +84,8 @@ public final class GcLogger {
private static final Id CONCURRENT_PHASE_TIME =
Spectator.globalRegistry().createId("jvm.gc.concurrentPhaseTime");
+ private final Lock lock = new ReentrantLock();
+
private final long jvmStartTime;
private final ConcurrentHashMap> gcLogs = new ConcurrentHashMap<>();
@@ -129,38 +133,48 @@ public GcLogger() {
* If not null, the listener will be called with the event objects after metrics and the
* log buffer is updated.
*/
- public synchronized void start(GcEventListener listener) {
- // TODO: this class has a bad mix of static fields used from an instance of the class. For now
- // this has been changed not to throw to make the dependency injection use-cases work. A
- // more general refactor of the GcLogger class is needed.
- if (notifListener != null) {
- LOGGER.warn("logger already started");
- return;
- }
- eventListener = listener;
- notifListener = new GcNotificationListener();
- for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
- if (mbean instanceof NotificationEmitter) {
- final NotificationEmitter emitter = (NotificationEmitter) mbean;
- emitter.addNotificationListener(notifListener, null, null);
+ public void start(GcEventListener listener) {
+ lock.lock();
+ try {
+ // TODO: this class has a bad mix of static fields used from an instance of the class. For now
+ // this has been changed not to throw to make the dependency injection use-cases work. A
+ // more general refactor of the GcLogger class is needed.
+ if (notifListener != null) {
+ LOGGER.warn("logger already started");
+ return;
}
+ eventListener = listener;
+ notifListener = new GcNotificationListener();
+ for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
+ if (mbean instanceof NotificationEmitter) {
+ final NotificationEmitter emitter = (NotificationEmitter) mbean;
+ emitter.addNotificationListener(notifListener, null, null);
+ }
+ }
+ } finally {
+ lock.unlock();
}
}
/** Stop collecting GC events. */
- public synchronized void stop() {
- Preconditions.checkState(notifListener != null, "logger has not been started");
- for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
- if (mbean instanceof NotificationEmitter) {
- final NotificationEmitter emitter = (NotificationEmitter) mbean;
- try {
- emitter.removeNotificationListener(notifListener);
- } catch (ListenerNotFoundException e) {
- LOGGER.warn("could not remove gc listener", e);
+ public void stop() {
+ lock.lock();
+ try {
+ Preconditions.checkState(notifListener != null, "logger has not been started");
+ for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
+ if (mbean instanceof NotificationEmitter) {
+ final NotificationEmitter emitter = (NotificationEmitter) mbean;
+ try {
+ emitter.removeNotificationListener(notifListener);
+ } catch (ListenerNotFoundException e) {
+ LOGGER.warn("could not remove gc listener", e);
+ }
}
}
+ notifListener = null;
+ } finally {
+ lock.unlock();
}
- notifListener = null;
}
/** Return the current set of GC events in the in-memory log. */
diff --git a/spectator-ext-ipc/src/main/java/com/netflix/spectator/ipc/http/HttpRequestBuilder.java b/spectator-ext-ipc/src/main/java/com/netflix/spectator/ipc/http/HttpRequestBuilder.java
index 67443300e..c0dafce92 100644
--- a/spectator-ext-ipc/src/main/java/com/netflix/spectator/ipc/http/HttpRequestBuilder.java
+++ b/spectator-ext-ipc/src/main/java/com/netflix/spectator/ipc/http/HttpRequestBuilder.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2020 Netflix, Inc.
+ * Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,6 +42,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.zip.Deflater;
@@ -60,6 +62,8 @@ public class HttpRequestBuilder {
private static final StreamHelper STREAM_HELPER = new StreamHelper();
+ private static final Lock LOCK = new ReentrantLock();
+
// Should not be used directly, use the method of the same name that will create the
// executor if needed on the first access.
private static volatile ExecutorService defaultExecutor;
@@ -82,10 +86,13 @@ private static ExecutorService defaultExecutor() {
if (executor != null) {
return executor;
}
- synchronized (LOGGER) {
+ LOCK.lock();
+ try {
defaultExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(), newThreadFactory());
return defaultExecutor;
+ } finally {
+ LOCK.unlock();
}
}
diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java
index d84088263..19918a10a 100644
--- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java
+++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java
@@ -31,6 +31,8 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -44,6 +46,7 @@ public class Evaluator {
private static final Logger LOGGER = LoggerFactory.getLogger(Evaluator.class);
+ private final Lock lock = new ReentrantLock();
private final Map commonTags;
private final Function> idMapper;
private final long step;
@@ -71,36 +74,41 @@ public Evaluator(EvaluatorConfig config) {
/**
* Synchronize the set of subscriptions for this evaluator with the provided set.
*/
- public synchronized void sync(List subs) {
- Set removed = new HashSet<>(subscriptions.keySet());
- for (Subscription sub : subs) {
- boolean alreadyPresent = removed.remove(sub);
- if (!alreadyPresent) {
- try {
- // Parse and simplify query
- Query q = sub.dataExpr().query().simplify(commonTags);
- LOGGER.trace("query pre-eval: original [{}], simplified [{}], common tags {}",
- sub.dataExpr().query(), q, commonTags);
+ public void sync(List subs) {
+ lock.lock();
+ try {
+ Set removed = new HashSet<>(subscriptions.keySet());
+ for (Subscription sub : subs) {
+ boolean alreadyPresent = removed.remove(sub);
+ if (!alreadyPresent) {
+ try {
+ // Parse and simplify query
+ Query q = sub.dataExpr().query().simplify(commonTags);
+ LOGGER.trace("query pre-eval: original [{}], simplified [{}], common tags {}",
+ sub.dataExpr().query(), q, commonTags);
- // Update index
- int multiple = (int) (sub.getFrequency() / step);
- SubscriptionEntry entry = new SubscriptionEntry(sub, multiple);
- subscriptions.put(sub, entry);
- index.add(q, entry);
- LOGGER.debug("subscription added: {}", sub);
- } catch (Exception e) {
- LOGGER.warn("failed to add subscription: {}", sub, e);
+ // Update index
+ int multiple = (int) (sub.getFrequency() / step);
+ SubscriptionEntry entry = new SubscriptionEntry(sub, multiple);
+ subscriptions.put(sub, entry);
+ index.add(q, entry);
+ LOGGER.debug("subscription added: {}", sub);
+ } catch (Exception e) {
+ LOGGER.warn("failed to add subscription: {}", sub, e);
+ }
+ } else {
+ LOGGER.trace("subscription already present: {}", sub);
}
- } else {
- LOGGER.trace("subscription already present: {}", sub);
}
- }
- for (Subscription sub : removed) {
- SubscriptionEntry entry = subscriptions.remove(sub);
- Query q = sub.dataExpr().query().simplify(commonTags);
- index.remove(q, entry);
- LOGGER.debug("subscription removed: {}", sub);
+ for (Subscription sub : removed) {
+ SubscriptionEntry entry = subscriptions.remove(sub);
+ Query q = sub.dataExpr().query().simplify(commonTags);
+ index.remove(q, entry);
+ LOGGER.debug("subscription removed: {}", sub);
+ }
+ } finally {
+ lock.unlock();
}
}
diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/PrefixTree.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/PrefixTree.java
index f5fcae033..8c8e5c49e 100644
--- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/PrefixTree.java
+++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/PrefixTree.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2022 Netflix, Inc.
+ * Copyright 2014-2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
/**
@@ -39,6 +41,7 @@ private static int indexOf(char c) {
return (i >= TABLE_SIZE) ? -1 : i;
}
+ private final Lock lock = new ReentrantLock();
private final AtomicReferenceArray> children;
private final Set values;
@@ -51,12 +54,15 @@ private static int indexOf(char c) {
private PrefixTree computeIfAbsent(int i) {
PrefixTree child = children.get(i);
if (child == null) {
- synchronized (this) {
+ lock.lock();
+ try {
child = children.get(i);
if (child == null) {
child = new PrefixTree<>();
children.set(i, child);
}
+ } finally {
+ lock.unlock();
}
}
return child;
@@ -122,13 +128,16 @@ private boolean remove(String prefix, int pos, T value) {
} else {
boolean result = child.remove(prefix, pos + 1, value);
if (result && child.isEmpty()) {
- synchronized (this) {
+ lock.lock();
+ try {
// Check that the children array still has the reference to the
// same child object. The entry may have been replaced by another
// thread.
if (child == children.get(i) && child.isEmpty()) {
children.set(i, null);
}
+ } finally {
+ lock.unlock();
}
}
return result;