From 0d2a4dc5e2772c7761ef4579d9f55cc706b802a9 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 28 Sep 2024 09:06:09 -0700 Subject: [PATCH 01/31] RATIS-2164. LeakDetector has a race condition. --- .../org/apache/ratis/util/LeakDetector.java | 74 ++++++++++++------ .../util/ReferenceCountedLeakDetector.java | 78 ++++++++++--------- 2 files changed, 92 insertions(+), 60 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java index 82202f2884..22f524b25c 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java @@ -26,6 +26,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Predicate; /** * Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to @@ -55,10 +57,43 @@ */ public class LeakDetector { private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class); + + private static class LeakTrackerSet { + private final Set set = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + synchronized boolean remove(LeakTracker tracker) { + return set.remove(tracker); + } + + synchronized void removeExisting(LeakTracker tracker) { + final boolean removed = set.remove(tracker); + Preconditions.assertTrue(removed, () -> "Failed to remove existing " + tracker); + } + + synchronized LeakTracker add(Object referent, ReferenceQueue queue, Runnable leakReporter) { + final LeakTracker tracker = new LeakTracker(referent, queue, this::removeExisting, leakReporter); + final boolean added = set.add(tracker); + Preconditions.assertTrue(added, () -> "Failed to add " + tracker + " for " + referent); + return tracker; + } + + synchronized void assertNoLeaks() { + 
Preconditions.assertTrue(set.isEmpty(), this::allLeaksString); + } + + private String allLeaksString() { + if (set.isEmpty()) { + return "allLeaks = "; + } + set.forEach(LeakTracker::reportLeak); + return "allLeaks.size = " + set.size(); + } + } + private static final AtomicLong COUNTER = new AtomicLong(); private final ReferenceQueue queue = new ReferenceQueue<>(); - private final Set allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final LeakTrackerSet allLeaks = new LeakTrackerSet(); private final String name; public LeakDetector(String name) { @@ -93,44 +128,39 @@ private void run() { LOG.warn("Exiting leak detector {}.", name); } - public UncheckedAutoCloseable track(Object leakable, Runnable reportLeak) { + public Predicate> track(Object leakable, Runnable reportLeak) { // A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%, // if we have proofs that leak tracking impacts performance, or a single LeakDetector // thread can't keep up with the pace of object allocation. // For now, it looks effective enough and let keep it simple. 
- LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks, reportLeak); - allLeaks.add(tracker); - return tracker; + return allLeaks.add(leakable, queue, reportLeak)::releaseAndCheckRemove; } public void assertNoLeaks() { - Preconditions.assertTrue(allLeaks.isEmpty(), this::allLeaksString); + allLeaks.assertNoLeaks(); } - String allLeaksString() { - if (allLeaks.isEmpty()) { - return "allLeaks = "; - } - allLeaks.forEach(LeakTracker::reportLeak); - return "allLeaks.size = " + allLeaks.size(); - } - - private static final class LeakTracker extends WeakReference implements UncheckedAutoCloseable { - private final Set allLeaks; + private static final class LeakTracker extends WeakReference { + private final Consumer removeMethod; private final Runnable leakReporter; + LeakTracker(Object referent, ReferenceQueue referenceQueue, - Set allLeaks, Runnable leakReporter) { + Consumer removeMethod, Runnable leakReporter) { super(referent, referenceQueue); - this.allLeaks = allLeaks; + this.removeMethod = removeMethod; this.leakReporter = leakReporter; } /** - * Called by the tracked resource when closing. + * Called by the tracked resource when releasing the object. 
*/ - @Override - public void close() { - allLeaks.remove(this); + boolean releaseAndCheckRemove(ReferenceCountedObject referenceCountedObject) { + if (referenceCountedObject.release()) { + removeMethod.accept(this); + return true; + } else { + return false; + } } void reportLeak() { diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 32abe805f1..5f3c7a447d 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.function.Supplier; /** @@ -108,6 +109,10 @@ public V get() { return value; } + final int getCount() { + return count.get(); + } + @Override public V retain() { // n < 0: exception @@ -138,64 +143,61 @@ public boolean release() { } private static class SimpleTracing extends Impl { - private final UncheckedAutoCloseable leakTracker; + final LeakDetector leakDetector; + final Class clazz; + + Predicate> releaseAndCheckLeak = null; SimpleTracing(T value, Runnable retainMethod, Consumer releaseMethod, LeakDetector leakDetector) { super(value, retainMethod, releaseMethod); - final Class clazz = value.getClass(); - this.leakTracker = leakDetector.track(this, - () -> LOG.warn("LEAK: A {} is not released properly", clazz.getName())); + this.clazz = value.getClass(); + this.leakDetector = leakDetector; } @Override - public boolean release() { - boolean released = super.release(); - if (released) { - leakTracker.close(); + public synchronized T retain() { + if (getCount() == 0) { + this.releaseAndCheckLeak = leakDetector.track(this, + () -> LOG.warn("LEAK: A {} is not released properly", 
clazz.getName())); } - return released; + return super.retain(); + } + + @Override + public synchronized boolean release() { + Preconditions.assertNotNull(releaseAndCheckLeak != null, () -> "Not yet retained: " + clazz); + return releaseAndCheckLeak.test(this); } } - private static class AdvancedTracing extends Impl { - private final UncheckedAutoCloseable leakTracker; - private final List retainsTraces; - private final List releaseTraces; + private static class AdvancedTracing extends SimpleTracing { + private final StackTraceElement[] createStrace = Thread.currentThread().getStackTrace(); + private final List retainsTraces = new LinkedList<>(); + private final List releaseTraces = new LinkedList<>(); AdvancedTracing(T value, Runnable retainMethod, Consumer releaseMethod, LeakDetector leakDetector) { - super(value, retainMethod, releaseMethod); - - StackTraceElement[] createStrace = Thread.currentThread().getStackTrace(); - final Class clazz = value.getClass(); - final List localRetainsTraces = new LinkedList<>(); - final List localReleaseTraces = new LinkedList<>(); - - this.leakTracker = leakDetector.track(this, () -> - LOG.warn("LEAK: A {} is not released properly.\nCreation trace:\n{}\n" + - "Retain traces({}):\n{}\nRelease traces({}):\n{}", - clazz.getName(), formatStackTrace(createStrace, 3), - localRetainsTraces.size(), formatStackTraces(localRetainsTraces, 2), - localReleaseTraces.size(), formatStackTraces(localReleaseTraces, 2))); - - this.retainsTraces = localRetainsTraces; - this.releaseTraces = localReleaseTraces; + super(value, retainMethod, releaseMethod, leakDetector); } @Override - public T retain() { - T retain = super.retain(); - retainsTraces.add(Thread.currentThread().getStackTrace()); - return retain; + public synchronized T retain() { + if (getCount() == 0) { + this.releaseAndCheckLeak = leakDetector.track(this, () -> + LOG.warn("LEAK: A {} is not released properly.\n" + + " Creation trace: {}\n" + + " Retain traces({}): {}\n" + + " Release 
traces({}): {}", + clazz.getName(), formatStackTrace(createStrace, 3), + retainsTraces.size(), formatStackTraces(retainsTraces, 2), + releaseTraces.size(), formatStackTraces(releaseTraces, 2))); + } + return super.retain(); } @Override public boolean release() { - boolean released = super.release(); - if (released) { - leakTracker.close(); - } releaseTraces.add(Thread.currentThread().getStackTrace()); - return released; + return super.release(); } } From 205c7202fd6635334d9c96deedd85ce1d3fdf97b Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 28 Sep 2024 09:19:13 -0700 Subject: [PATCH 02/31] Fix a bug and checkstyle. --- .../org/apache/ratis/util/LeakDetector.java | 16 +++---- .../util/ReferenceCountedLeakDetector.java | 45 ++++++++++--------- 2 files changed, 29 insertions(+), 32 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java index 22f524b25c..28c84295f8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java @@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.function.Predicate; /** * Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to @@ -96,7 +95,7 @@ private String allLeaksString() { private final LeakTrackerSet allLeaks = new LeakTrackerSet(); private final String name; - public LeakDetector(String name) { + LeakDetector(String name) { this.name = name + COUNTER.getAndIncrement(); } @@ -128,12 +127,12 @@ private void run() { LOG.warn("Exiting leak detector {}.", name); } - public Predicate> track(Object leakable, Runnable reportLeak) { + Runnable track(Object leakable, Runnable reportLeak) { // A rate filter can be put here to only track a subset of all objects, e.g. 
5%, 10%, // if we have proofs that leak tracking impacts performance, or a single LeakDetector // thread can't keep up with the pace of object allocation. // For now, it looks effective enough and let keep it simple. - return allLeaks.add(leakable, queue, reportLeak)::releaseAndCheckRemove; + return allLeaks.add(leakable, queue, reportLeak)::remove; } public void assertNoLeaks() { @@ -154,13 +153,8 @@ private static final class LeakTracker extends WeakReference { /** * Called by the tracked resource when releasing the object. */ - boolean releaseAndCheckRemove(ReferenceCountedObject referenceCountedObject) { - if (referenceCountedObject.release()) { - removeMethod.accept(this); - return true; - } else { - return false; - } + void remove() { + removeMethod.accept(this); } void reportLeak() { diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 5f3c7a447d..694e598d6b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Predicate; import java.util.function.Supplier; /** @@ -143,30 +142,38 @@ public boolean release() { } private static class SimpleTracing extends Impl { - final LeakDetector leakDetector; - final Class clazz; + private final LeakDetector leakDetector; + private final Class valueClass; - Predicate> releaseAndCheckLeak = null; + private Runnable removeMethod = null; SimpleTracing(T value, Runnable retainMethod, Consumer releaseMethod, LeakDetector leakDetector) { super(value, retainMethod, releaseMethod); - this.clazz = value.getClass(); + this.valueClass = value.getClass(); this.leakDetector = leakDetector; } + 
void logLeakMessage(Class clazz) { + LOG.warn("LEAK: A {} is not released properly", clazz.getName()); + } + @Override public synchronized T retain() { if (getCount() == 0) { - this.releaseAndCheckLeak = leakDetector.track(this, - () -> LOG.warn("LEAK: A {} is not released properly", clazz.getName())); + this.removeMethod = leakDetector.track(this, () -> logLeakMessage(valueClass)); } return super.retain(); } @Override public synchronized boolean release() { - Preconditions.assertNotNull(releaseAndCheckLeak != null, () -> "Not yet retained: " + clazz); - return releaseAndCheckLeak.test(this); + Preconditions.assertNotNull(removeMethod != null, () -> "Not yet retained: " + valueClass); + if (super.release()) { + removeMethod.run(); + return true; + } else { + return false; + } } } @@ -180,18 +187,14 @@ private static class AdvancedTracing extends SimpleTracing { } @Override - public synchronized T retain() { - if (getCount() == 0) { - this.releaseAndCheckLeak = leakDetector.track(this, () -> - LOG.warn("LEAK: A {} is not released properly.\n" - + " Creation trace: {}\n" - + " Retain traces({}): {}\n" - + " Release traces({}): {}", - clazz.getName(), formatStackTrace(createStrace, 3), - retainsTraces.size(), formatStackTraces(retainsTraces, 2), - releaseTraces.size(), formatStackTraces(releaseTraces, 2))); - } - return super.retain(); + void logLeakMessage(Class clazz) { + LOG.warn("LEAK: A {} is not released properly.\n" + + " Creation trace: {}\n" + + " Retain traces({}): {}\n" + + " Release traces({}): {}", + clazz.getName(), formatStackTrace(createStrace, 3), + retainsTraces.size(), formatStackTraces(retainsTraces, 2), + releaseTraces.size(), formatStackTraces(releaseTraces, 2)); } @Override From 58f296f28250384939b609cea02e3ec7a6239f98 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 28 Sep 2024 09:50:04 -0700 Subject: [PATCH 03/31] Enable advanced detection for debugging. 
--- .../java/org/apache/ratis/util/ReferenceCountedObject.java | 6 +++--- .../java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java index b2c53182d3..f85d141921 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java @@ -184,11 +184,11 @@ static ReferenceCountedObject wrap(V value, Runnable retainMethod, Runnab } static void enableLeakDetection() { - ReferenceCountedLeakDetector.enable(false); + enableLeakDetection(false); } - static void enableAdvancedLeakDetection() { - ReferenceCountedLeakDetector.enable(true); + static void enableLeakDetection(boolean advanced) { + ReferenceCountedLeakDetector.enable(advanced); } } diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 47f9e1d4b6..54d6ff7d15 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -51,7 +51,7 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra }; static { - ReferenceCountedObject.enableLeakDetection(); + ReferenceCountedObject.enableLeakDetection(true); } public interface FactoryGet extends Factory.Get { From 67577ff762ace60fcfcf976bbc03a6cc1755fac0 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 28 Sep 2024 11:11:42 -0700 Subject: [PATCH 04/31] Fixed some bugs. 
--- .../util/ReferenceCountedLeakDetector.java | 40 +++++++++++-------- .../ratis/util/ReferenceCountedObject.java | 1 - .../apache/ratis/grpc/server/GrpcService.java | 2 +- .../ratis/grpc/MiniRaftClusterWithGrpc.java | 2 +- .../server/leader/LogAppenderDefault.java | 12 ++++-- .../raftlog/segmented/SegmentedRaftLog.java | 10 +++-- 6 files changed, 40 insertions(+), 27 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 694e598d6b..482d9aa277 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -187,14 +187,21 @@ private static class AdvancedTracing extends SimpleTracing { } @Override - void logLeakMessage(Class clazz) { + synchronized void logLeakMessage(Class clazz) { + final String prefix = " "; LOG.warn("LEAK: A {} is not released properly.\n" + " Creation trace: {}\n" + " Retain traces({}): {}\n" + " Release traces({}): {}", - clazz.getName(), formatStackTrace(createStrace, 3), - retainsTraces.size(), formatStackTraces(retainsTraces, 2), - releaseTraces.size(), formatStackTraces(releaseTraces, 2)); + clazz.getName(), formatStackTrace(createStrace), + retainsTraces.size(), formatStackTraces("retain", retainsTraces), + releaseTraces.size(), formatStackTraces("release", releaseTraces)); + } + + @Override + public synchronized T retain() { + retainsTraces.add(Thread.currentThread().getStackTrace()); + return super.retain(); } @Override @@ -204,24 +211,23 @@ public boolean release() { } } - private static String formatStackTrace(StackTraceElement[] stackTrace, int startIdx) { - final StringBuilder sb = new StringBuilder(); + private static String formatStackTrace(StackTraceElement[] stackTrace) { + return formatStackTrace(stackTrace, 0, new StringBuilder()).toString(); + } + + private 
static StringBuilder formatStackTrace(StackTraceElement[] stackTrace, int startIdx, StringBuilder sb) { for (int line = startIdx; line < stackTrace.length; line++) { - sb.append(stackTrace[line]).append("\n"); + sb.append(" ").append(stackTrace[line]).append("\n"); } - return sb.toString(); + return sb; } - private static String formatStackTraces(List stackTraces, int startIdx) { + private static String formatStackTraces(String name, List stackTraces) { final StringBuilder sb = new StringBuilder(); - stackTraces.forEach(stackTrace -> { - if (sb.length() > 0) { - sb.append("\n"); - } - for (int line = startIdx; line < stackTrace.length; line++) { - sb.append(stackTrace[line]).append("\n"); - } - }); + for (int i = 0; i < stackTraces.size(); i++) { + sb.append("\n").append(name).append(" ").append(i).append(":\n"); + formatStackTrace(stackTraces.get(i), 0, sb); + } return sb.toString(); } } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java index f85d141921..de1f4dba71 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java @@ -190,5 +190,4 @@ static void enableLeakDetection() { static void enableLeakDetection(boolean advanced) { ReferenceCountedLeakDetector.enable(advanced); } - } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java index 510dfcaa24..489b7aeb03 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java @@ -348,7 +348,7 @@ public void closeImpl() throws IOException { for (Map.Entry server : servers.entrySet()) { final String name = getId() + ": shutdown server " + server.getKey(); LOG.info("{} now", name); - final 
Server s = server.getValue().shutdownNow(); + final Server s = server.getValue().shutdown(); super.closeImpl(); try { s.awaitTermination(); diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 54d6ff7d15..47f9e1d4b6 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -51,7 +51,7 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra }; static { - ReferenceCountedObject.enableLeakDetection(true); + ReferenceCountedObject.enableLeakDetection(); } public interface FactoryGet extends Factory.Get { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index f75a80f825..a1978f84c3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -81,10 +81,14 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestF } final AppendEntriesRequestProto proto = request.get(); - final AppendEntriesReplyProto reply = sendAppendEntries(proto); - final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX; - requestFirstIndex.set(first); - request.release(); + final AppendEntriesReplyProto reply; + try { + reply = sendAppendEntries(proto); + final long first = proto.getEntriesCount() > 0 ? 
proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX; + requestFirstIndex.set(first); + } finally { + request.release(); + } return reply; } catch (InterruptedIOException | RaftLogIOException e) { throw e; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 44b9c7599b..8fff97b385 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -306,9 +306,13 @@ public ReferenceCountedObject retainLog(long index) throws RaftLo } final ReferenceCountedObject entry = segment.getEntryFromCache(record.getTermIndex()); if (entry != null) { - getRaftLogMetrics().onRaftLogCacheHit(); - entry.retain(); - return entry; + try { + entry.retain(); + getRaftLogMetrics().onRaftLogCacheHit(); + return entry; + } catch (IllegalStateException ignore) { + // the entry could be removed from the cache and released. + } } // the entry is not in the segment's cache. Load the cache without holding the lock. From 90321af76d8e62c61ae7e05a511fe364f3a51353 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 28 Sep 2024 13:22:03 -0700 Subject: [PATCH 05/31] Some minor changes. 
--- .../util/ReferenceCountedLeakDetector.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 482d9aa277..4fc7c54e23 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -167,13 +167,12 @@ public synchronized T retain() { @Override public synchronized boolean release() { - Preconditions.assertNotNull(removeMethod != null, () -> "Not yet retained: " + valueClass); - if (super.release()) { + boolean released = super.release(); + if (released) { + Preconditions.assertNotNull(removeMethod, () -> "Not yet retained (removeMethod == null): " + valueClass); removeMethod.run(); - return true; - } else { - return false; } + return released; } } @@ -188,7 +187,6 @@ private static class AdvancedTracing extends SimpleTracing { @Override synchronized void logLeakMessage(Class clazz) { - final String prefix = " "; LOG.warn("LEAK: A {} is not released properly.\n" + " Creation trace: {}\n" + " Retain traces({}): {}\n" @@ -212,11 +210,11 @@ public boolean release() { } private static String formatStackTrace(StackTraceElement[] stackTrace) { - return formatStackTrace(stackTrace, 0, new StringBuilder()).toString(); + return formatStackTrace(stackTrace, new StringBuilder()).toString(); } - private static StringBuilder formatStackTrace(StackTraceElement[] stackTrace, int startIdx, StringBuilder sb) { - for (int line = startIdx; line < stackTrace.length; line++) { + private static StringBuilder formatStackTrace(StackTraceElement[] stackTrace, StringBuilder sb) { + for (int line = 2; line < stackTrace.length; line++) { sb.append(" ").append(stackTrace[line]).append("\n"); } return sb; @@ -226,7 +224,7 @@ private static String 
formatStackTraces(String name, List s final StringBuilder sb = new StringBuilder(); for (int i = 0; i < stackTraces.size(); i++) { sb.append("\n").append(name).append(" ").append(i).append(":\n"); - formatStackTrace(stackTraces.get(i), 0, sb); + formatStackTrace(stackTraces.get(i), sb); } return sb.toString(); } From 1c5c6eb4a00847b8bb2d5374c9d065ebdc9558ca Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 29 Sep 2024 08:41:21 -0700 Subject: [PATCH 06/31] try-catch MiniRaftCluster shutdown. --- .../org/apache/ratis/grpc/server/GrpcService.java | 2 +- .../apache/ratis/server/impl/MiniRaftCluster.java | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java index 489b7aeb03..510dfcaa24 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java @@ -348,7 +348,7 @@ public void closeImpl() throws IOException { for (Map.Entry server : servers.entrySet()) { final String name = getId() + ": shutdown server " + server.getKey(); LOG.info("{} now", name); - final Server s = server.getValue().shutdown(); + final Server s = server.getValue().shutdownNow(); super.closeImpl(); try { s.awaitTermination(); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 9a54700ecd..25b46fc148 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -136,6 +136,7 @@ default void runWithNewCluster(int numServers, int numListeners, boolean startCl final StackTraceElement caller = JavaUtils.getCallerStackTraceElement(); LOG.info("Running " + caller.getMethodName()); final CLUSTER 
cluster = newCluster(numServers, numListeners); + Exception failed = null; try { if (startCluster) { cluster.start(); @@ -144,9 +145,18 @@ default void runWithNewCluster(int numServers, int numListeners, boolean startCl } catch(Exception t) { LOG.info(cluster.printServers()); LOG.error("Failed " + caller, t); + failed = t; throw t; } finally { - cluster.shutdown(); + try { + cluster.shutdown(); + } catch (Exception e) { + if (failed == null) { + throw e; + } else { + failed.addSuppressed(e); + } + } } } From 9159532a6090641806f01fb3ded569ef3503a6d2 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 29 Sep 2024 09:15:54 -0700 Subject: [PATCH 07/31] Report earlier leaks at shutdown. --- .../org/apache/ratis/util/LeakDetector.java | 43 +++++++++++++------ .../util/ReferenceCountedLeakDetector.java | 30 ++++++++----- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java index 28c84295f8..7480d054fc 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java @@ -22,11 +22,14 @@ import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Supplier; /** * Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to @@ -69,7 +72,7 @@ synchronized void removeExisting(LeakTracker tracker) { Preconditions.assertTrue(removed, () -> "Failed to remove existing " + tracker); } - synchronized LeakTracker add(Object referent, ReferenceQueue queue, Runnable leakReporter) { + synchronized LeakTracker add(Object referent, 
ReferenceQueue queue, Supplier leakReporter) { final LeakTracker tracker = new LeakTracker(referent, queue, this::removeExisting, leakReporter); final boolean added = set.add(tracker); Preconditions.assertTrue(added, () -> "Failed to add " + tracker + " for " + referent); @@ -77,15 +80,18 @@ synchronized LeakTracker add(Object referent, ReferenceQueue queue, Runn } synchronized void assertNoLeaks() { - Preconditions.assertTrue(set.isEmpty(), this::allLeaksString); - } - - private String allLeaksString() { if (set.isEmpty()) { - return "allLeaks = "; + return; + } + + int n = 0; + for(LeakTracker tracker : set) { + if (tracker.reportLeak() != null) { + n++; + } } - set.forEach(LeakTracker::reportLeak); - return "allLeaks.size = " + set.size(); + final int leaks = n; + Preconditions.assertTrue(n == 0, () -> "#leaks = " + leaks + ", set.size = " + set.size()); } } @@ -93,6 +99,7 @@ private String allLeaksString() { private final ReferenceQueue queue = new ReferenceQueue<>(); private final LeakTrackerSet allLeaks = new LeakTrackerSet(); + private final List leakMessages = Collections.synchronizedList(new ArrayList<>()); private final String name; LeakDetector(String name) { @@ -115,7 +122,10 @@ private void run() { // Original resource already been GCed, if tracker is not closed yet, // report a leak. if (allLeaks.remove(tracker)) { - tracker.reportLeak(); + final String leak = tracker.reportLeak(); + if (leak != null) { + leakMessages.add(leak); + } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -127,7 +137,7 @@ private void run() { LOG.warn("Exiting leak detector {}.", name); } - Runnable track(Object leakable, Runnable reportLeak) { + Runnable track(Object leakable, Supplier reportLeak) { // A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%, // if we have proofs that leak tracking impacts performance, or a single LeakDetector // thread can't keep up with the pace of object allocation. 
@@ -136,15 +146,19 @@ Runnable track(Object leakable, Runnable reportLeak) { } public void assertNoLeaks() { + synchronized (leakMessages) { + Preconditions.assertTrue(leakMessages.isEmpty(), + () -> "#leaks = " + leakMessages.size() + "\n" + leakMessages); + } allLeaks.assertNoLeaks(); } private static final class LeakTracker extends WeakReference { private final Consumer removeMethod; - private final Runnable leakReporter; + private final Supplier leakReporter; LeakTracker(Object referent, ReferenceQueue referenceQueue, - Consumer removeMethod, Runnable leakReporter) { + Consumer removeMethod, Supplier leakReporter) { super(referent, referenceQueue); this.removeMethod = removeMethod; this.leakReporter = leakReporter; @@ -157,8 +171,9 @@ void remove() { removeMethod.accept(this); } - void reportLeak() { - leakReporter.run(); + /** @return the leak message if there is a leak; return null if there is no leak. */ + String reportLeak() { + return leakReporter.get(); } } } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 4fc7c54e23..0571e9d80a 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -153,14 +153,25 @@ private static class SimpleTracing extends Impl { this.leakDetector = leakDetector; } - void logLeakMessage(Class clazz) { - LOG.warn("LEAK: A {} is not released properly", clazz.getName()); + String getLeakMessage(int count) { + return "LEAK: A " + valueClass.getName() + " (count=" + count + ") is not released properly"; + } + + /** @return the leak message if there is a leak; return null if there is no leak. 
*/ + String logLeakMessage() { + final int count = getCount(); + if (count == 0) { + return null; + } + final String message = getLeakMessage(count); + LOG.warn(message); + return message; } @Override public synchronized T retain() { if (getCount() == 0) { - this.removeMethod = leakDetector.track(this, () -> logLeakMessage(valueClass)); + this.removeMethod = leakDetector.track(this, this::logLeakMessage); } return super.retain(); } @@ -186,14 +197,11 @@ private static class AdvancedTracing extends SimpleTracing { } @Override - synchronized void logLeakMessage(Class clazz) { - LOG.warn("LEAK: A {} is not released properly.\n" - + " Creation trace: {}\n" - + " Retain traces({}): {}\n" - + " Release traces({}): {}", - clazz.getName(), formatStackTrace(createStrace), - retainsTraces.size(), formatStackTraces("retain", retainsTraces), - releaseTraces.size(), formatStackTraces("release", releaseTraces)); + synchronized String getLeakMessage(int count) { + return super.getLeakMessage(count) + + "\n Creation trace: " + formatStackTrace(createStrace) + + "\n Retain traces: {}" + formatStackTraces("retain", retainsTraces) + + "\n Release traces: {}" + formatStackTraces("release", releaseTraces); } @Override From 0f4b61e03ec347e68f82f48a6f2fef7e0c710749 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 29 Sep 2024 10:01:41 -0700 Subject: [PATCH 08/31] Enable advance leak detection. 
--- .../java/org/apache/ratis/util/LeakDetector.java | 12 +++++++++--- .../apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java index 7480d054fc..e000072b2f 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java @@ -85,13 +85,19 @@ synchronized void assertNoLeaks() { } int n = 0; - for(LeakTracker tracker : set) { + for (LeakTracker tracker : set) { if (tracker.reportLeak() != null) { n++; } } - final int leaks = n; - Preconditions.assertTrue(n == 0, () -> "#leaks = " + leaks + ", set.size = " + set.size()); + assertNoLeaks(n); + } + + synchronized void assertNoLeaks(int leaks) { + Preconditions.assertTrue(leaks == 0, () -> { + final int size = set.size(); + return "#leaks = " + leaks + (leaks == size? "==" : "!=") + " set.size = " + size; + }); } } diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 47f9e1d4b6..54d6ff7d15 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -51,7 +51,7 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra }; static { - ReferenceCountedObject.enableLeakDetection(); + ReferenceCountedObject.enableLeakDetection(true); } public interface FactoryGet extends Factory.Get { From fe29cde5ada016746276a044e83eb2c58d89a25c Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 29 Sep 2024 10:35:46 -0700 Subject: [PATCH 09/31] Move the enable method to ReferenceCountedLeakDetector. 
--- .../util/ReferenceCountedLeakDetector.java | 32 ++++++++++++------- .../ratis/util/ReferenceCountedObject.java | 8 ----- .../ratis/grpc/MiniRaftClusterWithGrpc.java | 3 +- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 0571e9d80a..69f3f02a5c 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -49,7 +49,7 @@ public static LeakDetector getLeakDetector() { private ReferenceCountedLeakDetector() { } - static synchronized void enable(boolean advanced) { + public static synchronized void enable(boolean advanced) { FACTORY.set(advanced ? Mode.ADVANCED : Mode.SIMPLE); } @@ -153,8 +153,8 @@ private static class SimpleTracing extends Impl { this.leakDetector = leakDetector; } - String getLeakMessage(int count) { - return "LEAK: A " + valueClass.getName() + " (count=" + count + ") is not released properly"; + String getInfo(int count) { + return "(" + valueClass + ", count=" + count + ")"; } /** @return the leak message if there is a leak; return null if there is no leak. 
*/ @@ -163,7 +163,7 @@ String logLeakMessage() { if (count == 0) { return null; } - final String message = getLeakMessage(count); + final String message = "LEAK: " + getInfo(count); LOG.warn(message); return message; } @@ -173,12 +173,22 @@ public synchronized T retain() { if (getCount() == 0) { this.removeMethod = leakDetector.track(this, this::logLeakMessage); } - return super.retain(); + try { + return super.retain(); + } catch (Exception e) { + throw new IllegalStateException("Failed to retain: " + getInfo(getCount()), e); + } } @Override public synchronized boolean release() { - boolean released = super.release(); + final boolean released; + try { + released = super.release(); + } catch (Exception e) { + throw new IllegalStateException("Failed to release: " + getInfo(getCount()), e); + } + if (released) { Preconditions.assertNotNull(removeMethod, () -> "Not yet retained (removeMethod == null): " + valueClass); removeMethod.run(); @@ -197,11 +207,11 @@ private static class AdvancedTracing extends SimpleTracing { } @Override - synchronized String getLeakMessage(int count) { - return super.getLeakMessage(count) + synchronized String getInfo(int count) { + return super.getInfo(count) + "\n Creation trace: " + formatStackTrace(createStrace) - + "\n Retain traces: {}" + formatStackTraces("retain", retainsTraces) - + "\n Release traces: {}" + formatStackTraces("release", releaseTraces); + + "\n Retain traces: " + formatStackTraces("retain", retainsTraces) + + "\n Release traces: " + formatStackTraces("release", releaseTraces); } @Override @@ -229,7 +239,7 @@ private static StringBuilder formatStackTrace(StackTraceElement[] stackTrace, St } private static String formatStackTraces(String name, List stackTraces) { - final StringBuilder sb = new StringBuilder(); + final StringBuilder sb = new StringBuilder(stackTraces.size()).append(" trace(s)"); for (int i = 0; i < stackTraces.size(); i++) { sb.append("\n").append(name).append(" ").append(i).append(":\n"); 
formatStackTrace(stackTraces.get(i), sb); diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java index de1f4dba71..1fc72c3445 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java @@ -182,12 +182,4 @@ static ReferenceCountedObject wrap(V value, Runnable retainMethod, Consum static ReferenceCountedObject wrap(V value, Runnable retainMethod, Runnable releaseMethod) { return wrap(value, retainMethod, ignored -> releaseMethod.run()); } - - static void enableLeakDetection() { - enableLeakDetection(false); - } - - static void enableLeakDetection(boolean advanced) { - ReferenceCountedLeakDetector.enable(advanced); - } } diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 54d6ff7d15..22e0e7b355 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -32,6 +32,7 @@ import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.util.NetUtils; +import org.apache.ratis.util.ReferenceCountedLeakDetector; import org.apache.ratis.util.ReferenceCountedObject; import org.junit.Assert; @@ -51,7 +52,7 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra }; static { - ReferenceCountedObject.enableLeakDetection(true); + ReferenceCountedLeakDetector.enable(false); } public interface FactoryGet extends Factory.Get { From 7a6fef93f295ca031b3652585925e8dc957b6839 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 3 Oct 2024 09:21:27 -0700 Subject: [PATCH 10/31] Use HashMap. 
--- .../src/main/java/org/apache/ratis/util/LeakDetector.java | 4 ++-- .../ratis/server/raftlog/segmented/SegmentedRaftLog.java | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java index e000072b2f..286dfe2a3e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java @@ -24,9 +24,9 @@ import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Supplier; @@ -61,7 +61,7 @@ public class LeakDetector { private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class); private static class LeakTrackerSet { - private final Set set = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set set = Collections.newSetFromMap(new HashMap<>()); synchronized boolean remove(LeakTracker tracker) { return set.remove(tracker); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 8fff97b385..37b5f4fbd7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -310,8 +310,9 @@ public ReferenceCountedObject retainLog(long index) throws RaftLo entry.retain(); getRaftLogMetrics().onRaftLogCacheHit(); return entry; - } catch (IllegalStateException ignore) { - // the entry could be removed from the cache and released. 
+ } catch (IllegalStateException ignored) { + // The entry could be removed from the cache and released. + // The exception can be safely ignored since it is the same as cache miss. } } From 77db48ea96ad2f22e2aab24c04b991a63da0848b Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 3 Oct 2024 10:45:16 -0700 Subject: [PATCH 11/31] Fix a bug in LogAppenderDefault. --- .../org/apache/ratis/util/LeakDetector.java | 2 +- .../org/apache/ratis/TestMultiRaftGroup.java | 7 +--- .../server/leader/LogAppenderDefault.java | 40 +++++++------------ 3 files changed, 17 insertions(+), 32 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java index 286dfe2a3e..123ed89782 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java @@ -96,7 +96,7 @@ synchronized void assertNoLeaks() { synchronized void assertNoLeaks(int leaks) { Preconditions.assertTrue(leaks == 0, () -> { final int size = set.size(); - return "#leaks = " + leaks + (leaks == size? "==" : "!=") + " set.size = " + size; + return "#leaks = " + leaks + " > 0, #leaks " + (leaks == size? 
"==" : "!=") + " set.size = " + size; }); } } diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java index 190f758589..36c7892a22 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java @@ -29,18 +29,13 @@ import org.apache.ratis.util.function.CheckedBiConsumer; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.event.Level; import java.io.IOException; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; public class TestMultiRaftGroup extends BaseTest { - static { - Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG); - } - - public static Collection data() throws IOException { + public static Collection data() { return ParameterizedBaseTest.getMiniRaftClusters(ArithmeticStateMachine.class, 0); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index a1978f84c3..8ec6c19db1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -59,46 +59,36 @@ public Comparator getCallIdComparator() { /** Send an appendEntries RPC; retry indefinitely. 
*/ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestFirstIndex) throws InterruptedException, InterruptedIOException, RaftLogIOException { - int retry = 0; - - ReferenceCountedObject request = nextAppendEntriesRequest( - CallId.getAndIncrement(), false); - while (isRunning()) { // keep retrying for IOException + for(int retry = 0; isRunning(); retry++) { + final ReferenceCountedObject request = nextAppendEntriesRequest( + CallId.getAndIncrement(), false); + if (request == null) { + LOG.trace("{} no entries to send now, wait ...", this); + return null; + } try { - if (request == null || request.get().getEntriesCount() == 0) { - if (request != null) { - request.release(); - } - request = nextAppendEntriesRequest(CallId.getAndIncrement(), false); - } - - if (request == null) { - LOG.trace("{} no entries to send now, wait ...", this); - return null; - } else if (!isRunning()) { + if (!isRunning()) { LOG.info("{} is stopped. Skip appendEntries.", this); return null; } final AppendEntriesRequestProto proto = request.get(); - final AppendEntriesReplyProto reply; - try { - reply = sendAppendEntries(proto); - final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX; - requestFirstIndex.set(first); - } finally { - request.release(); - } + final AppendEntriesReplyProto reply = sendAppendEntries(proto); + final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX; + requestFirstIndex.set(first); return reply; } catch (InterruptedIOException | RaftLogIOException e) { throw e; } catch (IOException ioe) { // TODO should have more detailed retry policy here. 
- if (retry++ % 10 == 0) { // to reduce the number of messages + if (retry % 10 == 0) { // to reduce the number of messages LOG.warn("{}: Failed to appendEntries (retry={})", this, retry, ioe); } handleException(ioe); + } finally { + request.release(); } + if (isRunning()) { getServer().properties().rpcSleepTime().sleep(); } From c8e3ac8d8718a5d48f533c5f1c196225337cb6a4 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 4 Oct 2024 11:25:19 -0700 Subject: [PATCH 12/31] Rewrite AdvancedTracing. --- .../util/ReferenceCountedLeakDetector.java | 176 +++++++++++++----- .../org/apache/ratis/TestMultiRaftGroup.java | 4 +- .../server/GrpcClientProtocolService.java | 25 +-- .../ratis/grpc/server/GrpcLogAppender.java | 22 +-- .../ratis/grpc/MiniRaftClusterWithGrpc.java | 2 +- .../raftlog/segmented/SegmentedRaftLog.java | 1 + .../java/org/apache/ratis/RaftBasicTests.java | 8 +- 7 files changed, 163 insertions(+), 75 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 69f3f02a5c..83dd6908e0 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -20,7 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -58,14 +58,18 @@ interface Factory { } private enum Mode implements Factory { - /** Leak detector is not enable in production to avoid performance impacts. */ + /** + * Leak detector is not enable in production to avoid performance impacts. 
+ */ NONE { @Override public ReferenceCountedObject create(V value, Runnable retainMethod, Consumer releaseMethod) { return new Impl<>(value, retainMethod, releaseMethod); } }, - /** Leak detector is enabled to detect leaks. This is intended to use in every tests. */ + /** + * Leak detector is enabled to detect leaks. This is intended to use in every tests. + */ SIMPLE { @Override public ReferenceCountedObject create(V value, Runnable retainMethod, Consumer releaseMethod) { @@ -129,7 +133,7 @@ public boolean release() { // n <= 0: exception // n > 1: n-- // n == 1: n = -1 - final int previous = count.getAndUpdate(n -> n <= 1? -1: n - 1); + final int previous = count.getAndUpdate(n -> n <= 1 ? -1 : n - 1); if (previous < 0) { throw new IllegalStateException("Failed to release: object has already been completely released."); } else if (previous == 0) { @@ -146,6 +150,7 @@ private static class SimpleTracing extends Impl { private final Class valueClass; private Runnable removeMethod = null; + private String valueString = null; SimpleTracing(T value, Runnable retainMethod, Consumer releaseMethod, LeakDetector leakDetector) { super(value, retainMethod, releaseMethod); @@ -153,17 +158,19 @@ private static class SimpleTracing extends Impl { this.leakDetector = leakDetector; } - String getInfo(int count) { - return "(" + valueClass + ", count=" + count + ")"; + String getTraceString(int count) { + return "(" + valueClass + ", count=" + count + ", value=" + valueString + ")"; } - /** @return the leak message if there is a leak; return null if there is no leak. */ + /** + * @return the leak message if there is a leak; return null if there is no leak. 
+ */ String logLeakMessage() { final int count = getCount(); if (count == 0) { return null; } - final String message = "LEAK: " + getInfo(count); + final String message = "LEAK: " + getTraceString(count); LOG.warn(message); return message; } @@ -174,9 +181,13 @@ public synchronized T retain() { this.removeMethod = leakDetector.track(this, this::logLeakMessage); } try { - return super.retain(); + final T value = super.retain(); + if (getCount() == 0) { + this.valueString = value.toString(); + } + return value; } catch (Exception e) { - throw new IllegalStateException("Failed to retain: " + getInfo(getCount()), e); + throw new IllegalStateException("Failed to retain: " + getTraceString(getCount()), e); } } @@ -186,7 +197,7 @@ public synchronized boolean release() { try { released = super.release(); } catch (Exception e) { - throw new IllegalStateException("Failed to release: " + getInfo(getCount()), e); + throw new IllegalStateException("Failed to release: " + getTraceString(getCount()), e); } if (released) { @@ -198,52 +209,133 @@ public synchronized boolean release() { } private static class AdvancedTracing extends SimpleTracing { - private final StackTraceElement[] createStrace = Thread.currentThread().getStackTrace(); - private final List retainsTraces = new LinkedList<>(); - private final List releaseTraces = new LinkedList<>(); + enum Op {CREATION, RETAIN, RELEASE} + + static class Counts { + private final int refCount; + private final int retainCount; + private final int releaseCount; + + Counts() { + this.refCount = 0; + this.retainCount = 0; + this.releaseCount = 0; + } + + Counts(Op op, Counts previous) { + if (op == Op.RETAIN) { + this.refCount = previous.refCount + 1; + this.retainCount = previous.retainCount + 1; + this.releaseCount = previous.releaseCount; + } else if (op == Op.RELEASE) { + this.refCount = previous.refCount - 1; + this.retainCount = previous.retainCount; + this.releaseCount = previous.releaseCount + 1; + } else { + throw new 
IllegalStateException("Unexpected op: " + op); + } + } + + @Override + public String toString() { + return "refCount=" + refCount + + ", retainCount=" + retainCount + + ", releaseCount=" + releaseCount; + } + } + + static class TraceInfo { + private final int id; + private final Op op; + private final int previousRefCount; + private final Counts counts; + + private final StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace(); + private final int newTraceElementIndex; + + TraceInfo(int id, Op op, TraceInfo previous, int previousRefCount) { + this.id = id; + this.op = op; + this.previousRefCount = previousRefCount; + this.counts = previous == null? new Counts(): new Counts(op, previous.counts); + this.newTraceElementIndex = previous == null? stackTraces.length - 1 + : findFirstUnequalFromTail(this.stackTraces, previous.stackTraces); + } + + static int findFirstUnequalFromTail(T[] current, T[] previous) { + int c = current.length - 1; + for(int p = previous.length - 1; p >= 0; p--, c--) { + if (!previous[p].equals(current[c])) { + return c; + } + } + return -1; + } + + private StringBuilder appendTo(StringBuilder b) { + b.append(id).append(": ").append(op) + .append(", previousRefCount=").append(previousRefCount) + .append(", ").append(counts).append("\n"); + final int n = newTraceElementIndex + 1; + int line = 3; + for (; line <= n && line < stackTraces.length; line++) { + b.append(" ").append(stackTraces[line]).append("\n"); + } + if (line < stackTraces.length) { + b.append(" ...\n"); + } + return b; + } + + @Override + public String toString() { + return appendTo(new StringBuilder()).toString(); + } + } + + private final List traceInfos = new ArrayList<>(); + private int traceCount = 0; + private TraceInfo previous; AdvancedTracing(T value, Runnable retainMethod, Consumer releaseMethod, LeakDetector leakDetector) { super(value, retainMethod, releaseMethod, leakDetector); + addTraceInfo(Op.CREATION); } - @Override - synchronized String getInfo(int 
count) { - return super.getInfo(count) - + "\n Creation trace: " + formatStackTrace(createStrace) - + "\n Retain traces: " + formatStackTraces("retain", retainsTraces) - + "\n Release traces: " + formatStackTraces("release", releaseTraces); + private synchronized TraceInfo addTraceInfo(Op op) { + final TraceInfo current = new TraceInfo(traceCount++, op, previous, getCount()); + traceInfos.add(current); + previous = current; + return current; } @Override public synchronized T retain() { - retainsTraces.add(Thread.currentThread().getStackTrace()); - return super.retain(); + final T retained = super.retain(); + final TraceInfo info = addTraceInfo(Op.RETAIN); + Preconditions.assertSame(getCount(), info.counts.refCount, "refCount"); + return retained; } @Override - public boolean release() { - releaseTraces.add(Thread.currentThread().getStackTrace()); - return super.release(); + public synchronized boolean release() { + final boolean released = super.release(); + final TraceInfo info = addTraceInfo(Op.RELEASE); + final int count = getCount(); + final int expected = count == -1? 
0 : count; + Preconditions.assertSame(expected, info.counts.refCount, "refCount"); + return released; } - } - private static String formatStackTrace(StackTraceElement[] stackTrace) { - return formatStackTrace(stackTrace, new StringBuilder()).toString(); - } - - private static StringBuilder formatStackTrace(StackTraceElement[] stackTrace, StringBuilder sb) { - for (int line = 2; line < stackTrace.length; line++) { - sb.append(" ").append(stackTrace[line]).append("\n"); + @Override + synchronized String getTraceString(int count) { + return super.getTraceString(count) + formatTraceInfos(traceInfos); } - return sb; - } - private static String formatStackTraces(String name, List stackTraces) { - final StringBuilder sb = new StringBuilder(stackTraces.size()).append(" trace(s)"); - for (int i = 0; i < stackTraces.size(); i++) { - sb.append("\n").append(name).append(" ").append(i).append(":\n"); - formatStackTrace(stackTraces.get(i), sb); + private static String formatTraceInfos(List infos) { + final StringBuilder b = new StringBuilder().append(infos.size()).append(" TraceInfo(s):\n"); + infos.forEach(info -> info.appendTo(b.append("\n"))); + return b.toString(); } - return sb.toString(); } -} +} \ No newline at end of file diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java index 36c7892a22..ea3962c088 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java @@ -22,11 +22,10 @@ import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine; import org.apache.ratis.examples.arithmetic.TestArithmetic; import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.GroupManagementBaseTest; import org.apache.ratis.server.impl.MiniRaftCluster; -import org.apache.ratis.util.Slf4jUtils; import 
org.apache.ratis.util.function.CheckedBiConsumer; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -34,6 +33,7 @@ import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; +@Timeout(value = 300) public class TestMultiRaftGroup extends BaseTest { public static Collection data() { return ParameterizedBaseTest.getMiniRaftClusters(ArithmeticStateMachine.class, 0); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java index 80a9a439b9..4f67335911 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java @@ -323,18 +323,19 @@ private class UnorderedRequestStreamObserver extends RequestStreamObserver { @Override void processClientRequest(ReferenceCountedObject requestRef) { - final RaftClientRequest request = requestRef.retain(); - final long callId = request.getCallId(); - final SlidingWindowEntry slidingWindowEntry = request.getSlidingWindowEntry(); - final CompletableFuture f = processClientRequest(requestRef, reply -> { - if (!reply.isSuccess()) { - LOG.info("Failed request cid={}, {}, reply={}", callId, slidingWindowEntry, reply); - } - final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply); - responseNext(proto); - }); - - requestRef.release(); + final long callId = requestRef.retain().getCallId(); + final CompletableFuture f; + try { + f = processClientRequest(requestRef, reply -> { + if (!reply.isSuccess()) { + LOG.info("Failed request cid={}, reply={}", callId, reply); + } + final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply); + responseNext(proto); + }); + } finally { + requestRef.release(); + } put(callId, f); f.thenAccept(dummy 
-> remove(callId)); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 7edb3ae0b7..18d4c62c61 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -379,7 +379,7 @@ public Comparator getCallIdComparator() { } private void appendLog(boolean heartbeat) throws IOException { - ReferenceCountedObject pending = null; + final ReferenceCountedObject pending; final AppendEntriesRequest request; try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { // Prepare and send the append request. @@ -388,18 +388,18 @@ private void appendLog(boolean heartbeat) throws IOException { if (pending == null) { return; } - request = new AppendEntriesRequest(pending.get(), getFollowerId(), grpcServerMetrics); - pendingRequests.put(request); - increaseNextIndex(pending.get()); - if (appendLogRequestObserver == null) { - appendLogRequestObserver = new StreamObservers( - getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin()); - } - } catch(Exception e) { - if (pending != null) { + try { + request = new AppendEntriesRequest(pending.get(), getFollowerId(), grpcServerMetrics); + pendingRequests.put(request); + increaseNextIndex(pending.get()); + if (appendLogRequestObserver == null) { + appendLogRequestObserver = new StreamObservers( + getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin()); + } + } catch (Exception e) { pending.release(); + throw e; } - throw e; } try { diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 22e0e7b355..31cf5a0c84 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -52,7 +52,7 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra }; static { - ReferenceCountedLeakDetector.enable(false); + ReferenceCountedLeakDetector.enable(true); } public interface FactoryGet extends Factory.Get { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 37b5f4fbd7..557a2b37ea 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -351,6 +351,7 @@ public ReferenceCountedObject retainEntryWithData(long index) thr } catch (Exception e) { final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(entry); LOG.error(err, e); + entryRef.release(); throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e)); } } diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index b16905d9a0..187e7e4eef 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -33,13 +33,11 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.MiniRaftCluster; -import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.impl.RetryCacheTestUtil; import org.apache.ratis.server.metrics.ServerMetricsTestUtils; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; import 
org.apache.ratis.util.Timestamp; import org.junit.jupiter.api.Assertions; @@ -47,7 +45,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; -import org.slf4j.event.Level; import java.io.IOException; import java.util.List; @@ -75,9 +72,6 @@ public abstract class RaftBasicTests extends BaseTest implements MiniRaftCluster.Factory.Get { { - Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG); - RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG); - RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS)); } @@ -341,7 +335,7 @@ public String toString() { @Test @Timeout(value = 300) public void testWithLoad() throws Exception { - runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG)); + runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(3, 60, false, cluster, LOG)); } static void testWithLoad(final int numClients, final int numMessages, From aea498fad653fd0585aca7e08fed06d55d0558c1 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 4 Oct 2024 13:47:45 -0700 Subject: [PATCH 13/31] Fix a bug in LogSegment cache. 
--- .../util/ReferenceCountedLeakDetector.java | 32 +++++++++++++++---- .../server/GrpcClientProtocolService.java | 1 - .../ratis/grpc/MiniRaftClusterWithGrpc.java | 2 +- .../server/raftlog/segmented/LogSegment.java | 12 +++++-- 4 files changed, 35 insertions(+), 12 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 83dd6908e0..d4eccc9717 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -175,6 +175,15 @@ String logLeakMessage() { return message; } + @Override + public T get() { + try { + return super.get(); + } catch (Exception e) { + throw new IllegalStateException("Failed to get: " + getTraceString(getCount()), e); + } + } + @Override public synchronized T retain() { if (getCount() == 0) { @@ -250,7 +259,8 @@ static class TraceInfo { private final int previousRefCount; private final Counts counts; - private final StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace(); + private final String threadName; + private final StackTraceElement[] stackTraces; private final int newTraceElementIndex; TraceInfo(int id, Op op, TraceInfo previous, int previousRefCount) { @@ -258,6 +268,10 @@ static class TraceInfo { this.op = op; this.previousRefCount = previousRefCount; this.counts = previous == null? new Counts(): new Counts(op, previous.counts); + + final Thread thread = Thread.currentThread(); + this.threadName = thread.getName(); + this.stackTraces = thread.getStackTrace(); this.newTraceElementIndex = previous == null? 
stackTraces.length - 1 : findFirstUnequalFromTail(this.stackTraces, previous.stackTraces); } @@ -275,7 +289,8 @@ static int findFirstUnequalFromTail(T[] current, T[] previous) { private StringBuilder appendTo(StringBuilder b) { b.append(id).append(": ").append(op) .append(", previousRefCount=").append(previousRefCount) - .append(", ").append(counts).append("\n"); + .append(", ").append(counts) + .append(", ").append(threadName).append("\n"); final int n = newTraceElementIndex + 1; int line = 3; for (; line <= n && line < stackTraces.length; line++) { @@ -299,28 +314,31 @@ public String toString() { AdvancedTracing(T value, Runnable retainMethod, Consumer releaseMethod, LeakDetector leakDetector) { super(value, retainMethod, releaseMethod, leakDetector); - addTraceInfo(Op.CREATION); + addTraceInfo(Op.CREATION, -1); } - private synchronized TraceInfo addTraceInfo(Op op) { - final TraceInfo current = new TraceInfo(traceCount++, op, previous, getCount()); + private synchronized TraceInfo addTraceInfo(Op op, int previousRefCount) { + final TraceInfo current = new TraceInfo(traceCount++, op, previous, previousRefCount); traceInfos.add(current); previous = current; return current; } + @Override public synchronized T retain() { + final int previousRefCount = getCount(); final T retained = super.retain(); - final TraceInfo info = addTraceInfo(Op.RETAIN); + final TraceInfo info = addTraceInfo(Op.RETAIN, previousRefCount); Preconditions.assertSame(getCount(), info.counts.refCount, "refCount"); return retained; } @Override public synchronized boolean release() { + final int previousRefCount = getCount(); final boolean released = super.release(); - final TraceInfo info = addTraceInfo(Op.RELEASE); + final TraceInfo info = addTraceInfo(Op.RELEASE, previousRefCount); final int count = getCount(); final int expected = count == -1? 
0 : count; Preconditions.assertSame(expected, info.counts.refCount, "refCount"); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java index 4f67335911..b7548780cd 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java @@ -29,7 +29,6 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry; import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.JavaUtils; diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 31cf5a0c84..22e0e7b355 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -52,7 +52,7 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra }; static { - ReferenceCountedLeakDetector.enable(true); + ReferenceCountedLeakDetector.enable(false); } public interface FactoryGet extends Factory.Get { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index c51464f9e9..c29a92579a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -283,10 +283,10 @@ public 
ReferenceCountedObject load(LogRecord key) throws IOExcept final TermIndex ti = TermIndex.valueOf(entry); putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE); if (ti.equals(key.getTermIndex())) { - entryRef.retain(); toReturn.set(entryRef); + } else { + entryRef.release(); } - entryRef.release(); }); loadingTimes.incrementAndGet(); return Objects.requireNonNull(toReturn.get()); @@ -433,7 +433,13 @@ ReferenceCountedObject getEntryFromCache(TermIndex ti) { synchronized ReferenceCountedObject loadCache(LogRecord record) throws RaftLogIOException { ReferenceCountedObject entry = entryCache.get(record.getTermIndex()); if (entry != null) { - return entry; + try { + entry.retain(); + return entry; + } catch (IllegalStateException ignored) { + // The entry could be removed from the cache and released. + // The exception can be safely ignored since it is the same as cache miss. + } } try { return cacheLoader.load(record); From 0104ece6f6d75106e58f27446185d71d18cc18de Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 4 Oct 2024 13:51:41 -0700 Subject: [PATCH 14/31] Add synchronized to get() --- .../org/apache/ratis/util/ReferenceCountedLeakDetector.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index d4eccc9717..450324c746 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -158,7 +158,7 @@ private static class SimpleTracing extends Impl { this.leakDetector = leakDetector; } - String getTraceString(int count) { + private String getTraceString(int count) { return "(" + valueClass + ", count=" + count + ", value=" + valueString + ")"; } @@ -176,7 +176,7 @@ String logLeakMessage() { } @Override - public T get() { + public synchronized T get() 
{ try { return super.get(); } catch (Exception e) { From 43980fbfc929ca72abcde458c57a5ce72cebc74c Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 4 Oct 2024 13:55:21 -0700 Subject: [PATCH 15/31] Fix javac error. --- .../ratis/util/ReferenceCountedLeakDetector.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 450324c746..16bf46dd4f 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -58,18 +58,14 @@ interface Factory { } private enum Mode implements Factory { - /** - * Leak detector is not enable in production to avoid performance impacts. - */ + /** Leak detector is not enable in production to avoid performance impacts. */ NONE { @Override public ReferenceCountedObject create(V value, Runnable retainMethod, Consumer releaseMethod) { return new Impl<>(value, retainMethod, releaseMethod); } }, - /** - * Leak detector is enabled to detect leaks. This is intended to use in every tests. - */ + /** Leak detector is enabled to detect leaks. This is intended to use in every tests. */ SIMPLE { @Override public ReferenceCountedObject create(V value, Runnable retainMethod, Consumer releaseMethod) { @@ -158,7 +154,7 @@ private static class SimpleTracing extends Impl { this.leakDetector = leakDetector; } - private String getTraceString(int count) { + String getTraceString(int count) { return "(" + valueClass + ", count=" + count + ", value=" + valueString + ")"; } From 9150bdce6ba6dd84e415f237563354480783e524 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 4 Oct 2024 14:08:10 -0700 Subject: [PATCH 16/31] Restore RaftBasicTests. 
--- ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 187e7e4eef..dbb0dbcabc 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -335,7 +335,7 @@ public String toString() { @Test @Timeout(value = 300) public void testWithLoad() throws Exception { - runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(3, 60, false, cluster, LOG)); + runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG)); } static void testWithLoad(final int numClients, final int numMessages, From 38f5c69745a301364e77d67c5d45a7ccce6a72d4 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 4 Oct 2024 14:27:34 -0700 Subject: [PATCH 17/31] Move ReferenceCountedLeakDetector.enable(..) to MiniRaftCluster. 
--- .../java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 4 ---- .../java/org/apache/ratis/server/impl/MiniRaftCluster.java | 4 ++++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 22e0e7b355..2d7beb9146 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -51,10 +51,6 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra } }; - static { - ReferenceCountedLeakDetector.enable(false); - } - public interface FactoryGet extends Factory.Get { @Override default Factory getFactory() { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 25b46fc148..e27dbbb1b9 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -86,6 +86,10 @@ public abstract class MiniRaftCluster implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); + static { + ReferenceCountedLeakDetector.enable(true); + } + public static final String CLASS_NAME = JavaUtils.getClassSimpleName(MiniRaftCluster.class); public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class"; private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = gid -> new BaseStateMachine(); From 23af8eddb23d5f9bfb1c0419eccab42b433cbff7 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 5 Oct 2024 12:57:05 -0700 Subject: [PATCH 18/31] Fix bugs in LogSegment.EntryCache. 
--- .../ratis/server/raftlog/RaftLogBase.java | 2 +- .../server/raftlog/segmented/LogSegment.java | 28 +++++++++++++------ .../java/org/apache/ratis/RaftTestUtil.java | 7 +++-- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java index c92c04c436..af5bd0e1f1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java @@ -370,7 +370,7 @@ protected List> appendImpl(ReferenceCountedObject load(LogRecord key) throws IOExcept } static class EntryCache { - private final Map> map = new ConcurrentHashMap<>(); + private final Map>> map = new ConcurrentHashMap<>(); private final AtomicLong size = new AtomicLong(); long size() { @@ -302,28 +302,38 @@ long size() { } ReferenceCountedObject get(TermIndex ti) { - return map.get(ti); + final AtomicReference> ref = map.get(ti); + return ref == null? 
null: ref.get(); } void clear() { - map.values().forEach(ReferenceCountedObject::release); - map.clear(); - size.set(0); + for (Iterator>>> + i = map.entrySet().iterator(); i.hasNext(); ) { + release(i.next().getValue()); + i.remove(); + } } void put(TermIndex key, ReferenceCountedObject valueRef, Op op) { valueRef.retain(); - Optional.ofNullable(map.put(key, valueRef)).ifPresent(this::release); + release(map.put(key, new AtomicReference<>(valueRef))); size.getAndAdd(getEntrySize(valueRef.get(), op)); } - private void release(ReferenceCountedObject entry) { + private void release(AtomicReference> ref) { + if (ref == null) { + return; + } + final ReferenceCountedObject entry = ref.getAndSet(null); + if (entry == null) { + return; + } size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE)); entry.release(); } void remove(TermIndex key) { - Optional.ofNullable(map.remove(key)).ifPresent(this::release); + release(map.remove(key)); } } diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index be8739ad8e..0d6c06358f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -187,8 +187,11 @@ static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, S log.get(termIndices[idxEntries].getIndex()).getStateMachineLogEntry().getLogData().toByteArray())) { ++idxExpected; } - } catch (IOException e) { - throw new RuntimeException(e); + } catch (Exception e) { + throw new IllegalStateException("Failed startIndex=" + startIndex + + ", endIndex=" + endIndex + + ", #expectedMessages=" + expectedMessages.length + + ", log=" + log, e); } ++idxEntries; } From 351238741619742caa6a42825ec35b414da756fd Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 5 Oct 2024 13:32:04 -0700 Subject: [PATCH 19/31] Fix a bug in SimpleStateMachine4Testing. 
--- .../util/ReferenceCountedLeakDetector.java | 3 ++- .../server/raftlog/segmented/LogSegment.java | 3 ++- .../impl/SimpleStateMachine4Testing.java | 26 +++++++++++-------- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 16bf46dd4f..bcd921d7a8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -347,7 +347,8 @@ synchronized String getTraceString(int count) { } private static String formatTraceInfos(List infos) { - final StringBuilder b = new StringBuilder().append(infos.size()).append(" TraceInfo(s):\n"); + final int n = infos.size(); + final StringBuilder b = new StringBuilder(n << 10).append(" #TraceInfos=").append(n); infos.forEach(info -> info.appendTo(b.append("\n"))); return b.toString(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index 053270d51e..cc8e911298 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -294,7 +294,8 @@ public ReferenceCountedObject load(LogRecord key) throws IOExcept } static class EntryCache { - private final Map>> map = new ConcurrentHashMap<>(); + private final Map>> map + = new ConcurrentHashMap<>(); private final AtomicLong size = new AtomicLong(); long size() { diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java index 18e4f2eca0..5f90f15b17 100644 --- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java @@ -84,7 +84,7 @@ public static SimpleStateMachine4Testing get(RaftServer.Division s) { } private final SortedMap> indexMap = Collections.synchronizedSortedMap(new TreeMap<>()); - private final SortedMap dataMap = Collections.synchronizedSortedMap(new TreeMap<>()); + private final SortedMap dataMap = Collections.synchronizedSortedMap(new TreeMap<>()); private final Daemon checkpointer; private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final RaftProperties properties = new RaftProperties(); @@ -199,14 +199,18 @@ public RoleInfoProto getLeaderElectionTimeoutInfo() { } private void put(ReferenceCountedObject entryRef) { - LogEntryProto entry = entryRef.retain(); - final ReferenceCountedObject previous = indexMap.put(entry.getIndex(), entryRef); - Preconditions.assertNull(previous, "previous"); - final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8(); - dataMap.put(s, entry); - LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), - s.length() <= 10? s: s.substring(0, 10) + "...", - LogProtoUtils.toLogEntryString(entry)); + try { + final LogEntryProto entry = entryRef.retain(); + final ReferenceCountedObject previous = indexMap.put(entry.getIndex(), entryRef); + Preconditions.assertNull(previous, "previous"); + final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8(); + dataMap.put(s, entry.toByteString()); + LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), + s.length() <= 10 ? 
s : s.substring(0, 10) + "...", + LogProtoUtils.toLogEntryString(entry)); + } finally { + entryRef.release(); + } } @Override @@ -333,9 +337,9 @@ public CompletableFuture query(Message request) { Exception exception; try { LOG.info("query {}, all available: {}", string, dataMap.keySet()); - final LogEntryProto entry = dataMap.get(string); + final ByteString entry = dataMap.get(string); if (entry != null) { - return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString())); + return CompletableFuture.completedFuture(Message.valueOf(entry)); } exception = new IndexOutOfBoundsException(getId() + ": LogEntry not found for query " + string); } catch (Exception e) { From b54837365bb28fec7de8d7f7395d8d7bf5e741bc Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 5 Oct 2024 14:04:48 -0700 Subject: [PATCH 20/31] Copy LogEntryProto in SimpleStateMachine4Testing. --- .../ratis/server/raftlog/RaftLogBase.java | 2 +- .../ratis/server/impl/MiniRaftCluster.java | 2 +- .../impl/SimpleStateMachine4Testing.java | 39 ++++++++----------- 3 files changed, 19 insertions(+), 24 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java index af5bd0e1f1..c92c04c436 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java @@ -370,7 +370,7 @@ protected List> appendImpl(ReferenceCountedObject> indexMap = Collections.synchronizedSortedMap(new TreeMap<>()); - private final SortedMap dataMap = Collections.synchronizedSortedMap(new TreeMap<>()); + private final SortedMap indexMap = Collections.synchronizedSortedMap(new TreeMap<>()); + private final SortedMap dataMap = Collections.synchronizedSortedMap(new TreeMap<>()); private final Daemon checkpointer; private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); 
private final RaftProperties properties = new RaftProperties(); @@ -198,19 +198,14 @@ public RoleInfoProto getLeaderElectionTimeoutInfo() { return leaderElectionTimeoutInfo; } - private void put(ReferenceCountedObject entryRef) { - try { - final LogEntryProto entry = entryRef.retain(); - final ReferenceCountedObject previous = indexMap.put(entry.getIndex(), entryRef); - Preconditions.assertNull(previous, "previous"); - final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8(); - dataMap.put(s, entry.toByteString()); - LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), - s.length() <= 10 ? s : s.substring(0, 10) + "...", - LogProtoUtils.toLogEntryString(entry)); - } finally { - entryRef.release(); - } + private void put(LogEntryProto entry) { + final LogEntryProto previous = indexMap.put(entry.getIndex(), entry); + Preconditions.assertNull(previous, "previous"); + final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8(); + dataMap.put(s, entry); + LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), + s.length() <= 10? 
s: s.substring(0, 10) + "...", + LogProtoUtils.toLogEntryString(entry)); } @Override @@ -254,7 +249,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { LogEntryProto entry = entryRef.get(); LOG.info("applyTransaction for log index {}", entry.getIndex()); - put(entryRef); + put(LogProtoUtils.copy(entry)); updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); final SimpleMessage m = new SimpleMessage(entry.getIndex() + " OK"); @@ -274,8 +269,7 @@ public long takeSnapshot() { LOG.debug("Taking a snapshot with {}, file:{}", termIndex, snapshotFile); try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(snapshotFile, false, segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) { - for (final ReferenceCountedObject entryRef : indexMap.values()) { - LogEntryProto entry = entryRef.get(); + for (final LogEntryProto entry : indexMap.values()) { if (entry.getIndex() > endIndex) { break; } else { @@ -310,7 +304,7 @@ private synchronized void loadSnapshot(SingleFileSnapshotInfo snapshot) throws I snapshot.getFile().getPath().toFile(), 0, endIndex, false)) { LogEntryProto entry; while ((entry = in.nextEntry()) != null) { - put(ReferenceCountedObject.wrap(entry)); + put(entry); updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); } } @@ -337,7 +331,7 @@ public CompletableFuture query(Message request) { Exception exception; try { LOG.info("query {}, all available: {}", string, dataMap.keySet()); - final ByteString entry = dataMap.get(string); + final LogEntryProto entry = dataMap.get(string); if (entry != null) { return CompletableFuture.completedFuture(Message.valueOf(entry)); } @@ -385,11 +379,12 @@ public void close() { running = false; checkpointer.interrupt(); }); - indexMap.values().forEach(ReferenceCountedObject::release); + indexMap.clear(); + dataMap.clear(); } public LogEntryProto[] getContent() { - return 
indexMap.values().stream().map(ReferenceCountedObject::get).toArray(LogEntryProto[]::new); + return indexMap.values().toArray(new LogEntryProto[0]); } public void blockStartTransaction() { From c4ac263b8b43d01ac3e38d63bdfcf66769d4c0be Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 5 Oct 2024 14:15:02 -0700 Subject: [PATCH 21/31] Use Throwable in MiniRaftCluster. --- .../java/org/apache/ratis/server/impl/MiniRaftCluster.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 833131b69b..4fb516e5db 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -140,13 +140,13 @@ default void runWithNewCluster(int numServers, int numListeners, boolean startCl final StackTraceElement caller = JavaUtils.getCallerStackTraceElement(); LOG.info("Running " + caller.getMethodName()); final CLUSTER cluster = newCluster(numServers, numListeners); - Exception failed = null; + Throwable failed = null; try { if (startCluster) { cluster.start(); } testCase.accept(cluster); - } catch(Exception t) { + } catch(Throwable t) { LOG.info(cluster.printServers()); LOG.error("Failed " + caller, t); failed = t; From b831226443e5d5bbf238406501abd2f50108b46a Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 6 Oct 2024 10:36:31 -0700 Subject: [PATCH 22/31] New entries can to added after EntryCache is closed. 
--- .../util/ReferenceCountedLeakDetector.java | 15 ++- .../server/raftlog/segmented/LogSegment.java | 92 ++++++++++++++----- .../raftlog/segmented/SegmentedRaftLog.java | 2 + 3 files changed, 78 insertions(+), 31 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index bcd921d7a8..1c794a516b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -182,18 +182,17 @@ public synchronized T get() { @Override public synchronized T retain() { - if (getCount() == 0) { - this.removeMethod = leakDetector.track(this, this::logLeakMessage); - } + final T value; try { - final T value = super.retain(); - if (getCount() == 0) { - this.valueString = value.toString(); - } - return value; + value = super.retain(); } catch (Exception e) { throw new IllegalStateException("Failed to retain: " + getTraceString(getCount()), e); } + if (getCount() == 1) { // this is the first retain + this.removeMethod = leakDetector.track(this, this::logLeakMessage); + this.valueString = value.toString(); + } + return value; } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index cc8e911298..a88ade5872 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -29,6 +29,7 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import 
org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.SizeInBytes; @@ -38,10 +39,10 @@ import java.io.File; import java.io.IOException; import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -293,47 +294,92 @@ public ReferenceCountedObject load(LogRecord key) throws IOExcept } } - static class EntryCache { - private final Map>> map - = new ConcurrentHashMap<>(); + private static class Item { + private final AtomicReference> ref; + private final long serializedSize; + + Item(ReferenceCountedObject obj, long serializedSize) { + this.ref = new AtomicReference<>(obj); + this.serializedSize = serializedSize; + } + + ReferenceCountedObject get() { + return ref.get(); + } + + long release() { + final ReferenceCountedObject entry = ref.getAndSet(null); + if (entry == null) { + return 0; + } + entry.release(); + return serializedSize; + } + } + + class EntryCache { + private Map map = new HashMap<>(); private final AtomicLong size = new AtomicLong(); + @Override + public String toString() { + return JavaUtils.getClassSimpleName(getClass()) + "-" + LogSegment.this; + } + long size() { return size.get(); } - ReferenceCountedObject get(TermIndex ti) { - final AtomicReference> ref = map.get(ti); + synchronized ReferenceCountedObject get(TermIndex ti) { + if (map == null) { + return null; + } + final Item ref = map.get(ti); return ref == null? null: ref.get(); } - void clear() { - for (Iterator>>> - i = map.entrySet().iterator(); i.hasNext(); ) { + /** After close(), the cache CANNOT be used again. 
*/ + synchronized void close() { + if (map == null) { + return; + } + evict(); + map = null; + LOG.info("Successfully closed {}", this); + } + + /** After evict(), the cache can be used again. */ + synchronized void evict() { + if (map == null) { + return; + } + for (Iterator> i = map.entrySet().iterator(); i.hasNext(); i.remove()) { release(i.next().getValue()); - i.remove(); } } - void put(TermIndex key, ReferenceCountedObject valueRef, Op op) { + synchronized void put(TermIndex key, ReferenceCountedObject valueRef, Op op) { + if (map == null) { + return; + } valueRef.retain(); - release(map.put(key, new AtomicReference<>(valueRef))); - size.getAndAdd(getEntrySize(valueRef.get(), op)); + final long serializedSize = getEntrySize(valueRef.get(), op); + release(map.put(key, new Item(valueRef, serializedSize))); + size.getAndAdd(serializedSize); } - private void release(AtomicReference> ref) { + private void release(Item ref) { if (ref == null) { return; } - final ReferenceCountedObject entry = ref.getAndSet(null); - if (entry == null) { - return; - } - size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE)); - entry.release(); + final long serializedSize = ref.release(); + size.getAndAdd(-serializedSize); } - void remove(TermIndex key) { + synchronized void remove(TermIndex key) { + if (map == null) { + return; + } release(map.remove(key)); } } @@ -522,7 +568,7 @@ private int compareTo(Long l) { synchronized void clear() { records.clear(); - evictCache(); + entryCache.close(); endIndex = startIndex - 1; } @@ -531,7 +577,7 @@ int getLoadingTimes() { } void evictCache() { - entryCache.clear(); + entryCache.evict(); } void putEntryCache(TermIndex key, ReferenceCountedObject valueRef, Op op) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 557a2b37ea..485eb53d10 100644 --- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -564,6 +564,7 @@ public CompletableFuture onSnapshotInstalled(long lastSnapshotIndex) { @Override public void close() throws IOException { try(AutoCloseableLock writeLock = writeLock()) { + LOG.info("Start closing {}", this); super.close(); cacheEviction.close(); cache.close(); @@ -571,6 +572,7 @@ public void close() throws IOException { fileLogWorker.close(); storage.close(); getRaftLogMetrics().unregister(); + LOG.info("Successfully closed {}", this); } SegmentedRaftLogCache getRaftLogCache() { From 55a3896927bdc236e8fef1c94da0af5c6385a31c Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 6 Oct 2024 18:15:08 -0700 Subject: [PATCH 23/31] Bump test related plugin versions. --- pom.xml | 10 +++++----- .../util/ReferenceCountedLeakDetector.java | 20 ++++++++++++------- .../java/org/apache/ratis/RaftBasicTests.java | 2 +- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index 1284e2f873..cffadfc443 100644 --- a/pom.xml +++ b/pom.xml @@ -176,7 +176,7 @@ 3.3.0 4.0.6 1.6.1 - 3.0.0-M4 + 3.5.1 3.5.3 3.4.0 @@ -223,8 +223,8 @@ 4 2.0.7 - 5.10.1 - 0.8.11 + 5.11.2 + 0.8.12 @@ -427,7 +427,7 @@ org.junit junit-bom - ${junit.jupiter.version} + ${junit-bom.version} pom import @@ -643,7 +643,7 @@ all 600 - -Xmx2048m -XX:+HeapDumpOnOutOfMemoryError @{argLine} + -Xmx8g -XX:+HeapDumpOnOutOfMemoryError @{argLine} ${project.build.directory}/log ${project.build.directory}/tmp diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 1c794a516b..e38066b471 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ 
b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -213,7 +213,7 @@ public synchronized boolean release() { } private static class AdvancedTracing extends SimpleTracing { - enum Op {CREATION, RETAIN, RELEASE} + enum Op {CREATION, RETAIN, RELEASE, LEAK} static class Counts { private final int refCount; @@ -304,7 +304,6 @@ public String toString() { } private final List traceInfos = new ArrayList<>(); - private int traceCount = 0; private TraceInfo previous; AdvancedTracing(T value, Runnable retainMethod, Consumer releaseMethod, LeakDetector leakDetector) { @@ -313,7 +312,7 @@ public String toString() { } private synchronized TraceInfo addTraceInfo(Op op, int previousRefCount) { - final TraceInfo current = new TraceInfo(traceCount++, op, previous, previousRefCount); + final TraceInfo current = new TraceInfo(traceInfos.size(), op, previous, previousRefCount); traceInfos.add(current); previous = current; return current; @@ -342,13 +341,20 @@ public synchronized boolean release() { @Override synchronized String getTraceString(int count) { - return super.getTraceString(count) + formatTraceInfos(traceInfos); + return super.getTraceString(count) + formatTraceInfos(); } - private static String formatTraceInfos(List infos) { - final int n = infos.size(); + private String formatTraceInfos() { + final int n = traceInfos.size(); final StringBuilder b = new StringBuilder(n << 10).append(" #TraceInfos=").append(n); - infos.forEach(info -> info.appendTo(b.append("\n"))); + TraceInfo previous = null; + for (TraceInfo info : traceInfos) { + info.appendTo(b.append("\n")); + previous = info; + } + final TraceInfo last = new TraceInfo(n, Op.LEAK, previous, getCount()); + last.appendTo(b.append("\n")); + return b.toString(); } } diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index dbb0dbcabc..34a3c5581b 100644 --- 
a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -335,7 +335,7 @@ public String toString() { @Test @Timeout(value = 300) public void testWithLoad() throws Exception { - runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG)); + runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(5, 100, false, cluster, LOG)); } static void testWithLoad(final int numClients, final int numMessages, From 1d49431bf1e487999f122f2bc33119833d93384b Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 6 Oct 2024 20:43:07 -0700 Subject: [PATCH 24/31] Reduce messages to 100 --- ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 925b8bbadb..7492b6ec02 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -272,7 +272,7 @@ public void testBasicAppendEntriesAsyncKillLeader() throws Exception { @Test public void testWithLoadAsync() throws Exception { runWithNewCluster(NUM_SERVERS, - cluster -> RaftBasicTests.testWithLoad(5, 500, true, cluster, LOG)); + cluster -> RaftBasicTests.testWithLoad(5, 100, true, cluster, LOG)); } @Test From b57a7488e74ff38d2fde3b63e6bd13c7e62fba64 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 6 Oct 2024 20:58:13 -0700 Subject: [PATCH 25/31] Fix checkstyle. 
--- .../apache/ratis/util/ReferenceCountedLeakDetector.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index e38066b471..d10745a0d4 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -347,13 +347,13 @@ synchronized String getTraceString(int count) { private String formatTraceInfos() { final int n = traceInfos.size(); final StringBuilder b = new StringBuilder(n << 10).append(" #TraceInfos=").append(n); - TraceInfo previous = null; + TraceInfo last = null; for (TraceInfo info : traceInfos) { info.appendTo(b.append("\n")); - previous = info; + last = info; } - final TraceInfo last = new TraceInfo(n, Op.LEAK, previous, getCount()); - last.appendTo(b.append("\n")); + final TraceInfo current = new TraceInfo(n, Op.LEAK, last, getCount()); + current.appendTo(b.append("\n")); return b.toString(); } From 483b6ae6017e8620c097428494f2554186082dce Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 6 Oct 2024 21:21:06 -0700 Subject: [PATCH 26/31] Reset test Xmx to 2g --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index cffadfc443..21483043f2 100644 --- a/pom.xml +++ b/pom.xml @@ -643,7 +643,7 @@ all 600 - -Xmx8g -XX:+HeapDumpOnOutOfMemoryError @{argLine} + -Xmx2g -XX:+HeapDumpOnOutOfMemoryError @{argLine} ${project.build.directory}/log ${project.build.directory}/tmp From 6547e14d1500cefc8a292765bf520195ae973784 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 6 Oct 2024 21:50:10 -0700 Subject: [PATCH 27/31] Retry assertNoLeaks multiple times.
--- .../org/apache/ratis/util/LeakDetector.java | 22 ++++++++++++++---- .../ratis/server/impl/MiniRaftCluster.java | 23 ++++++++++++++++--- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java index 123ed89782..ccaa981eb9 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java @@ -79,9 +79,9 @@ synchronized LeakTracker add(Object referent, ReferenceQueue queue, Supp return tracker; } - synchronized void assertNoLeaks() { + synchronized int getNumLeaks(boolean throwException) { if (set.isEmpty()) { - return; + return 0; } int n = 0; @@ -90,7 +90,10 @@ synchronized void assertNoLeaks() { n++; } } - assertNoLeaks(n); + if (throwException) { + assertNoLeaks(n); + } + return n; } synchronized void assertNoLeaks(int leaks) { @@ -151,12 +154,21 @@ Runnable track(Object leakable, Supplier reportLeak) { return allLeaks.add(leakable, queue, reportLeak)::remove; } - public void assertNoLeaks() { + public void assertNoLeaks(int maxRetries) throws InterruptedException { synchronized (leakMessages) { Preconditions.assertTrue(leakMessages.isEmpty(), () -> "#leaks = " + leakMessages.size() + "\n" + leakMessages); } - allLeaks.assertNoLeaks(); + + for(int i = 0; i < maxRetries; i++) { + final int numLeaks = allLeaks.getNumLeaks(false); + if (numLeaks == 0) { + return; + } + LOG.warn("{}/{}) numLeaks == {} > 0, will wait and retry ...", i, maxRetries, numLeaks); + TimeDuration.ONE_SECOND.sleep(); + } + allLeaks.getNumLeaks(true); } private static final class LeakTracker extends WeakReference { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 4fb516e5db..1fc5355fa1 100644 --- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -854,6 +854,7 @@ public void shutdown() { LOG.info("*** "); LOG.info("************************************************************** "); LOG.info(printServers()); + final int maxRetries = 30; // TODO: classes like RaftLog may throw uncaught exception during shutdown (e.g. write after close) ExitUtils.setTerminateOnUncaughtException(false); @@ -864,7 +865,19 @@ public void shutdown() { try { executor.shutdown(); // just wait for a few seconds - executor.awaitTermination(5, TimeUnit.SECONDS); + boolean terminated = false; + + for(int i = 0; i < maxRetries && !terminated; ) { + terminated = executor.awaitTermination(1, TimeUnit.SECONDS); + if (!terminated) { + i++; + if (i < maxRetries) { + LOG.warn("Not yet able to shutdown executor {}/{}, will wait again ...", i, maxRetries); + } else { + LOG.error("Failed to shutdown executor, some servers may be still running:\n{}", printServers()); + } + } + } } catch (InterruptedException e) { LOG.warn("shutdown interrupted", e); Thread.currentThread().interrupt(); @@ -878,9 +891,13 @@ public void shutdown() { try { RaftTestUtil.gc(); } catch (InterruptedException e) { - LOG.info("gc interrupted."); + LOG.warn("gc interrupted.", e); + } + try { + ReferenceCountedLeakDetector.getLeakDetector().assertNoLeaks(maxRetries); + } catch (InterruptedException e) { + LOG.warn("LeakDetector interrupted.", e); } - ReferenceCountedLeakDetector.getLeakDetector().assertNoLeaks(); } /** From dc690e480cc78e8a353c8f57aee814937aeb512b Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 6 Oct 2024 22:13:50 -0700 Subject: [PATCH 28/31] Copy log entries in MemoryRaftLog. 
--- .../util/ReferenceCountedLeakDetector.java | 4 +++- .../server/raftlog/memory/MemoryRaftLog.java | 23 +++++++------------ 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index d10745a0d4..088e41f274 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -262,7 +262,9 @@ static class TraceInfo { this.id = id; this.op = op; this.previousRefCount = previousRefCount; - this.counts = previous == null? new Counts(): new Counts(op, previous.counts); + this.counts = previous == null? new Counts() + : op == Op.LEAK? previous.counts + : new Counts(op, previous.counts); final Thread thread = Thread.currentThread(); this.threadName = thread.getName(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java index 2aac6c1b1f..f4b6dc452e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java @@ -45,15 +45,10 @@ */ public class MemoryRaftLog extends RaftLogBase { static class EntryList { - private final List> entries = new ArrayList<>(); - - ReferenceCountedObject getRef(int i) { - return i >= 0 && i < entries.size() ? entries.get(i) : null; - } + private final List entries = new ArrayList<>(); LogEntryProto get(int i) { - final ReferenceCountedObject ref = getRef(i); - return ref != null ? ref.get() : null; + return i >= 0 && i < entries.size() ? 
entries.get(i) : null; } TermIndex getTermIndex(int i) { @@ -81,13 +76,10 @@ void purge(int index) { } void clear(int from, int to) { - List> subList = entries.subList(from, to); - subList.forEach(ReferenceCountedObject::release); - subList.clear(); + entries.subList(from, to).clear(); } - void add(ReferenceCountedObject entryRef) { - entryRef.retain(); + void add(LogEntryProto entryRef) { entries.add(entryRef); } } @@ -128,7 +120,8 @@ public LogEntryProto get(long index) throws RaftLogIOException { public ReferenceCountedObject retainLog(long index) { checkLogState(); try (AutoCloseableLock readLock = readLock()) { - ReferenceCountedObject ref = entries.getRef(Math.toIntExact(index)); + final LogEntryProto entry = entries.get(Math.toIntExact(index)); + final ReferenceCountedObject ref = ReferenceCountedObject.wrap(entry); ref.retain(); return ref; } @@ -205,7 +198,7 @@ protected CompletableFuture appendEntryImpl(ReferenceCountedObject> appendImpl(ReferenceCountedObject Date: Sun, 6 Oct 2024 23:40:26 -0700 Subject: [PATCH 29/31] SegmentedRaftLogWorker should clean up unfinished tasks in the queue. 
--- .../apache/ratis/util/DataBlockingQueue.java | 24 ++++++++++++++++++ .../segmented/SegmentedRaftLogWorker.java | 25 +++++++++++-------- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java index e905893e5b..4651bcf230 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.function.ToLongFunction; /** @@ -46,6 +47,8 @@ public class DataBlockingQueue extends DataQueue { private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); + private boolean closed = false; + public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToLongFunction getNumBytes) { super(name, byteLimit, elementLimit, getNumBytes); } @@ -72,10 +75,28 @@ public void clear() { } } + public void clear(Consumer handler) { + try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { + for(E e : this) { + handler.accept(e); + } + super.clear(); + } + } + + public void close() { + try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { + closed = true; + } + } + @Override public boolean offer(E element) { Objects.requireNonNull(element, "element == null"); try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { + if (closed) { + return false; + } if (super.offer(element)) { notEmpty.signal(); return true; @@ -95,6 +116,9 @@ public boolean offer(E element, TimeDuration timeout) throws InterruptedExceptio long nanos = timeout.toLong(TimeUnit.NANOSECONDS); try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { for(;;) { + if (closed) { + 
return false; + } if (super.offer(element)) { notEmpty.signal(); return true; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java index 9ed3a1b763..111cc2d21f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java @@ -51,6 +51,7 @@ import java.util.Optional; import java.util.Queue; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; @@ -242,10 +243,11 @@ void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOExc } void close() { + queue.close(); this.running = false; + ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_MINUTE, workerThreadExecutor, + timeout -> LOG.warn("{}: shutdown timeout in {}", name, timeout)); Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown); - ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND.multiply(3), - workerThreadExecutor, timeout -> LOG.warn("{}: shutdown timeout in " + timeout, name)); IOUtils.cleanup(LOG, out); PlatformDependent.freeDirectBuffer(writeBuffer); LOG.info("{} close()", name); @@ -341,7 +343,7 @@ private void run() { LOG.info(Thread.currentThread().getName() + " was interrupted, exiting. 
There are " + queue.getNumElements() + " tasks remaining in the queue."); - return; + break; } catch (Exception e) { if (!running) { LOG.info("{} got closed and hit exception", @@ -352,6 +354,8 @@ private void run() { } } } + + queue.clear(Task::discard); } private boolean shouldFlush() { @@ -494,7 +498,7 @@ private class WriteLog extends Task { private final LogEntryProto entry; private final CompletableFuture stateMachineFuture; private final CompletableFuture combined; - private final ReferenceCountedObject ref; + private final AtomicReference> ref = new AtomicReference<>(); WriteLog(ReferenceCountedObject entryRef, LogEntryProto removedStateMachineData, TransactionContext context) { @@ -512,7 +516,7 @@ private class WriteLog extends Task { this.stateMachineFuture = null; } entryRef.retain(); - this.ref = entryRef; + this.ref.set(entryRef); } else { try { // this.entry != origEntry if it has state machine data @@ -522,7 +526,6 @@ private class WriteLog extends Task { + ", entry=" + LogProtoUtils.toLogEntryString(origEntry, stateMachine::toStateMachineLogEntryString), e); throw e; } - this.ref = null; } this.combined = stateMachineFuture == null? 
super.getFuture() : super.getFuture().thenCombine(stateMachineFuture, (index, stateMachineResult) -> index); @@ -532,6 +535,7 @@ private class WriteLog extends Task { void failed(IOException e) { stateMachine.event().notifyLogFailed(e, entry); super.failed(e); + discard(); } @Override @@ -547,15 +551,14 @@ CompletableFuture getFuture() { @Override void done() { writeTasks.offerOrCompleteFuture(this); - if (ref != null) { - ref.release(); - } + discard(); } @Override void discard() { - if (ref != null) { - ref.release(); + final ReferenceCountedObject entry = ref.getAndSet(null); + if (entry != null) { + entry.release(); } } From 3b07ab9218516d19ef5501c69be21b0b84dd2296 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 6 Oct 2024 23:49:09 -0700 Subject: [PATCH 30/31] Fix checkstyle --- .../server/raftlog/segmented/SegmentedRaftLogWorker.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java index 111cc2d21f..a3d13de9fd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java @@ -556,9 +556,9 @@ void done() { @Override void discard() { - final ReferenceCountedObject entry = ref.getAndSet(null); - if (entry != null) { - entry.release(); + final ReferenceCountedObject entryRef = ref.getAndSet(null); + if (entryRef != null) { + entryRef.release(); } } From 6c15124d6f24a83237aa4654148e6e51d657ccba Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Mon, 7 Oct 2024 09:12:29 -0700 Subject: [PATCH 31/31] Revert pom.xml changes. 
--- pom.xml | 8 ++++---- .../java/org/apache/ratis/util/DataBlockingQueue.java | 8 +++++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 21483043f2..94f3a7d99f 100644 --- a/pom.xml +++ b/pom.xml @@ -176,7 +176,7 @@ 3.3.0 4.0.6 1.6.1 - 3.5.1 + 3.0.0-M4 3.5.3 3.4.0 @@ -223,8 +223,8 @@ 4 2.0.7 - 5.11.2 - 0.8.12 + 5.10.1 + 0.8.11 @@ -427,7 +427,7 @@ org.junit junit-bom - ${junit-bom.version} + ${junit.jupiter.version} pom import diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java index 4651bcf230..fb0f0715c5 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java @@ -75,6 +75,7 @@ public void clear() { } } + /** Apply the given handler to each element and then {@link #clear()}. */ public void clear(Consumer handler) { try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { for(E e : this) { @@ -84,8 +85,13 @@ public void clear(Consumer handler) { } } + /** + * Close this queue to stop accepting new elements, i.e. the offer(…) methods always return false. + * Note that closing the queue will not clear the existing elements. + * The existing elements can be peeked, polled or cleared after close. + */ public void close() { - try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { + try(AutoCloseableLock ignored = AutoCloseableLock.acquire(lock)) { closed = true; } }