diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java index 82202f2884..a6b2ec28bd 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java @@ -22,10 +22,14 @@ import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; /** * Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to @@ -55,13 +59,61 @@ */ public class LeakDetector { private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class); + + private static class LeakTrackerSet { + private final Set set = Collections.newSetFromMap(new HashMap<>()); + + synchronized boolean remove(LeakTracker tracker) { + return set.remove(tracker); + } + + synchronized void removeExisting(LeakTracker tracker) { + final boolean removed = set.remove(tracker); + Preconditions.assertTrue(removed, () -> "Failed to remove existing " + tracker); + } + + synchronized LeakTracker add(Object referent, ReferenceQueue queue, Supplier leakReporter) { + final LeakTracker tracker = new LeakTracker(referent, queue, this::removeExisting, leakReporter); + final boolean added = set.add(tracker); + Preconditions.assertTrue(added, () -> "Failed to add " + tracker + " for " + referent); + return tracker; + } + + synchronized int getNumLeaks(boolean throwException) { + if (set.isEmpty()) { + return 0; + } + + int n = 0; + for (LeakTracker tracker : set) { + if (tracker.reportLeak() != null) { + n++; + } + } + if (throwException) { + assertNoLeaks(n); + } + return n; + } + + synchronized void assertNoLeaks(int leaks) { + Preconditions.assertTrue(leaks == 0, () -> { + final int size = set.size(); + return "#leaks = " + leaks + " > 0, #leaks " + (leaks == size? "==" : "!=") + " set.size = " + size; + }); + } + } + private static final AtomicLong COUNTER = new AtomicLong(); private final ReferenceQueue queue = new ReferenceQueue<>(); - private final Set allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + /** All the {@link LeakTracker}s. */ + private final LeakTrackerSet trackers = new LeakTrackerSet(); + /** When a leak is discovered, a message is printed and added to this list. */ + private final List leakMessages = Collections.synchronizedList(new ArrayList<>()); private final String name; - public LeakDetector(String name) { + LeakDetector(String name) { this.name = name + COUNTER.getAndIncrement(); } @@ -80,8 +132,11 @@ private void run() { LeakTracker tracker = (LeakTracker) queue.remove(); // Original resource already been GCed, if tracker is not closed yet, // report a leak. - if (allLeaks.remove(tracker)) { - tracker.reportLeak(); + if (trackers.remove(tracker)) { + final String leak = tracker.reportLeak(); + if (leak != null) { + leakMessages.add(leak); + } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -93,48 +148,51 @@ private void run() { LOG.warn("Exiting leak detector {}.", name); } - public UncheckedAutoCloseable track(Object leakable, Runnable reportLeak) { - // A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%, + Runnable track(Object leakable, Supplier reportLeak) { + // TODO: A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%, // if we have proofs that leak tracking impacts performance, or a single LeakDetector // thread can't keep up with the pace of object allocation. // For now, it looks effective enough and let keep it simple. - LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks, reportLeak); - allLeaks.add(tracker); - return tracker; + return trackers.add(leakable, queue, reportLeak)::remove; } - public void assertNoLeaks() { - Preconditions.assertTrue(allLeaks.isEmpty(), this::allLeaksString); - } + public void assertNoLeaks(int maxRetries, TimeDuration retrySleep) throws InterruptedException { + synchronized (leakMessages) { + // leakMessages are all the leaks discovered so far. + Preconditions.assertTrue(leakMessages.isEmpty(), + () -> "#leaks = " + leakMessages.size() + "\n" + leakMessages); + } - String allLeaksString() { - if (allLeaks.isEmpty()) { - return "allLeaks = "; + for(int i = 0; i < maxRetries; i++) { + final int numLeaks = trackers.getNumLeaks(false); + if (numLeaks == 0) { + return; + } + LOG.warn("{}/{}) numLeaks == {} > 0, will wait and retry ...", i, maxRetries, numLeaks); + retrySleep.sleep(); } - allLeaks.forEach(LeakTracker::reportLeak); - return "allLeaks.size = " + allLeaks.size(); + trackers.getNumLeaks(true); } - private static final class LeakTracker extends WeakReference implements UncheckedAutoCloseable { - private final Set allLeaks; - private final Runnable leakReporter; + private static final class LeakTracker extends WeakReference { + private final Consumer removeMethod; + private final Supplier getLeakMessage; + LeakTracker(Object referent, ReferenceQueue referenceQueue, - Set allLeaks, Runnable leakReporter) { + Consumer removeMethod, Supplier getLeakMessage) { super(referent, referenceQueue); - this.allLeaks = allLeaks; - this.leakReporter = leakReporter; + this.removeMethod = removeMethod; + this.getLeakMessage = getLeakMessage; } - /** - * Called by the tracked resource when closing. - */ - @Override - public void close() { - allLeaks.remove(this); + /** Called by the tracked resource when the object is completely released. */ + void remove() { + removeMethod.accept(this); } - void reportLeak() { - leakReporter.run(); + /** @return the leak message if there is a leak; return null if there is no leak. */ + String reportLeak() { + return getLeakMessage.get(); } } } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 32abe805f1..acf6fb8cfc 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -20,7 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -49,7 +49,7 @@ public static LeakDetector getLeakDetector() { private ReferenceCountedLeakDetector() { } - static synchronized void enable(boolean advanced) { + public static synchronized void enable(boolean advanced) { FACTORY.set(advanced ? Mode.ADVANCED : Mode.SIMPLE); } @@ -108,6 +108,10 @@ public V get() { return value; } + final int getCount() { + return count.get(); + } + @Override public V retain() { // n < 0: exception @@ -138,85 +142,221 @@ public boolean release() { } private static class SimpleTracing extends Impl { - private final UncheckedAutoCloseable leakTracker; + private final LeakDetector leakDetector; + private final Class valueClass; + private String valueString = null; + private Runnable removeMethod = null; SimpleTracing(T value, Runnable retainMethod, Consumer releaseMethod, LeakDetector leakDetector) { super(value, retainMethod, releaseMethod); - final Class clazz = value.getClass(); - this.leakTracker = leakDetector.track(this, - () -> LOG.warn("LEAK: A {} is not released properly", clazz.getName())); + this.valueClass = value.getClass(); + this.leakDetector = leakDetector; + } + + String getTraceString(int count) { + return "(" + valueClass + ", count=" + count + ", value=" + valueString + ")"; + } + + /** @return the leak message if there is a leak; return null if there is no leak. */ + String logLeakMessage() { + final int count = getCount(); + if (count == 0) { // never retain + return null; + } + final String message = "LEAK: " + getTraceString(count); + LOG.warn(message); + return message; } @Override - public boolean release() { - boolean released = super.release(); + public synchronized T get() { + try { + return super.get(); + } catch (Exception e) { + throw new IllegalStateException("Failed to get: " + getTraceString(getCount()), e); + } + } + + @Override + public synchronized T retain() { + final T value; + try { + value = super.retain(); + } catch (Exception e) { + throw new IllegalStateException("Failed to retain: " + getTraceString(getCount()), e); + } + if (getCount() == 1) { // this is the first retain + this.removeMethod = leakDetector.track(this, this::logLeakMessage); + this.valueString = value.toString(); + } + return value; + } + + @Override + public synchronized boolean release() { + final boolean released; + try { + released = super.release(); + } catch (Exception e) { + throw new IllegalStateException("Failed to release: " + getTraceString(getCount()), e); + } + if (released) { - leakTracker.close(); + Preconditions.assertNotNull(removeMethod, () -> "Not yet retained (removeMethod == null): " + valueClass); + removeMethod.run(); } return released; } } - private static class AdvancedTracing extends Impl { - private final UncheckedAutoCloseable leakTracker; - private final List retainsTraces; - private final List releaseTraces; + private static class AdvancedTracing extends SimpleTracing { + enum Op {CREATION, RETAIN, RELEASE, CURRENT} - AdvancedTracing(T value, Runnable retainMethod, Consumer releaseMethod, LeakDetector leakDetector) { - super(value, retainMethod, releaseMethod); + static class Counts { + private final int refCount; + private final int retainCount; + private final int releaseCount; + + Counts() { + this.refCount = 0; + this.retainCount = 0; + this.releaseCount = 0; + } + + Counts(Op op, Counts previous) { + if (op == Op.RETAIN) { + this.refCount = previous.refCount + 1; + this.retainCount = previous.retainCount + 1; + this.releaseCount = previous.releaseCount; + } else if (op == Op.RELEASE) { + this.refCount = previous.refCount - 1; + this.retainCount = previous.retainCount; + this.releaseCount = previous.releaseCount + 1; + } else { + throw new IllegalStateException("Unexpected op: " + op); + } + } + + @Override + public String toString() { + return "refCount=" + refCount + + ", retainCount=" + retainCount + + ", releaseCount=" + releaseCount; + } + } + + static class TraceInfo { + private final int id; + private final Op op; + private final int previousRefCount; + private final Counts counts; + + private final String threadInfo; + private final StackTraceElement[] stackTraces; + private final int newTraceElementIndex; + + TraceInfo(int id, Op op, TraceInfo previous, int previousRefCount) { + this.id = id; + this.op = op; + this.previousRefCount = previousRefCount; + this.counts = previous == null? new Counts() + : op == Op.CURRENT ? previous.counts + : new Counts(op, previous.counts); + + final Thread thread = Thread.currentThread(); + this.threadInfo = "Thread_" + thread.getId() + ":" + thread.getName(); + this.stackTraces = thread.getStackTrace(); + this.newTraceElementIndex = previous == null? stackTraces.length - 1 + : findFirstUnequalFromTail(this.stackTraces, previous.stackTraces); + } + + static int findFirstUnequalFromTail(T[] current, T[] previous) { + int c = current.length - 1; + for(int p = previous.length - 1; p >= 0; p--, c--) { + if (!previous[p].equals(current[c])) { + return c; + } + } + return -1; + } + + private StringBuilder appendTo(StringBuilder b) { + b.append(op).append("_").append(id) + .append(": previousRefCount=").append(previousRefCount) + .append(", ").append(counts) + .append(", ").append(threadInfo).append("\n"); + final int n = newTraceElementIndex + 1; + int line = 3; + for (; line <= n && line < stackTraces.length; line++) { + b.append(" ").append(stackTraces[line]).append("\n"); + } + if (line < stackTraces.length) { + b.append(" ...\n"); + } + return b; + } + + @Override + public String toString() { + return appendTo(new StringBuilder()).toString(); + } + } - StackTraceElement[] createStrace = Thread.currentThread().getStackTrace(); - final Class clazz = value.getClass(); - final List localRetainsTraces = new LinkedList<>(); - final List localReleaseTraces = new LinkedList<>(); + private final List traceInfos = new ArrayList<>(); + private TraceInfo previous; - this.leakTracker = leakDetector.track(this, () -> - LOG.warn("LEAK: A {} is not released properly.\nCreation trace:\n{}\n" + - "Retain traces({}):\n{}\nRelease traces({}):\n{}", - clazz.getName(), formatStackTrace(createStrace, 3), - localRetainsTraces.size(), formatStackTraces(localRetainsTraces, 2), - localReleaseTraces.size(), formatStackTraces(localReleaseTraces, 2))); + AdvancedTracing(T value, Runnable retainMethod, Consumer releaseMethod, LeakDetector leakDetector) { + super(value, retainMethod, releaseMethod, leakDetector); + addTraceInfo(Op.CREATION, -1); + } - this.retainsTraces = localRetainsTraces; - this.releaseTraces = localReleaseTraces; + private synchronized TraceInfo addTraceInfo(Op op, int previousRefCount) { + final TraceInfo current = new TraceInfo(traceInfos.size(), op, previous, previousRefCount); + traceInfos.add(current); + previous = current; + return current; } + @Override - public T retain() { - T retain = super.retain(); - retainsTraces.add(Thread.currentThread().getStackTrace()); - return retain; + public synchronized T retain() { + final int previousRefCount = getCount(); + final T retained = super.retain(); + final TraceInfo info = addTraceInfo(Op.RETAIN, previousRefCount); + Preconditions.assertSame(getCount(), info.counts.refCount, "refCount"); + return retained; } @Override - public boolean release() { - boolean released = super.release(); - if (released) { - leakTracker.close(); - } - releaseTraces.add(Thread.currentThread().getStackTrace()); + public synchronized boolean release() { + final int previousRefCount = getCount(); + final boolean released = super.release(); + final TraceInfo info = addTraceInfo(Op.RELEASE, previousRefCount); + final int count = getCount(); + final int expected = count == -1? 0 : count; + Preconditions.assertSame(expected, info.counts.refCount, "refCount"); return released; } - } - private static String formatStackTrace(StackTraceElement[] stackTrace, int startIdx) { - final StringBuilder sb = new StringBuilder(); - for (int line = startIdx; line < stackTrace.length; line++) { - sb.append(stackTrace[line]).append("\n"); + @Override + synchronized String getTraceString(int count) { + return super.getTraceString(count) + getTraceInfosString(); } - return sb.toString(); - } - private static String formatStackTraces(List stackTraces, int startIdx) { - final StringBuilder sb = new StringBuilder(); - stackTraces.forEach(stackTrace -> { - if (sb.length() > 0) { - sb.append("\n"); - } - for (int line = startIdx; line < stackTrace.length; line++) { - sb.append(stackTrace[line]).append("\n"); + private String getTraceInfosString() { + final int n = traceInfos.size(); + final StringBuilder b = new StringBuilder(n << 10).append(" #TraceInfos=").append(n); + TraceInfo last = null; + for (TraceInfo info : traceInfos) { + info.appendTo(b.append("\n")); + last = info; } - }); - return sb.toString(); + + // append current track info + final TraceInfo current = new TraceInfo(n, Op.CURRENT, last, getCount()); + current.appendTo(b.append("\n")); + + return b.toString(); + } } } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java index b2c53182d3..1fc72c3445 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java @@ -182,13 +182,4 @@ static ReferenceCountedObject wrap(V value, Runnable retainMethod, Consum static ReferenceCountedObject wrap(V value, Runnable retainMethod, Runnable releaseMethod) { return wrap(value, retainMethod, ignored -> releaseMethod.run()); } - - static void enableLeakDetection() { - ReferenceCountedLeakDetector.enable(false); - } - - static void enableAdvancedLeakDetection() { - ReferenceCountedLeakDetector.enable(true); - } - } diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 47f9e1d4b6..fe12e29f11 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -32,7 +32,7 @@ import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.util.NetUtils; -import org.apache.ratis.util.ReferenceCountedObject; +import org.apache.ratis.util.ReferenceCountedLeakDetector; import org.junit.Assert; import java.util.Optional; @@ -51,7 +51,8 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra }; static { - ReferenceCountedObject.enableLeakDetection(); + // TODO move it to MiniRaftCluster for detecting non-gRPC cases + ReferenceCountedLeakDetector.enable(false); } public interface FactoryGet extends Factory.Get { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 9a54700ecd..86ebfa52ca 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -136,17 +136,27 @@ default void runWithNewCluster(int numServers, int numListeners, boolean startCl final StackTraceElement caller = JavaUtils.getCallerStackTraceElement(); LOG.info("Running " + caller.getMethodName()); final CLUSTER cluster = newCluster(numServers, numListeners); + Throwable failed = null; try { if (startCluster) { cluster.start(); } testCase.accept(cluster); - } catch(Exception t) { + } catch(Throwable t) { LOG.info(cluster.printServers()); LOG.error("Failed " + caller, t); + failed = t; throw t; } finally { - cluster.shutdown(); + try { + cluster.shutdown(); + } catch (Exception e) { + if (failed == null) { + throw e; + } else { + failed.addSuppressed(e); + } + } } } @@ -847,10 +857,24 @@ public void shutdown() { final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), (t) -> Daemon.newBuilder().setName("MiniRaftCluster-" + THREAD_COUNT.incrementAndGet()).setRunnable(t).build()); getServers().forEach(proxy -> executor.submit(() -> JavaUtils.runAsUnchecked(proxy::close))); + final int maxRetries = 30; + final TimeDuration retrySleep = TimeDuration.ONE_SECOND; try { executor.shutdown(); // just wait for a few seconds - executor.awaitTermination(5, TimeUnit.SECONDS); + boolean terminated = false; + + for(int i = 0; i < maxRetries && !terminated; ) { + terminated = executor.awaitTermination(retrySleep.getDuration(), retrySleep.getUnit()); + if (!terminated) { + i++; + if (i < maxRetries) { + LOG.warn("{}/{}: Not yet able to shutdown executor, will wait again ...", i, maxRetries); + } else { + LOG.error("Failed to shutdown executor, some servers may be still running:\n{}", printServers()); + } + } + } } catch (InterruptedException e) { LOG.warn("shutdown interrupted", e); Thread.currentThread().interrupt(); @@ -864,9 +888,13 @@ public void shutdown() { try { RaftTestUtil.gc(); } catch (InterruptedException e) { - LOG.info("gc interrupted."); + LOG.warn("gc interrupted.", e); + } + try { + ReferenceCountedLeakDetector.getLeakDetector().assertNoLeaks(maxRetries, retrySleep); + } catch (InterruptedException e) { + LOG.warn("LeakDetector interrupted.", e); } - ReferenceCountedLeakDetector.getLeakDetector().assertNoLeaks(); } /**