Skip to content

Commit

Permalink
Fix issue jnr#15 - KQSelectionKey.interestOps(...) must update events
Browse files Browse the repository at this point in the history
directly
  • Loading branch information
Marcus Linke committed Nov 8, 2016
1 parent a7c187e commit 23d9b2d
Showing 1 changed file with 84 additions and 86 deletions.
170 changes: 84 additions & 86 deletions src/main/java/jnr/enxio/channels/KQSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ class KQSelector extends java.nio.channels.spi.AbstractSelector {
private final Object regLock = new Object();
private final Map<Integer, Descriptor> descriptors = new ConcurrentHashMap<Integer, Descriptor>();
private final Set<SelectionKey> selected = new LinkedHashSet<SelectionKey>();
private final Set<Descriptor> changed = new LinkedHashSet<Descriptor>();
private final Native.Timespec ZERO_TIMESPEC = new Native.Timespec(0, 0);

private final Native.Timespec ZERO_TIMESPEC = new Native.Timespec(0, 0);

public KQSelector(NativeSelectorProvider provider) {
super(provider);
Expand Down Expand Up @@ -100,9 +98,9 @@ protected void implCloseSelector() throws IOException {
// deregister all keys
for (Map.Entry<Integer, Descriptor> entry : descriptors.entrySet()) {
for (KQSelectionKey k : entry.getValue().keys) {
deregister(k);
}
}
deregister(k);
}
}
}

@Override
Expand All @@ -115,7 +113,7 @@ protected SelectionKey register(AbstractSelectableChannel ch, int ops, Object at
descriptors.put(k.getFD(), d);
}
d.keys.add(k);
changed.add(d);
handleChangedKey(d);
}
k.attach(att);
return k;
Expand Down Expand Up @@ -149,85 +147,10 @@ public int select(long timeout) throws IOException {
public int select() throws IOException {
return poll(-1);
}

private int poll(long timeout) {
int nchanged = 0;
//
// Remove any cancelled keys
//
Set<SelectionKey> cancelled = cancelledKeys();
synchronized (cancelled) {
synchronized (regLock) {
for (SelectionKey k : cancelled) {
KQSelectionKey kqs = (KQSelectionKey) k;
Descriptor d = descriptors.get(kqs.getFD());
deregister(kqs);
synchronized (selected) {
selected.remove(kqs);
}
d.keys.remove(kqs);
if (d.keys.isEmpty()) {
io.put(changebuf, nchanged++, kqs.getFD(), EVFILT_READ, EV_DELETE);
io.put(changebuf, nchanged++, kqs.getFD(), EVFILT_WRITE, EV_DELETE);
descriptors.remove(kqs.getFD());
changed.remove(d);
}
if (nchanged >= MAX_EVENTS) {
Native.libc().kevent(kqfd, changebuf, nchanged, null, 0, ZERO_TIMESPEC);
nchanged = 0;
}
}
}
cancelled.clear();
}

synchronized (regLock) {
for (Descriptor d : changed) {
int writers = 0, readers = 0;
for (KQSelectionKey k : d.keys) {
if ((k.interestOps() & (SelectionKey.OP_ACCEPT | SelectionKey.OP_READ)) != 0) {
++readers;
}
if ((k.interestOps() & (SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE)) != 0) {
++writers;
}
}
for (Integer filt : new Integer[] { EVFILT_READ, EVFILT_WRITE }) {
int flags = 0;
//
// If no one is interested in events on the fd, disable it
//
if (filt == EVFILT_READ) {
if (readers > 0 && !d.read) {
flags = EV_ADD |EV_ENABLE | EV_CLEAR;
d.read = true;
} else if (readers == 0 && d.read) {
flags = EV_DISABLE;
d.read = false;
}
}
if (filt == EVFILT_WRITE) {
if (writers > 0 && !d.write) {
flags = EV_ADD | EV_ENABLE | EV_CLEAR;
d.write = true;
} else if (writers == 0 && d.write) {
flags = EV_DISABLE;
d.write = false;
}
}
if (DEBUG) System.out.printf("Updating fd %d filt=0x%x flags=0x%x\n",
d.fd, filt, flags);
if (flags != 0) {
io.put(changebuf, nchanged++, d.fd, filt, flags);
}
if (nchanged >= MAX_EVENTS) {
Native.libc().kevent(kqfd, changebuf, nchanged, null, 0, ZERO_TIMESPEC);
nchanged = 0;
}
}
}
changed.clear();
}
int nchanged = handleCancelledKeys();

Native.Timespec ts = null;
if (timeout >= 0) {
Expand Down Expand Up @@ -286,6 +209,82 @@ private int poll(long timeout) {
}
return updatedKeyCount;
}

private int handleCancelledKeys() {
Set<SelectionKey> cancelled = cancelledKeys();
synchronized (cancelled) {
int nchanged = 0;
synchronized (regLock) {
for (SelectionKey k : cancelled) {
KQSelectionKey kqs = (KQSelectionKey) k;
Descriptor d = descriptors.get(kqs.getFD());
deregister(kqs);
synchronized (selected) {
selected.remove(kqs);
}
d.keys.remove(kqs);
if (d.keys.isEmpty()) {
io.put(changebuf, nchanged++, kqs.getFD(), EVFILT_READ, EV_DELETE);
io.put(changebuf, nchanged++, kqs.getFD(), EVFILT_WRITE, EV_DELETE);
descriptors.remove(kqs.getFD());
}
if (nchanged >= MAX_EVENTS) {
Native.libc().kevent(kqfd, changebuf, nchanged, null, 0, ZERO_TIMESPEC);
nchanged = 0;
}
}
}
cancelled.clear();
return nchanged;
}

}

private void handleChangedKey(Descriptor changed) {
synchronized (regLock) {
int _nchanged = 0;
int writers = 0, readers = 0;
for (KQSelectionKey k : changed.keys) {
if ((k.interestOps() & (SelectionKey.OP_ACCEPT | SelectionKey.OP_READ)) != 0) {
++readers;
}
if ((k.interestOps() & (SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE)) != 0) {
++writers;
}
}
for (Integer filt : new Integer[] { EVFILT_READ, EVFILT_WRITE }) {
int flags = 0;
//
// If no one is interested in events on the fd, disable it
//
if (filt == EVFILT_READ) {
if (readers > 0 && !changed.read) {
flags = EV_ADD |EV_ENABLE | EV_CLEAR;
changed.read = true;
} else if (readers == 0 && changed.read) {
flags = EV_DISABLE;
changed.read = false;
}
}
if (filt == EVFILT_WRITE) {
if (writers > 0 && !changed.write) {
flags = EV_ADD | EV_ENABLE | EV_CLEAR;
changed.write = true;
} else if (writers == 0 && changed.write) {
flags = EV_DISABLE;
changed.write = false;
}
}
if (DEBUG) System.out.printf("Updating fd %d filt=0x%x flags=0x%x\n",
changed.fd, filt, flags);
if (flags != 0) {
io.put(changebuf, _nchanged++, changed.fd, filt, flags);
}
}

Native.libc().kevent(kqfd, changebuf, _nchanged, null, 0, ZERO_TIMESPEC);
}
}

private void wakeupReceived() {
Native.libc().read(pipefd[0], new byte[1], 1);
Expand All @@ -299,7 +298,7 @@ public Selector wakeup() {

void interestOps(KQSelectionKey k, int ops) {
synchronized (regLock) {
changed.add(descriptors.get(k.getFD()));
handleChangedKey(descriptors.get(k.getFD()));
}
}

Expand Down Expand Up @@ -350,5 +349,4 @@ private EventLayout(jnr.ffi.Runtime runtime) {
public final intptr_t data = new intptr_t();
public final Pointer udata = new Pointer();
}

}

0 comments on commit 23d9b2d

Please sign in to comment.