diff --git a/pom.xml b/pom.xml index 9f1cbdc..af66c5a 100644 --- a/pom.xml +++ b/pom.xml @@ -68,7 +68,7 @@ net.java.dev.jna jna - 5.11.0 + 5.14.0 @@ -173,8 +173,8 @@ org.apache.maven.plugins maven-compiler-plugin - 1.7 - 1.7 + 1.8 + 1.8 diff --git a/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java b/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java index 33ff02b..7e7387c 100644 --- a/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java +++ b/src/main/java/com/zaxxer/nuprocess/internal/BaseEventProcessor.java @@ -48,6 +48,10 @@ public abstract class BaseEventProcessor implements private CyclicBarrier startBarrier; private AtomicBoolean isRunning; + // avoid unnecessary malloc calls by reusing native pointer + // this field is thread safe as this is only used by Epoll/KQueue thread + protected final IntByReference tempIntPointer = new IntByReference(); + static { LINGER_TIME_MS = Math.max(1000, Integer.getInteger("com.zaxxer.nuprocess.lingerTimeMs", 2500)); diff --git a/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java b/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java index 5b8f19d..f61764a 100644 --- a/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java +++ b/src/main/java/com/zaxxer/nuprocess/internal/BasePosixProcess.java @@ -65,9 +65,7 @@ public abstract class BasePosixProcess implements NuProcess protected AtomicBoolean userWantsWrite; // ******* Input/Output Buffers - private Memory outBufferMemory; - private Memory errBufferMemory; - private Memory inBufferMemory; + private Memory nativeBufferMem; protected ByteBuffer outBuffer; protected ByteBuffer errBuffer; @@ -323,16 +321,13 @@ public void onExit(int statusCode) } finally { exitPending.countDown(); - // Once the last reference to the buffer is gone, Java will finalize the buffer - // and release the native memory we allocated in initializeBuffers(). - outBufferMemory = null; - errBufferMemory = null; - inBufferMemory = null; + // explicitly free memory without waiting for GC finalizers + nativeBufferMem.close(); + nativeBufferMem = null; outBuffer = null; errBuffer = null; inBuffer = null; processHandler = null; - Memory.purge(); } } @@ -528,14 +523,10 @@ protected void initializeBuffers() pendingWrites = new ConcurrentLinkedQueue<>(); - outBufferMemory = new Memory(BUFFER_CAPACITY); - outBuffer = outBufferMemory.getByteBuffer(0, outBufferMemory.size()).order(ByteOrder.nativeOrder()); - - errBufferMemory = new Memory(BUFFER_CAPACITY); - errBuffer = errBufferMemory.getByteBuffer(0, outBufferMemory.size()).order(ByteOrder.nativeOrder()); - - inBufferMemory = new Memory(BUFFER_CAPACITY); - inBuffer = inBufferMemory.getByteBuffer(0, outBufferMemory.size()).order(ByteOrder.nativeOrder()); + nativeBufferMem = new Memory(3 * BUFFER_CAPACITY); + outBuffer = nativeBufferMem.getByteBuffer(0, BUFFER_CAPACITY).order(ByteOrder.nativeOrder()); + errBuffer = nativeBufferMem.getByteBuffer(BUFFER_CAPACITY, BUFFER_CAPACITY).order(ByteOrder.nativeOrder()); + inBuffer = nativeBufferMem.getByteBuffer(2 * BUFFER_CAPACITY, BUFFER_CAPACITY).order(ByteOrder.nativeOrder()); // Ensure stdin initially has 0 bytes pending write. We'll // update this before invoking onStdinReady. diff --git a/src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java b/src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java index bbe58e7..ae7ce1b 100644 --- a/src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java +++ b/src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java @@ -40,6 +40,10 @@ class EpollEvent pointer = new Memory(size); } + EpollEvent(Pointer pointer) { + this.pointer = pointer; + } + int getEvents() { return pointer.getInt(eventsOffset); } @@ -79,7 +83,7 @@ public static class EpollEventPrototype extends Structure public int events; public EpollData data; - EpollEventPrototype() { + public EpollEventPrototype() { super(detectAlignment()); data = new EpollData(); @@ -91,7 +95,6 @@ int getFieldOffset(String field) return fieldOffset(field); } - @SuppressWarnings("rawtypes") @Override protected List getFieldOrder() { return Arrays.asList("events", "data"); diff --git a/src/main/java/com/zaxxer/nuprocess/linux/LibEpoll.java b/src/main/java/com/zaxxer/nuprocess/linux/LibEpoll.java index 86fa539..8e55de0 100644 --- a/src/main/java/com/zaxxer/nuprocess/linux/LibEpoll.java +++ b/src/main/java/com/zaxxer/nuprocess/linux/LibEpoll.java @@ -35,14 +35,6 @@ public class LibEpoll public static native int epoll_ctl(int epfd, int op, int fd, Pointer event); - // We only ever call this API with maxevents=1. However, if calling with maxevents > 1, - // care must be taken to ensure that the "events" Pointer actually points to a - // contiguous block of memory large enough to handle maxevents number of EpollEvent - // mappings. - // - // EpollEvent would likely need to be updated to add a convenience method that - // allocates a block of memory and returns an array of EpollEvents mapped into it. The - // EpollEvent.getPointer() of the first array element could then be passed to this API. public static native int epoll_wait(int epfd, Pointer events, int maxevents, int timeout); public static final int SIGPIPE = 13; diff --git a/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java b/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java index d4c6bca..a0ff29f 100644 --- a/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java +++ b/src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java @@ -35,8 +35,6 @@ */ public class LinuxProcess extends BasePosixProcess { - private final EpollEvent epollEvent; - static { LibEpoll.sigignore(LibEpoll.SIGPIPE); @@ -57,8 +55,6 @@ private enum LaunchMechanism { LinuxProcess(NuProcessHandler processListener) { super(processListener); - - epollEvent = new EpollEvent(); } @Override @@ -116,17 +112,6 @@ public void run(List command, String[] environment, Path cwd) } } - /** - * An {@link EpollEvent} struct, which may be used when registering for events for this process. Each process has - * its own struct to avoid concurrency issues in {@link ProcessEpoll#registerProcess} when multiple processes are - * registered at once (e.g. multiple threads are all starting new processes concurrently). - * - * @return this process's {@link EpollEvent} struct - */ - EpollEvent getEpollEvent() { - return epollEvent; - } - private void prepareProcess(List command, String[] environment, Path cwd) throws IOException { String[] cmdarray = command.toArray(new String[0]); diff --git a/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java b/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java index 54c09fc..dce488d 100644 --- a/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java +++ b/src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java @@ -19,8 +19,11 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.sun.jna.Native; +import com.sun.jna.Structure; import com.sun.jna.ptr.IntByReference; import com.zaxxer.nuprocess.NuProcess; import com.zaxxer.nuprocess.internal.BaseEventProcessor; @@ -36,10 +39,19 @@ */ class ProcessEpoll extends BaseEventProcessor { + private static final int EPOLL_MAX_EVENTS = 20; private final int epoll; - private final EpollEvent triggeredEvent; + private final EpollEvent[] triggeredEvents; + private final EpollEvent firstTriggeredEvent; private final List deadPool; + // preferring (assuming uncontented) locking over memory allocations + // used in #registration and #queueWrite methods + private final EpollEvent tempEventForRegistration = new EpollEvent(); + private final EpollEvent tempEventForQueueWrite = new EpollEvent(); + private final Lock tempEventForRegistrationLock = new ReentrantLock(); + private final Lock tempEventForQueueWriteLock = new ReentrantLock(); + private LinuxProcess process; ProcessEpoll() @@ -67,7 +79,17 @@ private ProcessEpoll(int lingerIterations) throw new RuntimeException("Unable to create kqueue, errno: " + errno); } - triggeredEvent = new EpollEvent(); + triggeredEvents = new EpollEvent[EPOLL_MAX_EVENTS]; + + EpollEvent.EpollEventPrototype epollEvent = Structure.newInstance(EpollEvent.EpollEventPrototype.class); + EpollEvent.EpollEventPrototype[] array = (EpollEvent.EpollEventPrototype[]) epollEvent.toArray(EPOLL_MAX_EVENTS); + + for (int i = 0; i < EPOLL_MAX_EVENTS; i++) { + EpollEvent.EpollEventPrototype proto = array[i]; + triggeredEvents[i] = new EpollEvent(proto.getPointer()); + } + + firstTriggeredEvent = triggeredEvents[0]; deadPool = new LinkedList<>(); } @@ -86,6 +108,9 @@ public void registerProcess(LinuxProcess process) int stdinFd = Integer.MIN_VALUE; int stdoutFd = Integer.MIN_VALUE; int stderrFd = Integer.MIN_VALUE; + + tempEventForRegistrationLock.lock(); + try { stdinFd = process.getStdin().acquire(); stdoutFd = process.getStdout().acquire(); @@ -96,7 +121,7 @@ public void registerProcess(LinuxProcess process) fildesToProcessMap.put(stdoutFd, process); fildesToProcessMap.put(stderrFd, process); - EpollEvent event = process.getEpollEvent(); + EpollEvent event = tempEventForRegistration; event.setEvents(LibEpoll.EPOLLIN); event.setFileDescriptor(stdoutFd); int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_ADD, stdoutFd, event.getPointer()); @@ -114,6 +139,7 @@ public void registerProcess(LinuxProcess process) } } finally { + tempEventForRegistrationLock.unlock(); if (stdinFd != Integer.MIN_VALUE) { process.getStdin().release(); } @@ -133,13 +159,15 @@ public void queueWrite(LinuxProcess process) return; } + tempEventForQueueWriteLock.lock(); + try { int stdin = process.getStdin().acquire(); if (stdin == -1) { return; } - EpollEvent event = process.getEpollEvent(); + EpollEvent event = tempEventForQueueWrite; event.setEvents(LibEpoll.EPOLLOUT | LibEpoll.EPOLLONESHOT | LibEpoll.EPOLLRDHUP | LibEpoll.EPOLLHUP); event.setFileDescriptor(stdin); int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_MOD, stdin, event.getPointer()); @@ -153,6 +181,7 @@ public void queueWrite(LinuxProcess process) } } finally { + tempEventForQueueWriteLock.unlock(); process.getStdin().release(); } } @@ -167,6 +196,9 @@ public void run() // the handler's onExit is called before LinuxProcess.run returns. waitForDeadPool(); } + // todo: we could explicitly free tempEventForRegistration and tempEventForQueueWrite native memory + // but not sure if further use of the objects is technically not possible + // for now the JNA GC Ref Handler will eventually free this two objects } @Override @@ -186,12 +218,8 @@ public void closeStdin(LinuxProcess process) @Override public boolean process() { - int stdinFd = Integer.MIN_VALUE; - int stdoutFd = Integer.MIN_VALUE; - int stderrFd = Integer.MIN_VALUE; - LinuxProcess linuxProcess = null; try { - int nev = LibEpoll.epoll_wait(epoll, triggeredEvent.getPointer(), 1, DEADPOOL_POLL_INTERVAL); + int nev = LibEpoll.epoll_wait(epoll, firstTriggeredEvent.getPointer(), EPOLL_MAX_EVENTS, DEADPOOL_POLL_INTERVAL); if (nev == -1) { int errno = Native.getLastError(); if (errno == LibC.EINTR) { @@ -210,13 +238,30 @@ public boolean process() return false; } - EpollEvent epEvent = triggeredEvent; + for (int i = 0; i < nev; i++) { + EpollEvent triggeredEvent = triggeredEvents[i]; + processEpollEvent(triggeredEvent); + } + + return true; + } + finally { + checkDeadPool(); + } + } + + private void processEpollEvent(EpollEvent epEvent) { + int stdinFd = Integer.MIN_VALUE; + int stdoutFd = Integer.MIN_VALUE; + int stderrFd = Integer.MIN_VALUE; + LinuxProcess linuxProcess = null; + try { int ident = epEvent.getFileDescriptor(); int events = epEvent.getEvents(); linuxProcess = fildesToProcessMap.get(ident); if (linuxProcess == null) { - return true; + return; } stdinFd = linuxProcess.getStdin().acquire(); @@ -256,8 +301,6 @@ else if (ident == stdinFd) { if (linuxProcess.isSoftExit()) { cleanupProcess(linuxProcess, stdinFd, stdoutFd, stderrFd); } - - return true; } finally { if (linuxProcess != null) { @@ -271,7 +314,6 @@ else if (ident == stdinFd) { linuxProcess.getStderr().release(); } } - checkDeadPool(); } } @@ -306,8 +348,7 @@ private void cleanupProcess(LinuxProcess linuxProcess, int stdinFd, int stdoutFd return; } - IntByReference ret = new IntByReference(); - int rc = LibC.waitpid(linuxProcess.getPid(), ret, LibC.WNOHANG); + int rc = LibC.waitpid(linuxProcess.getPid(), tempIntPointer, LibC.WNOHANG); if (rc == 0) { deadPool.add(linuxProcess); @@ -316,7 +357,7 @@ else if (rc < 0) { linuxProcess.onExit((Native.getLastError() == LibC.ECHILD) ? Integer.MAX_VALUE : Integer.MIN_VALUE); } else { - handleExit(linuxProcess, ret.getValue()); + handleExit(linuxProcess, tempIntPointer.getValue()); } } @@ -326,7 +367,7 @@ private void checkDeadPool() return; } - IntByReference ret = new IntByReference(); + IntByReference ret = tempIntPointer; Iterator iterator = deadPool.iterator(); while (iterator.hasNext()) { LinuxProcess process = iterator.next(); diff --git a/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java b/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java index 210bf23..1b7cc90 100644 --- a/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java +++ b/src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java @@ -30,7 +30,6 @@ import com.sun.jna.Native; import com.sun.jna.Pointer; -import com.sun.jna.ptr.IntByReference; import com.zaxxer.nuprocess.internal.BaseEventProcessor; import com.zaxxer.nuprocess.internal.LibC; import com.zaxxer.nuprocess.osx.LibKevent.Kevent; @@ -393,7 +392,7 @@ private void checkWaitWrites() private void cleanupProcess(OsxProcess osxProcess) { - LibC.waitpid(osxProcess.getPid(), new IntByReference(), LibC.WNOHANG); + LibC.waitpid(osxProcess.getPid(), tempIntPointer, LibC.WNOHANG); // If this is the last process in the map, this thread will cleanly shut down. pidToProcessMap.remove(osxProcess.getPid());