Skip to content

Commit

Permalink
Improvements to JNA related code.
Browse files Browse the repository at this point in the history
  * Pooling max 20 events instead of 1 event at a time in ProcessEpoll,
  * explicit freeing of out/err/in buffers instead of waiting for JNA GC RefHandler,
  * reusing of IntByReference to avoid per method malloc call,
  * updated JNA library to 5.13.0,
  • Loading branch information
avrecko authored and Alen Vrecko committed Feb 12, 2024
1 parent 9e2f75c commit c06bd4d
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.11.0</version>
<version>5.13.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public abstract class BaseEventProcessor<T extends BasePosixProcess> implements
private CyclicBarrier startBarrier;
private AtomicBoolean isRunning;

// avoid unnecessary malloc calls by reusing native pointer
// this field is thread safe as this is only used by Epoll/KQueue thread
protected final IntByReference exitCodePointer = new IntByReference();

static {
LINGER_TIME_MS = Math.max(1000, Integer.getInteger("com.zaxxer.nuprocess.lingerTimeMs", 2500));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,16 +323,16 @@ public void onExit(int statusCode)
}
finally {
exitPending.countDown();
// Once the last reference to the buffer is gone, Java will finalize the buffer
// and release the native memory we allocated in initializeBuffers().
outBufferMemory.close();
errBufferMemory.close();
inBufferMemory.close();
outBufferMemory = null;
errBufferMemory = null;
inBufferMemory = null;
outBuffer = null;
errBuffer = null;
inBuffer = null;
processHandler = null;
Memory.purge();
}
}

Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.zaxxer.nuprocess.linux;

import com.sun.jna.JNIEnv;
import com.sun.jna.Memory;
import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.NuProcessHandler;
import com.zaxxer.nuprocess.internal.BasePosixProcess;
Expand Down Expand Up @@ -124,6 +125,9 @@ public void run(List<String> command, String[] environment, Path cwd)
* @return this process's {@link EpollEvent} struct
*/
EpollEvent getEpollEvent() {



return epollEvent;
}

Expand Down Expand Up @@ -198,6 +202,25 @@ private void closePipes()
LibC.close(stderrWidow);
}

public void onExit(final int statusCode)
{
super.onExit(statusCode);

// at this point we'd like to free the epollEvent native memory
// the isRunning volatile field is set to false in the super call to onExit
// it guarantees a happens before relation, this is too weak to guarantee
// protection for ProcessEpoll#queueWrite's use of epollEvent
// don't want to get a segfault if someone tries to call queueWrite after process has done onExit
// e.g. running a load with calls to wantWrite() after process exit in some rare cases success in Segfaulting
// without protection

synchronized (epollEvent)
{
isRunning = false; // guaranteed to be seen by a thread that also syncs on the same synchronizer
((Memory)epollEvent.getPointer()).close();
}
}

private static byte[] toCString(String s) {
if (s == null)
return null;
Expand Down
23 changes: 16 additions & 7 deletions src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;

import com.sun.jna.Native;
import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
Expand All @@ -40,6 +39,7 @@ class ProcessEpoll extends BaseEventProcessor<LinuxProcess>
private final EpollEvent triggeredEvent;
private final List<LinuxProcess> deadPool;


private LinuxProcess process;

ProcessEpoll()
Expand Down Expand Up @@ -86,6 +86,8 @@ public void registerProcess(LinuxProcess process)
int stdinFd = Integer.MIN_VALUE;
int stdoutFd = Integer.MIN_VALUE;
int stderrFd = Integer.MIN_VALUE;


try {
stdinFd = process.getStdin().acquire();
stdoutFd = process.getStdout().acquire();
Expand Down Expand Up @@ -140,6 +142,15 @@ public void queueWrite(LinuxProcess process)
}

EpollEvent event = process.getEpollEvent();

synchronized (event)
{
if(!process.isRunning())
{
return;
}
}

event.setEvents(LibEpoll.EPOLLOUT | LibEpoll.EPOLLONESHOT | LibEpoll.EPOLLRDHUP | LibEpoll.EPOLLHUP);
event.setFileDescriptor(stdin);
int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_MOD, stdin, event.getPointer());
Expand Down Expand Up @@ -306,8 +317,7 @@ private void cleanupProcess(LinuxProcess linuxProcess, int stdinFd, int stdoutFd
return;
}

IntByReference ret = new IntByReference();
int rc = LibC.waitpid(linuxProcess.getPid(), ret, LibC.WNOHANG);
int rc = LibC.waitpid(linuxProcess.getPid(), exitCodePointer, LibC.WNOHANG);

if (rc == 0) {
deadPool.add(linuxProcess);
Expand All @@ -316,7 +326,7 @@ else if (rc < 0) {
linuxProcess.onExit((Native.getLastError() == LibC.ECHILD) ? Integer.MAX_VALUE : Integer.MIN_VALUE);
}
else {
handleExit(linuxProcess, ret.getValue());
handleExit(linuxProcess, exitCodePointer.getValue());
}
}

Expand All @@ -326,11 +336,10 @@ private void checkDeadPool()
return;
}

IntByReference ret = new IntByReference();
Iterator<LinuxProcess> iterator = deadPool.iterator();
while (iterator.hasNext()) {
LinuxProcess process = iterator.next();
int rc = LibC.waitpid(process.getPid(), ret, LibC.WNOHANG);
int rc = LibC.waitpid(process.getPid(), exitCodePointer, LibC.WNOHANG);
if (rc == 0) {
continue;
}
Expand All @@ -341,7 +350,7 @@ private void checkDeadPool()
continue;
}

handleExit(process, ret.getValue());
handleExit(process, exitCodePointer.getValue());
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import com.sun.jna.Native;
import com.sun.jna.Pointer;
import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
import com.zaxxer.nuprocess.osx.LibKevent.Kevent;
Expand Down Expand Up @@ -393,7 +392,7 @@ private void checkWaitWrites()

private void cleanupProcess(OsxProcess osxProcess)
{
LibC.waitpid(osxProcess.getPid(), new IntByReference(), LibC.WNOHANG);
LibC.waitpid(osxProcess.getPid(), exitCodePointer, LibC.WNOHANG);

// If this is the last process in the map, this thread will cleanly shut down.
pidToProcessMap.remove(osxProcess.getPid());
Expand Down
79 changes: 79 additions & 0 deletions src/test/java/com/zaxxer/nuprocess/LoadTestForStartUseCase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.zaxxer.nuprocess;

import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LoadTestForStartUseCase
{

@Test
public void testStartLoad() throws InterruptedException
{
int durationInMs = 3000_000;
long cutOfTime = System.currentTimeMillis() + durationInMs;
int nrOfThreads = (Runtime.getRuntime().availableProcessors() * 2);
CountDownLatch latch = new CountDownLatch(nrOfThreads);
AtomicLong runCountCode0 = new AtomicLong();
AtomicLong runCountCodeNon0 = new AtomicLong();
for (int i = 0; i < nrOfThreads; i++)
{
newThread(latch, cutOfTime, runCountCode0, runCountCodeNon0);
}

Assert.assertTrue(latch.await(durationInMs + 1_000, TimeUnit.MILLISECONDS));
System.out.println("runCount 0 = " + runCountCode0.get());
System.out.println("runCount non-0 = " + runCountCodeNon0.get());
}

private void newThread(final CountDownLatch latch, final long cutOfTime, final AtomicLong zeroExit, final AtomicLong nonZeroExit)
{
new Thread(new Runnable()
{
public void run()
{
while (System.currentTimeMillis() < cutOfTime)
{
final int randomInt = ThreadLocalRandom.current().nextInt(10_000);

long startTime = System.nanoTime();
final NuProcess start = new NuProcessBuilder(new NuAbstractProcessHandler()
{
public void onPreStart(final NuProcess nuProcess)
{
super.onPreStart(nuProcess);
}

public void onExit(final int statusCode)
{
if (statusCode == 0)
{
zeroExit.incrementAndGet();
}
else
{
nonZeroExit.incrementAndGet();
}
}
}, "echo", "foo" + randomInt).start();
try
{
start.waitFor(10, TimeUnit.DAYS);
System.out.println("Took " + ((System.nanoTime() - startTime) / 1000000));
// start.wantWrite();
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}

latch.countDown();
}
}).start();
}
}

0 comments on commit c06bd4d

Please sign in to comment.