Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't depend on the GC to free native memory on Linux #151

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.11.0</version>
<version>5.13.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public abstract class BaseEventProcessor<T extends BasePosixProcess> implements
private CyclicBarrier startBarrier;
private AtomicBoolean isRunning;

protected final IntByReference exitCodePointer = new IntByReference();

static {
LINGER_TIME_MS = Math.max(1000, Integer.getInteger("com.zaxxer.nuprocess.lingerTimeMs", 2500));

Expand Down Expand Up @@ -128,6 +130,7 @@ public void shutdown()
process.onExit(Integer.MAX_VALUE - 1);
LibC.waitpid(process.getPid(), exitCode, LibC.WNOHANG);
}
Util.close(exitCode);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,16 +323,16 @@ public void onExit(int statusCode)
}
finally {
exitPending.countDown();
// Once the last reference to the buffer is gone, Java will finalize the buffer
// and release the native memory we allocated in initializeBuffers().
outBufferMemory.close();
errBufferMemory.close();
inBufferMemory.close();
outBufferMemory = null;
errBufferMemory = null;
inBufferMemory = null;
outBuffer = null;
errBuffer = null;
inBuffer = null;
processHandler = null;
Memory.purge();
}
}

Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/zaxxer/nuprocess/internal/Util.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.zaxxer.nuprocess.internal;

import com.sun.jna.Memory;
import com.sun.jna.Pointer;
import com.sun.jna.PointerType;

public class Util {

public static void close(PointerType pointerType) {
if (pointerType == null) {
return;
}
close(pointerType.getPointer());
}

public static void close(Pointer p) {
if (p instanceof Memory) {
((Memory) p).close();
}
}

}
6 changes: 6 additions & 0 deletions src/main/java/com/zaxxer/nuprocess/linux/EpollEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;

import com.sun.jna.*;
import com.zaxxer.nuprocess.internal.Util;

class EpollEvent
{
Expand Down Expand Up @@ -64,6 +65,11 @@ int size() {
return size;
}

public void close()
{
Util.close(getPointer());
}

public static class EpollEventPrototype extends Structure
{
/*
Expand Down
13 changes: 0 additions & 13 deletions src/main/java/com/zaxxer/nuprocess/linux/LinuxProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
*/
public class LinuxProcess extends BasePosixProcess
{
private final EpollEvent epollEvent;

static {
LibEpoll.sigignore(LibEpoll.SIGPIPE);
Expand All @@ -58,7 +57,6 @@ private enum LaunchMechanism {
LinuxProcess(NuProcessHandler processListener) {
super(processListener);

epollEvent = new EpollEvent();
}

@Override
Expand Down Expand Up @@ -117,17 +115,6 @@ public void run(List<String> command, String[] environment, Path cwd)
}
}

/**
* An {@link EpollEvent} struct, which may be used when registering for events for this process. Each process has
* its own struct to avoid concurrency issues in {@link ProcessEpoll#registerProcess} when multiple processes are
* registered at once (e.g. multiple threads are all starting new processes concurrently).
*
* @return this process's {@link EpollEvent} struct
*/
EpollEvent getEpollEvent() {
return epollEvent;
}

private void prepareProcess(List<String> command, String[] environment, Path cwd) throws IOException
{
String[] cmdarray = command.toArray(new String[0]);
Expand Down
20 changes: 11 additions & 9 deletions src/main/java/com/zaxxer/nuprocess/linux/ProcessEpoll.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;

import com.sun.jna.Native;
import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
Expand Down Expand Up @@ -86,6 +85,7 @@ public void registerProcess(LinuxProcess process)
int stdinFd = Integer.MIN_VALUE;
int stdoutFd = Integer.MIN_VALUE;
int stderrFd = Integer.MIN_VALUE;
EpollEvent event = new EpollEvent();
try {
stdinFd = process.getStdin().acquire();
stdoutFd = process.getStdout().acquire();
Expand All @@ -96,7 +96,6 @@ public void registerProcess(LinuxProcess process)
fildesToProcessMap.put(stdoutFd, process);
fildesToProcessMap.put(stderrFd, process);

EpollEvent event = process.getEpollEvent();
event.setEvents(LibEpoll.EPOLLIN);
event.setFileDescriptor(stdoutFd);
int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_ADD, stdoutFd, event.getPointer());
Expand All @@ -114,6 +113,7 @@ public void registerProcess(LinuxProcess process)
}
}
finally {
event.close();
if (stdinFd != Integer.MIN_VALUE) {
process.getStdin().release();
}
Expand All @@ -133,13 +133,14 @@ public void queueWrite(LinuxProcess process)
return;
}

EpollEvent event = null;
try {
int stdin = process.getStdin().acquire();
if (stdin == -1) {
return;
}

EpollEvent event = process.getEpollEvent();
event = new EpollEvent();
event.setEvents(LibEpoll.EPOLLOUT | LibEpoll.EPOLLONESHOT | LibEpoll.EPOLLRDHUP | LibEpoll.EPOLLHUP);
event.setFileDescriptor(stdin);
int rc = LibEpoll.epoll_ctl(epoll, LibEpoll.EPOLL_CTL_MOD, stdin, event.getPointer());
Expand All @@ -153,6 +154,9 @@ public void queueWrite(LinuxProcess process)
}
}
finally {
if (event != null) {
event.close();
}
process.getStdin().release();
}
}
Expand Down Expand Up @@ -306,8 +310,7 @@ private void cleanupProcess(LinuxProcess linuxProcess, int stdinFd, int stdoutFd
return;
}

IntByReference ret = new IntByReference();
int rc = LibC.waitpid(linuxProcess.getPid(), ret, LibC.WNOHANG);
int rc = LibC.waitpid(linuxProcess.getPid(), exitCodePointer, LibC.WNOHANG);

if (rc == 0) {
deadPool.add(linuxProcess);
Expand All @@ -316,7 +319,7 @@ else if (rc < 0) {
linuxProcess.onExit((Native.getLastError() == LibC.ECHILD) ? Integer.MAX_VALUE : Integer.MIN_VALUE);
}
else {
handleExit(linuxProcess, ret.getValue());
handleExit(linuxProcess, exitCodePointer.getValue());
}
}

Expand All @@ -326,11 +329,10 @@ private void checkDeadPool()
return;
}

IntByReference ret = new IntByReference();
Iterator<LinuxProcess> iterator = deadPool.iterator();
while (iterator.hasNext()) {
LinuxProcess process = iterator.next();
int rc = LibC.waitpid(process.getPid(), ret, LibC.WNOHANG);
int rc = LibC.waitpid(process.getPid(), exitCodePointer, LibC.WNOHANG);
if (rc == 0) {
continue;
}
Expand All @@ -341,7 +343,7 @@ private void checkDeadPool()
continue;
}

handleExit(process, ret.getValue());
handleExit(process, exitCodePointer.getValue());
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/zaxxer/nuprocess/osx/ProcessKqueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import com.sun.jna.Native;
import com.sun.jna.Pointer;
import com.sun.jna.ptr.IntByReference;
import com.zaxxer.nuprocess.internal.BaseEventProcessor;
import com.zaxxer.nuprocess.internal.LibC;
import com.zaxxer.nuprocess.osx.LibKevent.Kevent;
Expand Down Expand Up @@ -393,7 +392,7 @@ private void checkWaitWrites()

private void cleanupProcess(OsxProcess osxProcess)
{
LibC.waitpid(osxProcess.getPid(), new IntByReference(), LibC.WNOHANG);
LibC.waitpid(osxProcess.getPid(), exitCodePointer, LibC.WNOHANG);

// If this is the last process in the map, this thread will cleanly shut down.
pidToProcessMap.remove(osxProcess.getPid());
Expand Down
80 changes: 80 additions & 0 deletions src/test/java/com/zaxxer/nuprocess/ThreadLoadTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.zaxxer.nuprocess;

import org.junit.Assert;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* Yet another threaded test.
*/
public class ThreadLoadTest {

@Test
public void testStartLoad() throws InterruptedException {
int durationInMs = 15_000;
long cutOfTime = System.currentTimeMillis() + durationInMs;
int nrOfThreads = (Runtime.getRuntime().availableProcessors() * 2);
CountDownLatch latch = new CountDownLatch(nrOfThreads);
AtomicLong runCountCode0 = new AtomicLong();
AtomicLong runCountCodeNon0 = new AtomicLong();
AtomicLong problems = new AtomicLong();
for (int i = 0; i < nrOfThreads; i++) {
startNewThread(latch, cutOfTime, runCountCode0, runCountCodeNon0, problems);
}

Assert.assertTrue(latch.await(durationInMs + 1_000, TimeUnit.MILLISECONDS));
System.out.println("runCount 0 = " + runCountCode0.get());
System.out.println("runCount non-0 = " + runCountCodeNon0.get());
System.out.println("problems = " + problems.get());
}

private void startNewThread(final CountDownLatch latch, final long cutOfTime, final AtomicLong zeroExit, final AtomicLong nonZeroExit, AtomicLong problems) {
new Thread(new Runnable() {
public void run() {
while (System.currentTimeMillis() < cutOfTime) {
final int randomInt = ThreadLocalRandom.current().nextInt(10_000);
final String text = "foo" + randomInt;
long startTime = System.nanoTime();

final NuProcess start = new NuProcessBuilder(new NuAbstractProcessHandler() {
public void onPreStart(final NuProcess nuProcess) {
super.onPreStart(nuProcess);
}

@Override
public void onStart(NuProcess nuProcess) {
}

@Override
public void onStdout(ByteBuffer buffer, boolean closed) {

}

public void onExit(final int statusCode) {
if (statusCode == 0) {
zeroExit.incrementAndGet();
} else {
nonZeroExit.incrementAndGet();
}
}
}, "echo", text).start();
try {
start.waitFor(10, TimeUnit.DAYS);

// System.out.println("Took " + ((System.nanoTime() - startTime) / 1000000));
// start.wantWrite();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

latch.countDown();
}
}).start();
}
}