Skip to content

Commit

Permalink
Improvements to JNA related code.
Browse files Browse the repository at this point in the history
  * Pooling max 20 events instead of 1 event at a time in ProcessEpoll,
  * explicit freeing of out/err/in buffers instead of waiting for JNA GC RefHandler,
  * one call native memory allocation for out/err/in buffer instead of individual,
  * reusing of EpollEvent instances to avoid malloc on register and queueWrite,
  * reusing of IntByReference to avoid per method malloc call,
  * updated JNA library to 5.14.0,
  * dropped Java 1.7 support, now minimal java version we support is 8.
  • Loading branch information
avrecko committed Feb 11, 2024
1 parent 9e2f75c commit adff2b9
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 65 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.11.0</version>
<version>5.14.0</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -173,8 +173,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public abstract class BaseEventProcessor<T extends BasePosixProcess> implements
private CyclicBarrier startBarrier;
private AtomicBoolean isRunning;

// avoid unnecessary malloc calls by reusing native pointer
// this field is thread safe as this is only used by Epoll/KQueue thread
protected final IntByReference tempIntPointer = new IntByReference();

static {
LINGER_TIME_MS = Math.max(1000, Integer.getInteger("com.zaxxer.nuprocess.lingerTimeMs", 2500));

Expand Down
25 changes: 8 additions & 17 deletions src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ public abstract class BasePosixProcess implements NuProcess
protected AtomicBoolean userWantsWrite;

// ******* Input/Output Buffers
private Memory outBufferMemory;
private Memory errBufferMemory;
private Memory inBufferMemory;
private Memory nativeBufferMem;

protected ByteBuffer outBuffer;
protected ByteBuffer errBuffer;
Expand Down Expand Up @@ -323,16 +321,13 @@ public void onExit(int statusCode)
}
finally {
exitPending.countDown();
// Once the last reference to the buffer is gone, Java will finalize the buffer
// and release the native memory we allocated in initializeBuffers().
outBufferMemory = null;
errBufferMemory = null;
inBufferMemory = null;
// explicitly free memory without waiting for GC finalizers
nativeBufferMem.close();
nativeBufferMem = null;
outBuffer = null;
errBuffer = null;
inBuffer = null;
processHandler = null;
Memory.purge();
}
}

Expand Down Expand Up @@ -528,14 +523,10 @@ protected void initializeBuffers()

pendingWrites = new ConcurrentLinkedQueue<>();

outBufferMemory = new Memory(BUFFER_CAPACITY);
outBuffer = outBufferMemory.getByteBuffer(0, outBufferMemory.size()).order(ByteOrder.nativeOrder());

errBufferMemory = new Memory(BUFFER_CAPACITY);
errBuffer = errBufferMemory.getByteBuffer(0, outBufferMemory.size()).order(ByteOrder.nativeOrder());

inBufferMemory = new Memory(BUFFER_CAPACITY);
inBuffer = inBufferMemory.getByteBuffer(0, outBufferMemory.size()).order(ByteOrder.nativeOrder());
nativeBufferMem = new Memory(3 * BUFFER_CAPACITY);
outBuffer = nativeBufferMem.getByteBuffer(0, BUFFER_CAPACITY).order(ByteOrder.nativeOrder());
errBuffer = nativeBufferMem.getByteBuffer(BUFFER_CAPACITY, BUFFER_CAPACITY).order(ByteOrder.nativeOrder());
inBuffer = nativeBufferMem.getByteBuffer(2 * BUFFER_CAPACITY, BUFFER_CAPACITY).order(ByteOrder.nativeOrder());

// Ensure stdin initially has 0 bytes pending write. We'll
// update this before invoking onStdinReady.
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class EpollEvent
pointer = new Memory(size);
}

EpollEvent(Pointer pointer) {
this.pointer = pointer;
}

int getEvents() {
return pointer.getInt(eventsOffset);
}
Expand Down Expand Up @@ -79,7 +83,7 @@ public static class EpollEventPrototype extends Structure
public int events;
public EpollData data;

EpollEventPrototype() {
public EpollEventPrototype() {
super(detectAlignment());

data = new EpollData();
Expand All @@ -91,7 +95,6 @@ int getFieldOffset(String field)
return fieldOffset(field);
}

@SuppressWarnings("rawtypes")
@Override
protected List<String> getFieldOrder() {
return Arrays.asList("events", "data");
Expand Down
8 changes: 0 additions & 8 deletions src/main/java/com/zaxxer/nuprocess/linux/LibEpoll.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@ public class LibEpoll

public static native int epoll_ctl(int epfd, int op, int fd, Pointer event);

// We only ever call this API with maxevents=1. However, if calling with maxevents > 1,
// care must be taken to ensure that the "events" Pointer actually points to a
// contiguous block of memory large enough to handle maxevents number of EpollEvent
// mappings.
//
// EpollEvent would likely need to be updated to add a convenience method that
// allocates a block of memory and returns an array of EpollEvents mapped into it. The
// EpollEvent.getPointer() of the first array element could then be passed to this API.
public static native int epoll_wait(int epfd, Pointer events, int maxevents, int timeout);

public static final int SIGPIPE = 13;
Expand Down
15 changes: 0 additions & 15 deletions src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
*/
public class LinuxProcess extends BasePosixProcess
{
private final EpollEvent epollEvent;

static {
LibEpoll.sigignore(LibEpoll.SIGPIPE);

Expand All @@ -57,8 +55,6 @@ private enum LaunchMechanism {

LinuxProcess(NuProcessHandler processListener) {
super(processListener);

epollEvent = new EpollEvent();
}

@Override
Expand Down Expand Up @@ -116,17 +112,6 @@ public void run(List<String> command, String[] environment, Path cwd)
}
}

/**
* An {@link EpollEvent} struct, which may be used when registering for events for this process. Each process has
* its own struct to avoid concurrency issues in {@link ProcessEpoll#registerProcess} when multiple processes are
* registered at once (e.g. multiple threads are all starting new processes concurrently).
*
* @return this process's {@link EpollEvent} struct
*/
EpollEvent getEpollEvent() {
return epollEvent;
}

private void prepareProcess(List<String> command, String[] environment, Path cwd) throws IOException
{
String[] cmdarray = command.toArray(new String[0]);
Expand Down
77 changes: 59 additions & 18 deletions src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.sun.jna.Native;
import com.sun.jna.Structure;
import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
Expand All @@ -36,10 +39,19 @@
*/
class ProcessEpoll extends BaseEventProcessor<LinuxProcess>
{
private static final int EPOLL_MAX_EVENTS = 20;
private final int epoll;
private final EpollEvent triggeredEvent;
private final EpollEvent[] triggeredEvents;
private final EpollEvent firstTriggeredEvent;
private final List<LinuxProcess> deadPool;

// preferring (assuming uncontented) locking over memory allocations
// used in #registration and #queueWrite methods
private final EpollEvent tempEventForRegistration = new EpollEvent();
private final EpollEvent tempEventForQueueWrite = new EpollEvent();
private final Lock tempEventForRegistrationLock = new ReentrantLock();
private final Lock tempEventForQueueWriteLock = new ReentrantLock();

private LinuxProcess process;

ProcessEpoll()
Expand Down Expand Up @@ -67,7 +79,17 @@ private ProcessEpoll(int lingerIterations)
throw new RuntimeException("Unable to create kqueue, errno: " + errno);
}

triggeredEvent = new EpollEvent();
triggeredEvents = new EpollEvent[EPOLL_MAX_EVENTS];

EpollEvent.EpollEventPrototype epollEvent = Structure.newInstance(EpollEvent.EpollEventPrototype.class);
EpollEvent.EpollEventPrototype[] array = (EpollEvent.EpollEventPrototype[]) epollEvent.toArray(EPOLL_MAX_EVENTS);

for (int i = 0; i < EPOLL_MAX_EVENTS; i++) {
EpollEvent.EpollEventPrototype proto = array[i];
triggeredEvents[i] = new EpollEvent(proto.getPointer());
}

firstTriggeredEvent = triggeredEvents[0];

deadPool = new LinkedList<>();
}
Expand All @@ -86,6 +108,9 @@ public void registerProcess(LinuxProcess process)
int stdinFd = Integer.MIN_VALUE;
int stdoutFd = Integer.MIN_VALUE;
int stderrFd = Integer.MIN_VALUE;

tempEventForRegistrationLock.lock();

try {
stdinFd = process.getStdin().acquire();
stdoutFd = process.getStdout().acquire();
Expand All @@ -96,7 +121,7 @@ public void registerProcess(LinuxProcess process)
fildesToProcessMap.put(stdoutFd, process);
fildesToProcessMap.put(stderrFd, process);

EpollEvent event = process.getEpollEvent();
EpollEvent event = tempEventForRegistration;
event.setEvents(LibEpoll.EPOLLIN);
event.setFileDescriptor(stdoutFd);
int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_ADD, stdoutFd, event.getPointer());
Expand All @@ -114,6 +139,7 @@ public void registerProcess(LinuxProcess process)
}
}
finally {
tempEventForRegistrationLock.unlock();
if (stdinFd != Integer.MIN_VALUE) {
process.getStdin().release();
}
Expand All @@ -133,13 +159,15 @@ public void queueWrite(LinuxProcess process)
return;
}

tempEventForQueueWriteLock.lock();

try {
int stdin = process.getStdin().acquire();
if (stdin == -1) {
return;
}

EpollEvent event = process.getEpollEvent();
EpollEvent event = tempEventForQueueWrite;
event.setEvents(LibEpoll.EPOLLOUT | LibEpoll.EPOLLONESHOT | LibEpoll.EPOLLRDHUP | LibEpoll.EPOLLHUP);
event.setFileDescriptor(stdin);
int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_MOD, stdin, event.getPointer());
Expand All @@ -153,6 +181,7 @@ public void queueWrite(LinuxProcess process)
}
}
finally {
tempEventForQueueWriteLock.unlock();
process.getStdin().release();
}
}
Expand All @@ -167,6 +196,9 @@ public void run()
// the handler's onExit is called before LinuxProcess.run returns.
waitForDeadPool();
}
// todo: we could explicitly free tempEventForRegistration and tempEventForQueueWrite native memory
// but not sure if further use of the objects is technically not possible
// for now the JNA GC Ref Handler will eventually free this two objects
}

@Override
Expand All @@ -186,12 +218,8 @@ public void closeStdin(LinuxProcess process)
@Override
public boolean process()
{
int stdinFd = Integer.MIN_VALUE;
int stdoutFd = Integer.MIN_VALUE;
int stderrFd = Integer.MIN_VALUE;
LinuxProcess linuxProcess = null;
try {
int nev = LibEpoll.epoll_wait(epoll, triggeredEvent.getPointer(), 1, DEADPOOL_POLL_INTERVAL);
int nev = LibEpoll.epoll_wait(epoll, firstTriggeredEvent.getPointer(), EPOLL_MAX_EVENTS, DEADPOOL_POLL_INTERVAL);
if (nev == -1) {
int errno = Native.getLastError();
if (errno == LibC.EINTR) {
Expand All @@ -210,13 +238,30 @@ public boolean process()
return false;
}

EpollEvent epEvent = triggeredEvent;
for (int i = 0; i < nev; i++) {
EpollEvent triggeredEvent = triggeredEvents[i];
processEpollEvent(triggeredEvent);
}

return true;
}
finally {
checkDeadPool();
}
}

private void processEpollEvent(EpollEvent epEvent) {
int stdinFd = Integer.MIN_VALUE;
int stdoutFd = Integer.MIN_VALUE;
int stderrFd = Integer.MIN_VALUE;
LinuxProcess linuxProcess = null;
try {
int ident = epEvent.getFileDescriptor();
int events = epEvent.getEvents();

linuxProcess = fildesToProcessMap.get(ident);
if (linuxProcess == null) {
return true;
return;
}

stdinFd = linuxProcess.getStdin().acquire();
Expand Down Expand Up @@ -256,8 +301,6 @@ else if (ident == stdinFd) {
if (linuxProcess.isSoftExit()) {
cleanupProcess(linuxProcess, stdinFd, stdoutFd, stderrFd);
}

return true;
}
finally {
if (linuxProcess != null) {
Expand All @@ -271,7 +314,6 @@ else if (ident == stdinFd) {
linuxProcess.getStderr().release();
}
}
checkDeadPool();
}
}

Expand Down Expand Up @@ -306,8 +348,7 @@ private void cleanupProcess(LinuxProcess linuxProcess, int stdinFd, int stdoutFd
return;
}

IntByReference ret = new IntByReference();
int rc = LibC.waitpid(linuxProcess.getPid(), ret, LibC.WNOHANG);
int rc = LibC.waitpid(linuxProcess.getPid(), tempIntPointer, LibC.WNOHANG);

if (rc == 0) {
deadPool.add(linuxProcess);
Expand All @@ -316,7 +357,7 @@ else if (rc < 0) {
linuxProcess.onExit((Native.getLastError() == LibC.ECHILD) ? Integer.MAX_VALUE : Integer.MIN_VALUE);
}
else {
handleExit(linuxProcess, ret.getValue());
handleExit(linuxProcess, tempIntPointer.getValue());
}
}

Expand All @@ -326,7 +367,7 @@ private void checkDeadPool()
return;
}

IntByReference ret = new IntByReference();
IntByReference ret = tempIntPointer;
Iterator<LinuxProcess> iterator = deadPool.iterator();
while (iterator.hasNext()) {
LinuxProcess process = iterator.next();
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import com.sun.jna.Native;
import com.sun.jna.Pointer;
import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
import com.zaxxer.nuprocess.osx.LibKevent.Kevent;
Expand Down Expand Up @@ -393,7 +392,7 @@ private void checkWaitWrites()

private void cleanupProcess(OsxProcess osxProcess)
{
LibC.waitpid(osxProcess.getPid(), new IntByReference(), LibC.WNOHANG);
LibC.waitpid(osxProcess.getPid(), tempIntPointer, LibC.WNOHANG);

// If this is the last process in the map, this thread will cleanly shut down.
pidToProcessMap.remove(osxProcess.getPid());
Expand Down

0 comments on commit adff2b9

Please sign in to comment.