From 23d9b2d84efd21929c1ebf3d959b8776b3f92055 Mon Sep 17 00:00:00 2001 From: Marcus Linke Date: Tue, 8 Nov 2016 20:35:38 +0100 Subject: [PATCH] Fix issue #15 - KQSelectionKey.interestOps(...) must update events directly --- .../java/jnr/enxio/channels/KQSelector.java | 170 +++++++++--------- 1 file changed, 84 insertions(+), 86 deletions(-) diff --git a/src/main/java/jnr/enxio/channels/KQSelector.java b/src/main/java/jnr/enxio/channels/KQSelector.java index ba4e6bd..0f9fb4c 100644 --- a/src/main/java/jnr/enxio/channels/KQSelector.java +++ b/src/main/java/jnr/enxio/channels/KQSelector.java @@ -58,9 +58,7 @@ class KQSelector extends java.nio.channels.spi.AbstractSelector { private final Object regLock = new Object(); private final Map descriptors = new ConcurrentHashMap(); private final Set selected = new LinkedHashSet(); - private final Set changed = new LinkedHashSet(); - private final Native.Timespec ZERO_TIMESPEC = new Native.Timespec(0, 0); - + private final Native.Timespec ZERO_TIMESPEC = new Native.Timespec(0, 0); public KQSelector(NativeSelectorProvider provider) { super(provider); @@ -100,9 +98,9 @@ protected void implCloseSelector() throws IOException { // deregister all keys for (Map.Entry entry : descriptors.entrySet()) { for (KQSelectionKey k : entry.getValue().keys) { - deregister(k); - } - } + deregister(k); + } + } } @Override @@ -115,7 +113,7 @@ protected SelectionKey register(AbstractSelectableChannel ch, int ops, Object at descriptors.put(k.getFD(), d); } d.keys.add(k); - changed.add(d); + handleChangedKey(d); } k.attach(att); return k; @@ -149,85 +147,10 @@ public int select(long timeout) throws IOException { public int select() throws IOException { return poll(-1); } - + private int poll(long timeout) { - int nchanged = 0; - // - // Remove any cancelled keys - // - Set cancelled = cancelledKeys(); - synchronized (cancelled) { - synchronized (regLock) { - for (SelectionKey k : cancelled) { - KQSelectionKey kqs = (KQSelectionKey) k; - Descriptor d = descriptors.get(kqs.getFD()); - deregister(kqs); - synchronized (selected) { - selected.remove(kqs); - } - d.keys.remove(kqs); - if (d.keys.isEmpty()) { - io.put(changebuf, nchanged++, kqs.getFD(), EVFILT_READ, EV_DELETE); - io.put(changebuf, nchanged++, kqs.getFD(), EVFILT_WRITE, EV_DELETE); - descriptors.remove(kqs.getFD()); - changed.remove(d); - } - if (nchanged >= MAX_EVENTS) { - Native.libc().kevent(kqfd, changebuf, nchanged, null, 0, ZERO_TIMESPEC); - nchanged = 0; - } - } - } - cancelled.clear(); - } - synchronized (regLock) { - for (Descriptor d : changed) { - int writers = 0, readers = 0; - for (KQSelectionKey k : d.keys) { - if ((k.interestOps() & (SelectionKey.OP_ACCEPT | SelectionKey.OP_READ)) != 0) { - ++readers; - } - if ((k.interestOps() & (SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE)) != 0) { - ++writers; - } - } - for (Integer filt : new Integer[] { EVFILT_READ, EVFILT_WRITE }) { - int flags = 0; - // - // If no one is interested in events on the fd, disable it - // - if (filt == EVFILT_READ) { - if (readers > 0 && !d.read) { - flags = EV_ADD |EV_ENABLE | EV_CLEAR; - d.read = true; - } else if (readers == 0 && d.read) { - flags = EV_DISABLE; - d.read = false; - } - } - if (filt == EVFILT_WRITE) { - if (writers > 0 && !d.write) { - flags = EV_ADD | EV_ENABLE | EV_CLEAR; - d.write = true; - } else if (writers == 0 && d.write) { - flags = EV_DISABLE; - d.write = false; - } - } - if (DEBUG) System.out.printf("Updating fd %d filt=0x%x flags=0x%x\n", - d.fd, filt, flags); - if (flags != 0) { - io.put(changebuf, nchanged++, d.fd, filt, flags); - } - if (nchanged >= MAX_EVENTS) { - Native.libc().kevent(kqfd, changebuf, nchanged, null, 0, ZERO_TIMESPEC); - nchanged = 0; - } - } - } - changed.clear(); - } + int nchanged = handleCancelledKeys(); Native.Timespec ts = null; if (timeout >= 0) { @@ -286,6 +209,82 @@ private int poll(long timeout) { } return updatedKeyCount; } + + private int handleCancelledKeys() { + Set cancelled = cancelledKeys(); + synchronized (cancelled) { + int nchanged = 0; + synchronized (regLock) { + for (SelectionKey k : cancelled) { + KQSelectionKey kqs = (KQSelectionKey) k; + Descriptor d = descriptors.get(kqs.getFD()); + deregister(kqs); + synchronized (selected) { + selected.remove(kqs); + } + d.keys.remove(kqs); + if (d.keys.isEmpty()) { + io.put(changebuf, nchanged++, kqs.getFD(), EVFILT_READ, EV_DELETE); + io.put(changebuf, nchanged++, kqs.getFD(), EVFILT_WRITE, EV_DELETE); + descriptors.remove(kqs.getFD()); + } + if (nchanged >= MAX_EVENTS) { + Native.libc().kevent(kqfd, changebuf, nchanged, null, 0, ZERO_TIMESPEC); + nchanged = 0; + } + } + } + cancelled.clear(); + return nchanged; + } + + } + + private void handleChangedKey(Descriptor changed) { + synchronized (regLock) { + int _nchanged = 0; + int writers = 0, readers = 0; + for (KQSelectionKey k : changed.keys) { + if ((k.interestOps() & (SelectionKey.OP_ACCEPT | SelectionKey.OP_READ)) != 0) { + ++readers; + } + if ((k.interestOps() & (SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE)) != 0) { + ++writers; + } + } + for (Integer filt : new Integer[] { EVFILT_READ, EVFILT_WRITE }) { + int flags = 0; + // + // If no one is interested in events on the fd, disable it + // + if (filt == EVFILT_READ) { + if (readers > 0 && !changed.read) { + flags = EV_ADD |EV_ENABLE | EV_CLEAR; + changed.read = true; + } else if (readers == 0 && changed.read) { + flags = EV_DISABLE; + changed.read = false; + } + } + if (filt == EVFILT_WRITE) { + if (writers > 0 && !changed.write) { + flags = EV_ADD | EV_ENABLE | EV_CLEAR; + changed.write = true; + } else if (writers == 0 && changed.write) { + flags = EV_DISABLE; + changed.write = false; + } + } + if (DEBUG) System.out.printf("Updating fd %d filt=0x%x flags=0x%x\n", + changed.fd, filt, flags); + if (flags != 0) { + io.put(changebuf, _nchanged++, changed.fd, filt, flags); + } + } + + Native.libc().kevent(kqfd, changebuf, _nchanged, null, 0, ZERO_TIMESPEC); + } + } private void wakeupReceived() { Native.libc().read(pipefd[0], new byte[1], 1); @@ -299,7 +298,7 @@ public Selector wakeup() { void interestOps(KQSelectionKey k, int ops) { synchronized (regLock) { - changed.add(descriptors.get(k.getFD())); + handleChangedKey(descriptors.get(k.getFD())); } } @@ -350,5 +349,4 @@ private EventLayout(jnr.ffi.Runtime runtime) { public final intptr_t data = new intptr_t(); public final Pointer udata = new Pointer(); } - }