Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing zero-copy bug fixes (not for merging) #1156

Open
wants to merge 31 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0d2a4dc
RATIS-2164. LeakDetector has a race condition.
szetszwo Sep 28, 2024
205c720
Fix a bug and checkstyle.
szetszwo Sep 28, 2024
58f296f
Enable advanced detection for debugging.
szetszwo Sep 28, 2024
67577ff
Fixed some bugs.
szetszwo Sep 28, 2024
90321af
Some minor changes.
szetszwo Sep 28, 2024
1c5c6eb
try-catch MiniRaftCluster shutdown.
szetszwo Sep 29, 2024
9159532
Report earlier leaks at shutdown.
szetszwo Sep 29, 2024
0f4b61e
Enable advanced leak detection.
szetszwo Sep 29, 2024
fe29cde
Move the enable method to ReferenceCountedLeakDetector.
szetszwo Sep 29, 2024
7a6fef9
Use HashMap.
szetszwo Oct 3, 2024
77db48e
Fix a bug in LogAppenderDefault.
szetszwo Oct 3, 2024
c8e3ac8
Rewrite AdvancedTracing.
szetszwo Oct 4, 2024
aea498f
Fix a bug in LogSegment cache.
szetszwo Oct 4, 2024
0104ece
Add synchronized to get()
szetszwo Oct 4, 2024
43980fb
Fix javac error.
szetszwo Oct 4, 2024
9150bdc
Restore RaftBasicTests.
szetszwo Oct 4, 2024
38f5c69
Move ReferenceCountedLeakDetector.enable(..) to MiniRaftCluster.
szetszwo Oct 4, 2024
23af8ed
Fix bugs in LogSegment.EntryCache.
szetszwo Oct 5, 2024
3512387
Fix a bug in SimpleStateMachine4Testing.
szetszwo Oct 5, 2024
b548373
Copy LogEntryProto in SimpleStateMachine4Testing.
szetszwo Oct 5, 2024
c4ac263
Use Throwable in MiniRaftCluster.
szetszwo Oct 5, 2024
b831226
New entries can be added after EntryCache is closed.
szetszwo Oct 6, 2024
55a3896
Bump test related plugin versions.
szetszwo Oct 7, 2024
1d49431
Reduce messages to 100
szetszwo Oct 7, 2024
b57a748
Fix checkstyle.
szetszwo Oct 7, 2024
483b6ae
Reset test Xmx to 2g
szetszwo Oct 7, 2024
6547e14
Retry assertNoLeaks multiple times.
szetszwo Oct 7, 2024
dc690e4
Copy log entries in MemoryRaftLog.
szetszwo Oct 7, 2024
632809e
SegmentedRaftLogWorker should clean up unfinished tasks in the queue.
szetszwo Oct 7, 2024
3b07ab9
Fix checkstyle
szetszwo Oct 7, 2024
6c15124
Revert pom.xml changes.
szetszwo Oct 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@
<enableProcessChecker>all</enableProcessChecker>
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
<!-- @argLine is filled by jacoco maven plugin. @{} means late evaluation -->
<argLine>-Xmx2048m -XX:+HeapDumpOnOutOfMemoryError @{argLine}</argLine>
<argLine>-Xmx2g -XX:+HeapDumpOnOutOfMemoryError @{argLine}</argLine>
<systemPropertyVariables>
<ratis.log.dir>${project.build.directory}/log</ratis.log.dir>
<ratis.tmp.dir>${project.build.directory}/tmp</ratis.tmp.dir>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

/**
Expand All @@ -46,6 +47,8 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

private boolean closed = false;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use atomic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a lock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. :)

/**
 * Construct a blocking queue.
 * All the arguments are passed unchanged to the superclass constructor.
 *
 * @param name the name of this queue
 * @param byteLimit presumably the limit on the total number of bytes in this queue -- TODO confirm in DataQueue
 * @param elementLimit presumably the limit on the number of elements in this queue -- TODO confirm in DataQueue
 * @param getNumBytes presumably the function computing the number of bytes of an element -- TODO confirm in DataQueue
 */
public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToLongFunction<E> getNumBytes) {
super(name, byteLimit, elementLimit, getNumBytes);
}
Expand All @@ -72,10 +75,34 @@ public void clear() {
}
}

/**
 * Pass every element of this queue, in iteration order, to the given handler
 * and then remove all the elements by calling the superclass {@code clear()}.
 * Both steps run inside a single try-with-resources block that acquires the lock of this queue.
 *
 * @param handler the callback invoked once for each element before the elements are removed
 */
public void clear(Consumer<E> handler) {
  try (AutoCloseableLock ignored = AutoCloseableLock.acquire(lock)) {
    for (final E element : this) {
      handler.accept(element);
    }
    super.clear();
  }
}

/**
 * Mark this queue as closed so that it stops accepting new elements:
 * once the closed flag is set, the offer(…) methods return false.
 * This method only sets the flag; it does not remove any element,
 * so the elements already in this queue stay there until they are polled or cleared.
 */
public void close() {
  try (AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
    this.closed = true;
  }
}

@Override
public boolean offer(E element) {
Objects.requireNonNull(element, "element == null");
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
if (closed) {
return false;
}
if (super.offer(element)) {
notEmpty.signal();
return true;
Expand All @@ -95,6 +122,9 @@ public boolean offer(E element, TimeDuration timeout) throws InterruptedExceptio
long nanos = timeout.toLong(TimeUnit.NANOSECONDS);
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
for(;;) {
if (closed) {
return false;
}
if (super.offer(element)) {
notEmpty.signal();
return true;
Expand Down
111 changes: 84 additions & 27 deletions ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to
Expand Down Expand Up @@ -55,13 +59,59 @@
*/
public class LeakDetector {
private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class);

/**
 * The set of {@link LeakTracker}s which have been added but not yet removed.
 * The backing set is built on a plain {@link HashMap}, so every method is synchronized on this object.
 */
private static class LeakTrackerSet {
  private final Set<LeakTracker> set = Collections.newSetFromMap(new HashMap<>());

  /** @return true if the given tracker was in this set and has been removed; otherwise, return false. */
  synchronized boolean remove(LeakTracker tracker) {
    return set.remove(tracker);
  }

  /** Remove the given tracker, which must be in this set; otherwise, the precondition fails. */
  synchronized void removeExisting(LeakTracker tracker) {
    Preconditions.assertTrue(set.remove(tracker), () -> "Failed to remove existing " + tracker);
  }

  /**
   * Create a new tracker for the given referent and add it to this set.
   * The new tracker uses {@link #removeExisting(LeakTracker)} as its remove method.
   *
   * @return the newly created tracker.
   */
  synchronized LeakTracker add(Object referent, ReferenceQueue<Object> queue, Supplier<String> leakReporter) {
    final LeakTracker created = new LeakTracker(referent, queue, this::removeExisting, leakReporter);
    Preconditions.assertTrue(set.add(created), () -> "Failed to add " + created + " for " + referent);
    return created;
  }

  /**
   * Ask each tracker in this set to report its leak and count the non-null reports.
   *
   * @param throwException if true, also assert that the count is zero.
   * @return the number of trackers reporting a leak.
   */
  synchronized int getNumLeaks(boolean throwException) {
    // For an empty set, the loop is a no-op and the assertion below trivially holds.
    int count = 0;
    for (LeakTracker tracker : set) {
      final String leak = tracker.reportLeak();
      if (leak != null) {
        count++;
      }
    }
    if (throwException) {
      assertNoLeaks(count);
    }
    return count;
  }

  /** Assert that the given number of leaks is zero; the failure message also shows the set size. */
  synchronized void assertNoLeaks(int leaks) {
    Preconditions.assertTrue(leaks == 0, () -> {
      final int size = set.size();
      final String comparison = leaks == size ? "==" : "!=";
      return "#leaks = " + leaks + " > 0, #leaks " + comparison + " set.size = " + size;
    });
  }
}

private static final AtomicLong COUNTER = new AtomicLong();

private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
private final Set<LeakTracker> allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final LeakTrackerSet allLeaks = new LeakTrackerSet();
private final List<String> leakMessages = Collections.synchronizedList(new ArrayList<>());
private final String name;

public LeakDetector(String name) {
LeakDetector(String name) {
this.name = name + COUNTER.getAndIncrement();
}

Expand All @@ -81,7 +131,10 @@ private void run() {
// Original resource already been GCed, if tracker is not closed yet,
// report a leak.
if (allLeaks.remove(tracker)) {
tracker.reportLeak();
final String leak = tracker.reportLeak();
if (leak != null) {
leakMessages.add(leak);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -93,48 +146,52 @@ private void run() {
LOG.warn("Exiting leak detector {}.", name);
}

public UncheckedAutoCloseable track(Object leakable, Runnable reportLeak) {
Runnable track(Object leakable, Supplier<String> reportLeak) {
// A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%,
// if we have proofs that leak tracking impacts performance, or a single LeakDetector
// thread can't keep up with the pace of object allocation.
// For now, it looks effective enough and let keep it simple.
LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks, reportLeak);
allLeaks.add(tracker);
return tracker;
return allLeaks.add(leakable, queue, reportLeak)::remove;
}

public void assertNoLeaks() {
Preconditions.assertTrue(allLeaks.isEmpty(), this::allLeaksString);
}
public void assertNoLeaks(int maxRetries) throws InterruptedException {
synchronized (leakMessages) {
Preconditions.assertTrue(leakMessages.isEmpty(),
() -> "#leaks = " + leakMessages.size() + "\n" + leakMessages);
}

String allLeaksString() {
if (allLeaks.isEmpty()) {
return "allLeaks = <empty>";
for(int i = 0; i < maxRetries; i++) {
final int numLeaks = allLeaks.getNumLeaks(false);
if (numLeaks == 0) {
return;
}
LOG.warn("{}/{}) numLeaks == {} > 0, will wait and retry ...", i, maxRetries, numLeaks);
TimeDuration.ONE_SECOND.sleep();
}
allLeaks.forEach(LeakTracker::reportLeak);
return "allLeaks.size = " + allLeaks.size();
allLeaks.getNumLeaks(true);
}

private static final class LeakTracker extends WeakReference<Object> implements UncheckedAutoCloseable {
private final Set<LeakTracker> allLeaks;
private final Runnable leakReporter;
private static final class LeakTracker extends WeakReference<Object> {
private final Consumer<LeakTracker> removeMethod;
private final Supplier<String> leakReporter;

LeakTracker(Object referent, ReferenceQueue<Object> referenceQueue,
Set<LeakTracker> allLeaks, Runnable leakReporter) {
Consumer<LeakTracker> removeMethod, Supplier<String> leakReporter) {
super(referent, referenceQueue);
this.allLeaks = allLeaks;
this.removeMethod = removeMethod;
this.leakReporter = leakReporter;
}

/**
* Called by the tracked resource when closing.
* Called by the tracked resource when releasing the object.
*/
@Override
public void close() {
allLeaks.remove(this);
void remove() {
removeMethod.accept(this);
}

void reportLeak() {
leakReporter.run();
/** @return the leak message if there is a leak; return null if there is no leak. */
String reportLeak() {
return leakReporter.get();
}
}
}
Loading
Loading