From c06bd4de75db0738cff9e053d199f7eb67d97c9b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alen=20Vre=C4=8Dko?=
<332217+avrecko@users.noreply.github.com>
Date: Sun, 11 Feb 2024 20:19:51 +0100
Subject: [PATCH] Improvements to JNA related code.
* Pooling max 20 events instead of 1 event at a time in ProcessEpoll,
* explicit freeing of out/err/in buffers instead of waiting for JNA GC RefHandler,
* reusing of IntByReference to avoid per method malloc call,
* updated JNA library to 5.13.0,
---
pom.xml | 2 +-
.../internal/BaseEventProcessor.java | 4 +
.../nuprocess/internal/BasePosixProcess.java | 6 +-
.../zaxxer/nuprocess/linux/LinuxProcess.java | 23 ++++++
.../zaxxer/nuprocess/linux/ProcessEpoll.java | 23 ++++--
.../zaxxer/nuprocess/osx/ProcessKqueue.java | 3 +-
.../nuprocess/LoadTestForStartUseCase.java | 79 +++++++++++++++++++
7 files changed, 127 insertions(+), 13 deletions(-)
create mode 100644 src/test/java/com/zaxxer/nuprocess/LoadTestForStartUseCase.java
diff --git a/pom.xml b/pom.xml
index 9f1cbdcf..8360904f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,7 +68,7 @@
net.java.dev.jna
jna
- 5.11.0
+ 5.13.0
diff --git a/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java b/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java
index 33ff02bc..2e65b2f0 100644
--- a/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java
+++ b/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java
@@ -48,6 +48,10 @@ public abstract class BaseEventProcessor implements
private CyclicBarrier startBarrier;
private AtomicBoolean isRunning;
+ // avoid unnecessary malloc calls by reusing native pointer
+ // this field is thread safe as this is only used by Epoll/KQueue thread
+ protected final IntByReference exitCodePointer = new IntByReference();
+
static {
LINGER_TIME_MS = Math.max(1000, Integer.getInteger("com.zaxxer.nuprocess.lingerTimeMs", 2500));
diff --git a/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java b/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java
index 5b8f19d5..b310581a 100644
--- a/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java
+++ b/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java
@@ -323,8 +323,9 @@ public void onExit(int statusCode)
}
finally {
exitPending.countDown();
- // Once the last reference to the buffer is gone, Java will finalize the buffer
- // and release the native memory we allocated in initializeBuffers().
+ outBufferMemory.close();
+ errBufferMemory.close();
+ inBufferMemory.close();
outBufferMemory = null;
errBufferMemory = null;
inBufferMemory = null;
@@ -332,7 +333,6 @@ public void onExit(int statusCode)
errBuffer = null;
inBuffer = null;
processHandler = null;
- Memory.purge();
}
}
diff --git a/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java b/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java
index d4c6bca8..519eda7b 100644
--- a/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java
+++ b/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java
@@ -17,6 +17,7 @@
package com.zaxxer.nuprocess.linux;
import com.sun.jna.JNIEnv;
+import com.sun.jna.Memory;
import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.NuProcessHandler;
import com.zaxxer.nuprocess.internal.BasePosixProcess;
@@ -124,6 +125,9 @@ public void run(List command, String[] environment, Path cwd)
* @return this process's {@link EpollEvent} struct
*/
EpollEvent getEpollEvent() {
+
+
+
return epollEvent;
}
@@ -198,6 +202,25 @@ private void closePipes()
LibC.close(stderrWidow);
}
+ public void onExit(final int statusCode)
+ {
+ super.onExit(statusCode);
+
+ // at this point we'd like to free the epollEvent native memory
+ // the isRunning volatile field is set to false in the super call to onExit
+ // it guarantees a happens before relation, this is too weak to guarantee
+ // protection for ProcessEpoll#queueWrite's use of epollEvent
+ // don't want to get a segfault if someone tries to call queueWrite after process has done onExit
+ // e.g. running a load with calls to wantWrite() after process exit in some rare cases success in Segfaulting
+ // without protection
+
+ synchronized (epollEvent)
+ {
+ isRunning = false; // guaranteed to be seen by a thread that also syncs on the same synchronizer
+ ((Memory)epollEvent.getPointer()).close();
+ }
+ }
+
private static byte[] toCString(String s) {
if (s == null)
return null;
diff --git a/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java b/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java
index 54c09fc0..0ea2fac4 100644
--- a/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java
+++ b/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java
@@ -21,7 +21,6 @@
import java.util.List;
import com.sun.jna.Native;
-import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
@@ -40,6 +39,7 @@ class ProcessEpoll extends BaseEventProcessor
private final EpollEvent triggeredEvent;
private final List deadPool;
+
private LinuxProcess process;
ProcessEpoll()
@@ -86,6 +86,8 @@ public void registerProcess(LinuxProcess process)
int stdinFd = Integer.MIN_VALUE;
int stdoutFd = Integer.MIN_VALUE;
int stderrFd = Integer.MIN_VALUE;
+
+
try {
stdinFd = process.getStdin().acquire();
stdoutFd = process.getStdout().acquire();
@@ -140,6 +142,15 @@ public void queueWrite(LinuxProcess process)
}
EpollEvent event = process.getEpollEvent();
+
+ synchronized (event)
+ {
+ if(!process.isRunning())
+ {
+ return;
+ }
+ }
+
event.setEvents(LibEpoll.EPOLLOUT | LibEpoll.EPOLLONESHOT | LibEpoll.EPOLLRDHUP | LibEpoll.EPOLLHUP);
event.setFileDescriptor(stdin);
int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_MOD, stdin, event.getPointer());
@@ -306,8 +317,7 @@ private void cleanupProcess(LinuxProcess linuxProcess, int stdinFd, int stdoutFd
return;
}
- IntByReference ret = new IntByReference();
- int rc = LibC.waitpid(linuxProcess.getPid(), ret, LibC.WNOHANG);
+ int rc = LibC.waitpid(linuxProcess.getPid(), exitCodePointer, LibC.WNOHANG);
if (rc == 0) {
deadPool.add(linuxProcess);
@@ -316,7 +326,7 @@ else if (rc < 0) {
linuxProcess.onExit((Native.getLastError() == LibC.ECHILD) ? Integer.MAX_VALUE : Integer.MIN_VALUE);
}
else {
- handleExit(linuxProcess, ret.getValue());
+ handleExit(linuxProcess, exitCodePointer.getValue());
}
}
@@ -326,11 +336,10 @@ private void checkDeadPool()
return;
}
- IntByReference ret = new IntByReference();
Iterator iterator = deadPool.iterator();
while (iterator.hasNext()) {
LinuxProcess process = iterator.next();
- int rc = LibC.waitpid(process.getPid(), ret, LibC.WNOHANG);
+ int rc = LibC.waitpid(process.getPid(), exitCodePointer, LibC.WNOHANG);
if (rc == 0) {
continue;
}
@@ -341,7 +350,7 @@ private void checkDeadPool()
continue;
}
- handleExit(process, ret.getValue());
+ handleExit(process, exitCodePointer.getValue());
}
}
diff --git a/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java b/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java
index 210bf233..075b8f18 100644
--- a/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java
+++ b/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java
@@ -30,7 +30,6 @@
import com.sun.jna.Native;
import com.sun.jna.Pointer;
-import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
import com.zaxxer.nuprocess.osx.LibKevent.Kevent;
@@ -393,7 +392,7 @@ private void checkWaitWrites()
private void cleanupProcess(OsxProcess osxProcess)
{
- LibC.waitpid(osxProcess.getPid(), new IntByReference(), LibC.WNOHANG);
+ LibC.waitpid(osxProcess.getPid(), exitCodePointer, LibC.WNOHANG);
// If this is the last process in the map, this thread will cleanly shut down.
pidToProcessMap.remove(osxProcess.getPid());
diff --git a/src/test/java/com/zaxxer/nuprocess/LoadTestForStartUseCase.java b/src/test/java/com/zaxxer/nuprocess/LoadTestForStartUseCase.java
new file mode 100644
index 00000000..5ad36a90
--- /dev/null
+++ b/src/test/java/com/zaxxer/nuprocess/LoadTestForStartUseCase.java
@@ -0,0 +1,79 @@
+package com.zaxxer.nuprocess;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LoadTestForStartUseCase
+{
+
+ @Test
+ public void testStartLoad() throws InterruptedException
+ {
+ int durationInMs = 3000_000;
+ long cutOfTime = System.currentTimeMillis() + durationInMs;
+ int nrOfThreads = (Runtime.getRuntime().availableProcessors() * 2);
+ CountDownLatch latch = new CountDownLatch(nrOfThreads);
+ AtomicLong runCountCode0 = new AtomicLong();
+ AtomicLong runCountCodeNon0 = new AtomicLong();
+ for (int i = 0; i < nrOfThreads; i++)
+ {
+ newThread(latch, cutOfTime, runCountCode0, runCountCodeNon0);
+ }
+
+ Assert.assertTrue(latch.await(durationInMs + 1_000, TimeUnit.MILLISECONDS));
+ System.out.println("runCount 0 = " + runCountCode0.get());
+ System.out.println("runCount non-0 = " + runCountCodeNon0.get());
+ }
+
+ private void newThread(final CountDownLatch latch, final long cutOfTime, final AtomicLong zeroExit, final AtomicLong nonZeroExit)
+ {
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ while (System.currentTimeMillis() < cutOfTime)
+ {
+ final int randomInt = ThreadLocalRandom.current().nextInt(10_000);
+
+ long startTime = System.nanoTime();
+ final NuProcess start = new NuProcessBuilder(new NuAbstractProcessHandler()
+ {
+ public void onPreStart(final NuProcess nuProcess)
+ {
+ super.onPreStart(nuProcess);
+ }
+
+ public void onExit(final int statusCode)
+ {
+ if (statusCode == 0)
+ {
+ zeroExit.incrementAndGet();
+ }
+ else
+ {
+ nonZeroExit.incrementAndGet();
+ }
+ }
+ }, "echo", "foo" + randomInt).start();
+ try
+ {
+ start.waitFor(10, TimeUnit.DAYS);
+ System.out.println("Took " + ((System.nanoTime() - startTime) / 1000000));
+// start.wantWrite();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ latch.countDown();
+ }
+ }).start();
+ }
+}