From 09350c572b1647e2b09e60c63b81f381426db969 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alen=20Vre=C4=8Dko?=
<332217+avrecko@users.noreply.github.com>
Date: Sun, 11 Feb 2024 20:19:51 +0100
Subject: [PATCH] Don't depend on the GC to free native memory on Linux.
JNA is freeing 192 KiB of per NuProcess native buffers on GC Finalizer / Reference Queue processing. In cases with very low GC activity, this is essentially a memory leak, potentially causing native memory issues.
* out/err/in buffers are freed onExit,
* event needed for Epoll #registerProcess and ##queueWrite is allocated and freed in the method,
* tweak in reusing of IntByReference for duration of epoll/kqueue processor,
* updated JNA library to 5.13.0, to get #close on Memory object (added in 5.12.0).
---
pom.xml | 2 +-
.../internal/BaseEventProcessor.java | 3 +
.../nuprocess/internal/BasePosixProcess.java | 6 +-
.../com/zaxxer/nuprocess/internal/Util.java | 22 +++++
.../zaxxer/nuprocess/linux/EpollEvent.java | 6 ++
.../zaxxer/nuprocess/linux/LinuxProcess.java | 13 ---
.../zaxxer/nuprocess/linux/ProcessEpoll.java | 20 ++---
.../zaxxer/nuprocess/osx/ProcessKqueue.java | 3 +-
.../com/zaxxer/nuprocess/ThreadLoadTest.java | 80 +++++++++++++++++++
9 files changed, 127 insertions(+), 28 deletions(-)
create mode 100644 src/main/java/com/zaxxer/nuprocess/internal/Util.java
create mode 100644 src/test/java/com/zaxxer/nuprocess/ThreadLoadTest.java
diff --git a/pom.xml b/pom.xml
index 51cee582..65a14e05 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,7 +68,7 @@
net.java.dev.jna
jna
- 5.11.0
+ 5.13.0
diff --git a/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java b/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java
index 33ff02bc..df1bdb5f 100644
--- a/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java
+++ b/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java
@@ -48,6 +48,8 @@ public abstract class BaseEventProcessor implements
private CyclicBarrier startBarrier;
private AtomicBoolean isRunning;
+ protected final IntByReference exitCodePointer = new IntByReference();
+
static {
LINGER_TIME_MS = Math.max(1000, Integer.getInteger("com.zaxxer.nuprocess.lingerTimeMs", 2500));
@@ -128,6 +130,7 @@ public void shutdown()
process.onExit(Integer.MAX_VALUE - 1);
LibC.waitpid(process.getPid(), exitCode, LibC.WNOHANG);
}
+ Util.close(exitCode);
}
/**
diff --git a/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java b/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java
index 5b8f19d5..b310581a 100644
--- a/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java
+++ b/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java
@@ -323,8 +323,9 @@ public void onExit(int statusCode)
}
finally {
exitPending.countDown();
- // Once the last reference to the buffer is gone, Java will finalize the buffer
- // and release the native memory we allocated in initializeBuffers().
+ outBufferMemory.close();
+ errBufferMemory.close();
+ inBufferMemory.close();
outBufferMemory = null;
errBufferMemory = null;
inBufferMemory = null;
@@ -332,7 +333,6 @@ public void onExit(int statusCode)
errBuffer = null;
inBuffer = null;
processHandler = null;
- Memory.purge();
}
}
diff --git a/src/main/java/com/zaxxer/nuprocess/internal/Util.java b/src/main/java/com/zaxxer/nuprocess/internal/Util.java
new file mode 100644
index 00000000..2b31c88e
--- /dev/null
+++ b/src/main/java/com/zaxxer/nuprocess/internal/Util.java
@@ -0,0 +1,22 @@
+package com.zaxxer.nuprocess.internal;
+
+import com.sun.jna.Memory;
+import com.sun.jna.Pointer;
+import com.sun.jna.PointerType;
+
+public class Util {
+
+ public static void close(PointerType pointerType) {
+ if (pointerType == null) {
+ return;
+ }
+ close(pointerType.getPointer());
+ }
+
+ public static void close(Pointer p) {
+ if (p instanceof Memory) {
+ ((Memory) p).close();
+ }
+ }
+
+}
diff --git a/src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java b/src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java
index bbe58e77..b3a7168e 100644
--- a/src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java
+++ b/src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java
@@ -20,6 +20,7 @@
import java.util.List;
import com.sun.jna.*;
+import com.zaxxer.nuprocess.internal.Util;
class EpollEvent
{
@@ -64,6 +65,11 @@ int size() {
return size;
}
+ public void close()
+ {
+ Util.close(getPointer());
+ }
+
public static class EpollEventPrototype extends Structure
{
/*
diff --git a/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java b/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java
index ee2dae01..dae00dee 100644
--- a/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java
+++ b/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java
@@ -35,7 +35,6 @@
*/
public class LinuxProcess extends BasePosixProcess
{
- private final EpollEvent epollEvent;
static {
LibEpoll.sigignore(LibEpoll.SIGPIPE);
@@ -58,7 +57,6 @@ private enum LaunchMechanism {
LinuxProcess(NuProcessHandler processListener) {
super(processListener);
- epollEvent = new EpollEvent();
}
@Override
@@ -117,17 +115,6 @@ public void run(List command, String[] environment, Path cwd)
}
}
- /**
- * An {@link EpollEvent} struct, which may be used when registering for events for this process. Each process has
- * its own struct to avoid concurrency issues in {@link ProcessEpoll#registerProcess} when multiple processes are
- * registered at once (e.g. multiple threads are all starting new processes concurrently).
- *
- * @return this process's {@link EpollEvent} struct
- */
- EpollEvent getEpollEvent() {
- return epollEvent;
- }
-
private void prepareProcess(List command, String[] environment, Path cwd) throws IOException
{
String[] cmdarray = command.toArray(new String[0]);
diff --git a/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java b/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java
index 54c09fc0..e6703b28 100644
--- a/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java
+++ b/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java
@@ -21,7 +21,6 @@
import java.util.List;
import com.sun.jna.Native;
-import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
@@ -86,6 +85,7 @@ public void registerProcess(LinuxProcess process)
int stdinFd = Integer.MIN_VALUE;
int stdoutFd = Integer.MIN_VALUE;
int stderrFd = Integer.MIN_VALUE;
+ EpollEvent event = new EpollEvent();
try {
stdinFd = process.getStdin().acquire();
stdoutFd = process.getStdout().acquire();
@@ -96,7 +96,6 @@ public void registerProcess(LinuxProcess process)
fildesToProcessMap.put(stdoutFd, process);
fildesToProcessMap.put(stderrFd, process);
- EpollEvent event = process.getEpollEvent();
event.setEvents(LibEpoll.EPOLLIN);
event.setFileDescriptor(stdoutFd);
int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_ADD, stdoutFd, event.getPointer());
@@ -114,6 +113,7 @@ public void registerProcess(LinuxProcess process)
}
}
finally {
+ event.close();
if (stdinFd != Integer.MIN_VALUE) {
process.getStdin().release();
}
@@ -133,13 +133,14 @@ public void queueWrite(LinuxProcess process)
return;
}
+ EpollEvent event = null;
try {
int stdin = process.getStdin().acquire();
if (stdin == -1) {
return;
}
- EpollEvent event = process.getEpollEvent();
+ event = new EpollEvent();
event.setEvents(LibEpoll.EPOLLOUT | LibEpoll.EPOLLONESHOT | LibEpoll.EPOLLRDHUP | LibEpoll.EPOLLHUP);
event.setFileDescriptor(stdin);
int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_MOD, stdin, event.getPointer());
@@ -153,6 +154,9 @@ public void queueWrite(LinuxProcess process)
}
}
finally {
+ if (event != null) {
+ event.close();
+ }
process.getStdin().release();
}
}
@@ -306,8 +310,7 @@ private void cleanupProcess(LinuxProcess linuxProcess, int stdinFd, int stdoutFd
return;
}
- IntByReference ret = new IntByReference();
- int rc = LibC.waitpid(linuxProcess.getPid(), ret, LibC.WNOHANG);
+ int rc = LibC.waitpid(linuxProcess.getPid(), exitCodePointer, LibC.WNOHANG);
if (rc == 0) {
deadPool.add(linuxProcess);
@@ -316,7 +319,7 @@ else if (rc < 0) {
linuxProcess.onExit((Native.getLastError() == LibC.ECHILD) ? Integer.MAX_VALUE : Integer.MIN_VALUE);
}
else {
- handleExit(linuxProcess, ret.getValue());
+ handleExit(linuxProcess, exitCodePointer.getValue());
}
}
@@ -326,11 +329,10 @@ private void checkDeadPool()
return;
}
- IntByReference ret = new IntByReference();
Iterator iterator = deadPool.iterator();
while (iterator.hasNext()) {
LinuxProcess process = iterator.next();
- int rc = LibC.waitpid(process.getPid(), ret, LibC.WNOHANG);
+ int rc = LibC.waitpid(process.getPid(), exitCodePointer, LibC.WNOHANG);
if (rc == 0) {
continue;
}
@@ -341,7 +343,7 @@ private void checkDeadPool()
continue;
}
- handleExit(process, ret.getValue());
+ handleExit(process, exitCodePointer.getValue());
}
}
diff --git a/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java b/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java
index 210bf233..075b8f18 100644
--- a/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java
+++ b/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java
@@ -30,7 +30,6 @@
import com.sun.jna.Native;
import com.sun.jna.Pointer;
-import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
import com.zaxxer.nuprocess.osx.LibKevent.Kevent;
@@ -393,7 +392,7 @@ private void checkWaitWrites()
private void cleanupProcess(OsxProcess osxProcess)
{
- LibC.waitpid(osxProcess.getPid(), new IntByReference(), LibC.WNOHANG);
+ LibC.waitpid(osxProcess.getPid(), exitCodePointer, LibC.WNOHANG);
// If this is the last process in the map, this thread will cleanly shut down.
pidToProcessMap.remove(osxProcess.getPid());
diff --git a/src/test/java/com/zaxxer/nuprocess/ThreadLoadTest.java b/src/test/java/com/zaxxer/nuprocess/ThreadLoadTest.java
new file mode 100644
index 00000000..356827d3
--- /dev/null
+++ b/src/test/java/com/zaxxer/nuprocess/ThreadLoadTest.java
@@ -0,0 +1,80 @@
+package com.zaxxer.nuprocess;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Yet another threaded test.
+ */
+public class ThreadLoadTest {
+
+ @Test
+ public void testStartLoad() throws InterruptedException {
+ int durationInMs = 15_000;
+ long cutOfTime = System.currentTimeMillis() + durationInMs;
+ int nrOfThreads = (Runtime.getRuntime().availableProcessors() * 2);
+ CountDownLatch latch = new CountDownLatch(nrOfThreads);
+ AtomicLong runCountCode0 = new AtomicLong();
+ AtomicLong runCountCodeNon0 = new AtomicLong();
+ AtomicLong problems = new AtomicLong();
+ for (int i = 0; i < nrOfThreads; i++) {
+ startNewThread(latch, cutOfTime, runCountCode0, runCountCodeNon0, problems);
+ }
+
+ Assert.assertTrue(latch.await(durationInMs + 1_000, TimeUnit.MILLISECONDS));
+ System.out.println("runCount 0 = " + runCountCode0.get());
+ System.out.println("runCount non-0 = " + runCountCodeNon0.get());
+ System.out.println("problems = " + problems.get());
+ }
+
+ private void startNewThread(final CountDownLatch latch, final long cutOfTime, final AtomicLong zeroExit, final AtomicLong nonZeroExit, AtomicLong problems) {
+ new Thread(new Runnable() {
+ public void run() {
+ while (System.currentTimeMillis() < cutOfTime) {
+ final int randomInt = ThreadLocalRandom.current().nextInt(10_000);
+ final String text = "foo" + randomInt;
+ long startTime = System.nanoTime();
+
+ final NuProcess start = new NuProcessBuilder(new NuAbstractProcessHandler() {
+ public void onPreStart(final NuProcess nuProcess) {
+ super.onPreStart(nuProcess);
+ }
+
+ @Override
+ public void onStart(NuProcess nuProcess) {
+ }
+
+ @Override
+ public void onStdout(ByteBuffer buffer, boolean closed) {
+
+ }
+
+ public void onExit(final int statusCode) {
+ if (statusCode == 0) {
+ zeroExit.incrementAndGet();
+ } else {
+ nonZeroExit.incrementAndGet();
+ }
+ }
+ }, "echo", text).start();
+ try {
+ start.waitFor(10, TimeUnit.DAYS);
+
+// System.out.println("Took " + ((System.nanoTime() - startTime) / 1000000));
+// start.wantWrite();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ latch.countDown();
+ }
+ }).start();
+ }
+}