From 17cb669c17dae24c157f91de1b99c2dcc3bd7e82 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 4 Feb 2025 16:14:23 +0100 Subject: [PATCH] Add inbound testing for SPSC / SingleThread and improve close protocol of InboundMessageChannel. --- .../concurrent/InboundMessageChannel.java | 130 +++- ...InboundMessageChannelSingleThreadTest.java | 319 ++++++++ .../InboundMessageChannelSpScTest.java | 22 + .../concurrent/InboundMessageChannelTest.java | 692 ++++++------------ 4 files changed, 667 insertions(+), 496 deletions(-) create mode 100644 vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSingleThreadTest.java create mode 100644 vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSpScTest.java diff --git a/vertx-core/src/main/java/io/vertx/core/internal/concurrent/InboundMessageChannel.java b/vertx-core/src/main/java/io/vertx/core/internal/concurrent/InboundMessageChannel.java index e74d3215551..1093b3e7d5c 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/concurrent/InboundMessageChannel.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/concurrent/InboundMessageChannel.java @@ -14,6 +14,7 @@ import io.vertx.core.internal.EventExecutor; import io.vertx.core.streams.impl.MessageChannel; +import java.util.List; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.Predicate; @@ -28,9 +29,13 @@ public class InboundMessageChannel implements Predicate, Runnable { private final EventExecutor producer; private final MessageChannel messageChannel; - // Accessed by context thread - private boolean needsDrain; + // Accessed by produced thread + private boolean producerClosed; + + // Accessed by consumer thread private boolean draining; + private boolean needsDrain; + private boolean consumerClosed; // Any thread private volatile long demand = Long.MAX_VALUE; @@ -62,7 +67,7 @@ public InboundMessageChannel(EventExecutor producer, EventExecutor consumer, int } this.messageChannel = messageChannelFactory.create(this, lowWaterMark, highWaterMark); this.consumer = consumer; - this.producer = consumer; + this.producer = producer; } @Override @@ -79,26 +84,6 @@ public final boolean test(M msg) { return true; } - /** - * Handle resume, executed on the producer executor. - */ - protected void handleResume() { - } - - /** - * Handler pause, executed on the producer executor. - */ - protected void handlePause() { - } - - /** - * Handle a message, executed on the consumer executor. - * - * @param msg the message - */ - protected void handleMessage(M msg) { - } - /** * Add a message to the channel * @@ -107,6 +92,10 @@ protected void handleMessage(M msg) { */ public final boolean add(M msg) { assert producer.inThread(); + if (producerClosed) { + handleDispose(msg); + return false; + } int res = messageChannel.add(msg); if ((res & MessageChannel.UNWRITABLE_MASK) != 0) { handlePause(); @@ -141,10 +130,13 @@ public final void write(M msg) { } /** - * Schedule a drain operation on the context thread. + * Schedule a drain operation on the consumer thread, this method assumes a consumer thread. */ public final void drain() { assert producer.inThread(); + if (producerClosed) { + return; + } if (consumer.inThread()) { drainInternal(); } else { @@ -153,7 +145,7 @@ public final void drain() { } /** - * Task executed from context thread. + * Task executed from context thread, this should not be called directly. */ @Override public void run() { @@ -164,12 +156,19 @@ public void run() { } private void drainInternal() { + if (consumerClosed) { + return; + } draining = true; try { int res = messageChannel.drain(); needsDrain = (res & MessageChannel.DRAIN_REQUIRED_MASK) != 0; if ((res & MessageChannel.WRITABLE_MASK) != 0) { - producer.execute(this::handleResume); + if (producer.inThread()) { + handleResume(); + } else { + producer.execute(this::handleResume); + } } } finally { draining = false; @@ -203,8 +202,83 @@ public final void fetch(long amount) { break; } } - consumer - .execute(this); + consumer.execute(this); + } + } + + /** + * Close the channel. + */ + public final void close() { + if (!producer.inThread()) { + producer.execute(this::close); + return; + } + closeProducer(); + if (consumer.inThread()) { + closeConsumer(); + } else { + consumer.execute(this::closeConsumer); + } + } + + /** + * Close the producer side, this must be called from the producer thread + */ + public final void closeProducer() { + assert producer.inThread(); + if (producerClosed) { + return; + } + producerClosed = true; + } + + /** + * Close the consumer side, this must be called from the consumer thread + */ + public final void closeConsumer() { + assert consumer.inThread(); + if (consumerClosed) { + return; } + consumerClosed = true; + releaseMessages(); + } + + private void releaseMessages() { + List messages = messageChannel.clear(); + for (M elt : messages) { + handleDispose(elt); + } + } + + /** + * Handle resume, executed on a producer thread. + */ + protected void handleResume() { + } + + /** + * Handler pause, executed on a producer thread. + */ + protected void handlePause() { + } + + /** + * Handle a message, executed on a consumer thread. + * + * @param msg the message + */ + protected void handleMessage(M msg) { + } + + /** + * Dispose a message, this is called when the channel has been closed and message resource cleanup. No specific + * thread assumption can be made on this callback. + * + * @param msg the message to dispose + */ + // Todo : try remove this + protected void handleDispose(M msg) { } } diff --git a/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSingleThreadTest.java b/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSingleThreadTest.java new file mode 100644 index 00000000000..8fd1898931a --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSingleThreadTest.java @@ -0,0 +1,319 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.tests.concurrent; + +import io.vertx.core.Context; +import io.vertx.core.internal.VertxInternal; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntConsumer; + +public class InboundMessageChannelSingleThreadTest extends InboundMessageChannelTest { + + @Override + protected Context createContext(VertxInternal vertx) { + return vertx.createEventLoopContext(); + } + + @Test + public void testEmitInElementHandler() { + AtomicInteger events = new AtomicInteger(); + AtomicBoolean receiving = new AtomicBoolean(); + queue = buffer(elt -> { + assertConsumer(); + assertFalse(receiving.getAndSet(true)); + events.incrementAndGet(); + if (elt == 0) { + queue.emit(5); + } + receiving.set(false); + }, 5, 5); + producerTask(() -> { + queue.pause(); + queue.fetch(1); + assertFalse(queue.emit()); + assertEquals(5, queue.size()); + assertEquals(1, events.get()); + testComplete(); + }); + await(); + } + + @Test + public void testEmitInElementHandler1() { + testEmitInElementHandler(n -> { + assertFalse(queue.emit(n)); + }); + } + +/* + @Test + public void testEmitInElementHandler2() { + testEmitInElementHandler(n -> { + for (int i = 0;i < n - 1;i++) { + assertTrue(buffer.emit()); + } + assertFalse(buffer.emit()); + }); + } +*/ + + private void testEmitInElementHandler(IntConsumer emit) { + AtomicInteger events = new AtomicInteger(); + AtomicInteger drained = new AtomicInteger(); + AtomicBoolean draining = new AtomicBoolean(); + queue = buffer(elt -> { + assertConsumer(); + switch (elt) { + case 5: + // Emitted in drain handler + emit.accept(9); + break; + case 9: + vertx.runOnContext(v2 -> { + assertEquals(1, drained.get()); + assertEquals(10, events.get()); + assertEquals(5, queue.size()); + testComplete(); + }); + break; + } + events.incrementAndGet(); + }, 5, 5); + queue.drainHandler(v3 -> { + // Check reentrancy + assertFalse(draining.get()); + draining.set(true); + assertEquals(0, drained.getAndIncrement()); + assertFalse(queue.emit()); + draining.set(false); + }); + producerTask(() -> { + queue.pause(); + queue.fill(); + queue.fetch(10); + }); + await(); + } + + @Test + public void testEmitWhenHandlingLastItem() { + int next = sequence.get(); + AtomicInteger received = new AtomicInteger(next); + AtomicInteger writable = new AtomicInteger(); + queue = buffer(elt -> { + if (received.decrementAndGet() == 0) { + queue.write(next); + } + }, 4, 4) + .drainHandler(v2 -> { + writable.incrementAndGet(); + }); + producerTask(() -> { + queue.pause(); + queue.fill(); + queue.fetch(sequence.get()); + assertEquals(0, writable.get()); + testComplete(); + }); + await(); + } + + @Test + public void testEmitInDrainHandler1() { + AtomicInteger drained = new AtomicInteger(); + AtomicInteger expectedDrained = new AtomicInteger(); + queue = buffer(elt -> { + if (elt == 0) { + // This will set writable to false + queue.fill(); + } + assertEquals(expectedDrained.get(), drained.get()); + }, 4, 4) + .drainHandler(v2 -> { + switch (drained.getAndIncrement()) { + case 0: + // Check that emitting again will not drain again + expectedDrained.set(1); + queue.fill(); + assertEquals(1, drained.get()); + testComplete(); + break; + } + }); + producerTask(() -> { + queue.pause(); + queue.fetch(1); + queue.emit(); + queue.fetch(4L); + }); + await(); + } + + @Test + public void testEmitInDrainHandler2() { + waitFor(2); + AtomicInteger drained = new AtomicInteger(); + AtomicBoolean draining = new AtomicBoolean(); + AtomicInteger emitted = new AtomicInteger(); + queue = buffer(elt -> { + emitted.incrementAndGet(); + if (elt == 0) { + assertEquals(0, drained.get()); + } else if (elt == 6) { + assertEquals(1, drained.get()); + } + }, 5, 5) + .drainHandler(v2 -> { + assertFalse(draining.get()); + draining.set(true); + switch (drained.getAndIncrement()) { + case 0: + // This will trigger a new asynchronous drain + queue.fill(); + queue.fetch(5); + break; + case 1: + assertEquals(10, emitted.get()); + complete(); + break; + } + draining.set(false); + }); + producerTask(() -> { + queue.pause(); + queue.fill(); + queue.fetch(5); + complete(); + }); + await(); + } + + @Test + public void testDrainAfter() { + AtomicInteger events = new AtomicInteger(); + AtomicBoolean receiving = new AtomicBoolean(); + queue = buffer(elt -> { + assertConsumer(); + assertFalse(receiving.getAndSet(true)); + events.incrementAndGet(); + if (elt == 0) { + queue.emit(5); + } + receiving.set(false); + }, 5, 5); + producerTask(() -> { + assertTrue(queue.emit()); + assertEquals(6, sequence.get()); + assertEquals(6, events.get()); + testComplete(); + }); + await(); + } + + @Test + public void testPauseInElementHandler() { + AtomicInteger events = new AtomicInteger(); + queue = buffer(elt -> { + events.incrementAndGet(); + if (elt == 0) { + queue.pause(); + queue.emit(5); + } + }, 5, 5); + producerTask(() -> { + assertFalse(queue.emit()); + assertEquals(1, events.get()); + assertEquals(5, queue.size()); + testComplete(); + }); + await(); + } + + @Test + public void testAddAllEmitInHandler() { + List emitted = new ArrayList<>(); + queue = buffer(elt -> { + switch (elt) { + case 0: + queue.emit(); + } + emitted.add(elt); + }, 4, 4); + producerTask(() -> { + assertTrue(queue.emit(3)); + assertEquals(Arrays.asList(0, 1, 2, 3), emitted); + testComplete(); + }); + await(); + } + + @Test + public void testAddAllWhenDelivering() { + List emitted = new ArrayList<>(); + queue = buffer(elt -> { + emitted.add(elt); + if (elt == 2) { + queue.write(Arrays.asList(4, 5)); + // Check that we haven't re-entered the handler + assertEquals(Arrays.asList(0, 1, 2), emitted); + } + }, 4, 4); + producerTask(() -> { + queue.emit(4); + assertWaitUntil(() -> Arrays.asList(0, 1, 2, 3, 4, 5).equals(emitted)); + testComplete(); + }); + await(); + } + + @Test + public void testPauseInHandlerSignalsFullImmediately() { + queue = buffer(elt -> { + queue.pause(); + queue.emit(); + }, 1, 1); + producerTask(() -> { + assertFalse(queue.emit()); + testComplete(); + }); + await(); + } + + @Test + public void testReentrantClose() { + AtomicInteger emitted = new AtomicInteger(); + List dropped = Collections.synchronizedList(new ArrayList<>()); + queue = new TestChannel(elt -> emitted.incrementAndGet(), 4, 4) { + @Override + protected void handleDispose(Integer msg) { + dropped.add(msg); + if (msg == 0) { + queue.write(1); + } + } + }; + producerTask(() -> { + queue.pause(); + queue.write(0); + queue.close(); + assertEquals(0, emitted.get()); + assertEquals(Arrays.asList(0, 1), dropped); + testComplete(); + }); + } +} diff --git a/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSpScTest.java b/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSpScTest.java new file mode 100644 index 00000000000..44071a4b9db --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSpScTest.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.tests.concurrent; + +import io.vertx.core.Context; +import io.vertx.core.internal.VertxInternal; + +public class InboundMessageChannelSpScTest extends InboundMessageChannelTest { + + @Override + protected Context createContext(VertxInternal vertx) { + return vertx.createWorkerContext(); + } +} diff --git a/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelTest.java b/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelTest.java index df48a0eb2da..fb8bfa8ab24 100644 --- a/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelTest.java @@ -12,30 +12,37 @@ import io.vertx.core.Context; import io.vertx.core.Handler; +import io.vertx.core.VertxOptions; import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.concurrent.InboundMessageChannel; import io.vertx.test.core.VertxTestBase; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntConsumer; -public class InboundMessageChannelTest extends VertxTestBase { +public abstract class InboundMessageChannelTest extends VertxTestBase { - private volatile Runnable contextChecker; - private Context context; - private TestChannel queue; - private AtomicInteger sequence; + private volatile Thread producerThread; + private volatile Thread consumerThread; + + Context context; + TestChannel queue; + final AtomicInteger sequence = new AtomicInteger(); + + InboundMessageChannelTest() { + } class TestChannel extends InboundMessageChannel { final IntConsumer consumer; private Handler drainHandler; - private boolean writable; + private volatile boolean writable; private int size; public TestChannel(IntConsumer consumer) { @@ -69,6 +76,10 @@ protected void handleResume() { } } + public boolean isWritable() { + return writable; + } + @Override protected void handlePause() { writable = false; @@ -116,80 +127,99 @@ TestChannel drainHandler(Handler handler) { } } + protected abstract Context createContext(VertxInternal vertx); + @Override public void setUp() throws Exception { super.setUp(); - context = vertx.getOrCreateContext(); - sequence = new AtomicInteger(); + context = createContext((VertxInternal) vertx); + sequence.set(0); context.runOnContext(v -> { - Thread contextThread = Thread.currentThread(); - contextChecker = () -> { - assertSame(contextThread, Thread.currentThread()); - }; + consumerThread = Thread.currentThread(); }); - waitUntil(() -> contextChecker != null); + ((ContextInternal)context).nettyEventLoop().execute(() -> { + producerThread = Thread.currentThread(); + }); + waitUntil(() -> consumerThread != null && producerThread != null); + } + + @Override + protected VertxOptions getOptions() { + return super.getOptions().setWorkerPoolSize(1); } public void tearDown() throws Exception { super.tearDown(); } - private void checkContext() { - contextChecker.run(); + protected final void assertConsumer() { + assertSame(consumerThread, Thread.currentThread()); } - @Test - public void testFlowing() { - context.runOnContext(v -> { - AtomicInteger events = new AtomicInteger(); - queue = buffer(elt -> { - checkContext(); - assertEquals(0, elt); - assertEquals(0, events.getAndIncrement()); - testComplete(); - }); - assertTrue(queue.emit()); - }); - await(); + protected final void assertProducer() { + assertSame(producerThread, Thread.currentThread()); + } + + protected final void producerTask(Runnable task) { + ((ContextInternal)context).nettyEventLoop().execute(task); + } + + protected final void consumerTask(Runnable task) { + context.runOnContext(v -> task.run()); } - private TestChannel buffer(IntConsumer consumer) { + protected final TestChannel buffer(IntConsumer consumer) { return new TestChannel(consumer); } - private TestChannel buffer(IntConsumer consumer, int lwm, int hwm) { + protected final TestChannel buffer(IntConsumer consumer, int lwm, int hwm) { return new TestChannel(consumer, lwm, hwm); } + @Test + public void testFlowing() { + AtomicInteger events = new AtomicInteger(); + queue = buffer(elt -> { + assertConsumer(); + assertEquals(0, elt); + assertEquals(0, events.getAndIncrement()); + testComplete(); + }); + producerTask(() -> { + assertTrue(queue.emit()); + }); + await(); + } + @Test public void testTake() { - context.runOnContext(v -> { - AtomicInteger events = new AtomicInteger(); - queue = buffer(elt -> { - checkContext(); - assertEquals(0, elt); - assertEquals(0, events.getAndIncrement()); - testComplete(); - }); + AtomicInteger events = new AtomicInteger(); + queue = buffer(elt -> { + assertConsumer(); + assertEquals(0, elt); + assertEquals(0, events.getAndIncrement()); + testComplete(); + }); + consumerTask(() -> { queue.pause(); queue.fetch(1); - assertTrue(queue.emit()); + producerTask(() -> queue.emit()); }); await(); } @Test public void testFlowingAdd() { - context.runOnContext(v -> { - AtomicInteger events = new AtomicInteger(); - queue = buffer(elt -> { - checkContext(); - events.getAndIncrement(); - }); + AtomicInteger events = new AtomicInteger(); + queue = buffer(elt -> { + assertConsumer(); + events.getAndIncrement(); + }); + producerTask(() -> { assertTrue(queue.emit()); - assertEquals(1, events.get()); + assertWaitUntil(() -> events.get() == 1); assertTrue(queue.emit()); - assertEquals(2, events.get()); + assertWaitUntil(() -> events.get() == 2); testComplete(); }); await(); @@ -197,17 +227,17 @@ public void testFlowingAdd() { @Test public void testFlowingRefill() { - context.runOnContext(v1 -> { - AtomicInteger events = new AtomicInteger(); - queue = buffer(elt -> { - checkContext(); - events.getAndIncrement(); - }, 5, 5).drainHandler(v -> { - checkContext(); - assertEquals(8, events.get()); - testComplete(); - }); - queue.pause(); + AtomicInteger events = new AtomicInteger(); + queue = buffer(elt -> { + assertConsumer(); + events.getAndIncrement(); + }, 5, 5).drainHandler(v -> { + assertProducer(); + assertEquals(8, events.get()); + testComplete(); + }); + queue.pause(); + producerTask(() -> { for (int i = 0; i < 8; i++) { assertEquals("Expected " + i + " to be bilto", i < 4, queue.emit()); } @@ -218,18 +248,18 @@ public void testFlowingRefill() { @Test public void testPauseWhenFull() { - context.runOnContext(v1 -> { - AtomicInteger events = new AtomicInteger(); - AtomicInteger reads = new AtomicInteger(); - queue = buffer(elt -> { - checkContext(); - assertEquals(0, reads.get()); - assertEquals(0, events.getAndIncrement()); - testComplete(); - }, 5, 5).drainHandler(v2 -> { - checkContext(); - assertEquals(0, reads.getAndIncrement()); - }); + AtomicInteger events = new AtomicInteger(); + AtomicInteger reads = new AtomicInteger(); + queue = buffer(elt -> { + assertConsumer(); + assertEquals(0, reads.get()); + assertEquals(0, events.getAndIncrement()); + testComplete(); + }, 5, 5).drainHandler(v2 -> { + assertProducer(); + assertEquals(0, reads.getAndIncrement()); + }); + producerTask(() -> { queue.pause(); for (int i = 0; i < 5; i++) { assertEquals(i < 4, queue.emit()); @@ -241,18 +271,18 @@ public void testPauseWhenFull() { @Test public void testPausedResume() { - context.runOnContext(v1 -> { - AtomicInteger reads = new AtomicInteger(); - AtomicInteger events = new AtomicInteger(); - queue = buffer(elt -> { - checkContext(); - events.getAndIncrement(); - }, 5, 5).drainHandler(v2 -> { - checkContext(); - assertEquals(0, reads.getAndIncrement()); - assertEquals(5, events.get()); - testComplete(); - }); + AtomicInteger reads = new AtomicInteger(); + AtomicInteger events = new AtomicInteger(); + queue = buffer(elt -> { + assertConsumer(); + events.getAndIncrement(); + }, 5, 5).drainHandler(v2 -> { + assertProducer(); + assertEquals(0, reads.getAndIncrement()); + assertEquals(5, events.get()); + testComplete(); + }); + producerTask(() -> { queue.pause(); queue.fill(); queue.resume(); @@ -263,20 +293,20 @@ public void testPausedResume() { @Test public void testPausedDrain() { waitFor(2); - context.runOnContext(v1 -> { - AtomicInteger drained = new AtomicInteger(); - AtomicInteger emitted = new AtomicInteger(); - queue = buffer(elt -> { - checkContext(); - assertEquals(0, drained.get()); - emitted.getAndIncrement(); - }, 5, 5); - queue.drainHandler(v2 -> { - checkContext(); - assertEquals(0, drained.getAndIncrement()); - assertEquals(5, emitted.get()); - complete(); - }); + AtomicInteger drained = new AtomicInteger(); + AtomicInteger emitted = new AtomicInteger(); + queue = buffer(elt -> { + assertConsumer(); + assertEquals(0, drained.get()); + emitted.getAndIncrement(); + }, 5, 5); + queue.drainHandler(v2 -> { + assertProducer(); + assertEquals(0, drained.getAndIncrement()); + assertEquals(5, emitted.get()); + complete(); + }); + producerTask(() -> { queue.pause(); queue.fill(); assertEquals(0, drained.get()); @@ -289,17 +319,17 @@ public void testPausedDrain() { @Test public void testPausedRequestLimited() { - context.runOnContext(v1 -> { - AtomicInteger events = new AtomicInteger(); - AtomicInteger reads = new AtomicInteger(); - queue = buffer(elt -> { - checkContext(); - events.getAndIncrement(); - }, 3, 3) - .drainHandler(v2 -> { - checkContext(); - assertEquals(0, reads.getAndIncrement()); - }); + AtomicInteger events = new AtomicInteger(); + AtomicInteger reads = new AtomicInteger(); + queue = buffer(elt -> { + assertConsumer(); + events.getAndIncrement(); + }, 3, 3) + .drainHandler(v2 -> { + assertProducer(); + assertEquals(0, reads.getAndIncrement()); + }); + producerTask(() -> { queue.pause(); queue.fetch(1); assertEquals(0, reads.get()); @@ -323,13 +353,15 @@ public void testPausedRequestLimited() { @Test public void testPushReturnsTrueUntilHighWatermark() { - context.runOnContext(v1 -> { - queue = buffer(elt -> { - - }, 2, 2); + AtomicInteger emitted = new AtomicInteger(); + queue = buffer(elt -> { + emitted.incrementAndGet(); + }, 2, 2); + producerTask(() -> { queue.pause(); queue.fetch(1); assertTrue(queue.emit()); + assertWaitUntil(() -> emitted.get() == 1); assertTrue(queue.emit()); assertFalse(queue.emit()); testComplete(); @@ -339,9 +371,9 @@ public void testPushReturnsTrueUntilHighWatermark() { @Test public void testHighWaterMark() { - context.runOnContext(v -> { - queue = buffer(elt -> { - }, 5, 5); + queue = buffer(elt -> { + }, 5, 5); + producerTask(() -> { queue.pause(); queue.fill(); assertEquals(5, sequence.get()); @@ -379,257 +411,25 @@ public void testEmptyHandler() { await(); } */ - @Test - public void testEmitWhenHandlingLastItem() { - context.runOnContext(v1 -> { - int next = sequence.get(); - AtomicInteger received = new AtomicInteger(next); - AtomicInteger writable = new AtomicInteger(); - queue = buffer(elt -> { - if (received.decrementAndGet() == 0) { - queue.write(next); - } - }, 4, 4) - .drainHandler(v2 -> { - writable.incrementAndGet(); - }); - queue.pause(); - queue.fill(); - queue.fetch(sequence.get()); - assertEquals(0, writable.get()); - testComplete(); - }); - await(); - } - - @Test - public void testEmitInElementHandler() { - context.runOnContext(v1 -> { - AtomicInteger events = new AtomicInteger(); - AtomicBoolean receiving = new AtomicBoolean(); - queue = buffer(elt -> { - checkContext(); - assertFalse(receiving.getAndSet(true)); - events.incrementAndGet(); - if (elt == 0) { - queue.fill(); - } - receiving.set(false); - }, 5, 5); - queue.pause(); - queue.fetch(1); - assertFalse(queue.emit()); - assertEquals(5 - 1, queue.size()); - assertEquals(1, events.get()); - testComplete(); - }); - await(); - } - - @Test - public void testEmitInElementHandler1() { - testEmitInElementHandler(n -> { - assertFalse(queue.emit(n)); - }); - } - -/* - @Test - public void testEmitInElementHandler2() { - testEmitInElementHandler(n -> { - for (int i = 0;i < n - 1;i++) { - assertTrue(buffer.emit()); - } - assertFalse(buffer.emit()); - }); - } -*/ - - private void testEmitInElementHandler(IntConsumer emit) { - context.runOnContext(v1 -> { - AtomicInteger events = new AtomicInteger(); - AtomicInteger drained = new AtomicInteger(); - AtomicBoolean draining = new AtomicBoolean(); - queue = buffer(elt -> { - checkContext(); - switch (elt) { - case 5: - // Emitted in drain handler - emit.accept(9); - break; - case 9: - vertx.runOnContext(v2 -> { - assertEquals(1, drained.get()); - assertEquals(10, events.get()); - assertEquals(5, queue.size()); - testComplete(); - }); - break; - } - events.incrementAndGet(); - }, 5, 5); - queue.drainHandler(v3 -> { - // Check reentrancy - assertFalse(draining.get()); - draining.set(true); - assertEquals(0, drained.getAndIncrement()); - assertFalse(queue.emit()); - draining.set(false); - }); - queue.pause(); - queue.fill(); - queue.fetch(10); - }); - await(); - } - - @Test - public void testEmitInDrainHandler1() { - context.runOnContext(v1 -> { - AtomicInteger drained = new AtomicInteger(); - AtomicInteger expectedDrained = new AtomicInteger(); - queue = buffer(elt -> { - if (elt == 0) { - // This will set writable to false - queue.fill(); - } - assertEquals(expectedDrained.get(), drained.get()); - }, 4, 4) - .drainHandler(v2 -> { - switch (drained.getAndIncrement()) { - case 0: - // Check that emitting again will not drain again - expectedDrained.set(1); - queue.fill(); - assertEquals(1, drained.get()); - testComplete(); - break; - } - }); - queue.pause(); - queue.fetch(1); - queue.emit(); - queue.fetch(4L); - }); - await(); - } - - @Test - public void testEmitInDrainHandler2() { - waitFor(2); - context.runOnContext(v1 -> { - AtomicInteger drained = new AtomicInteger(); - AtomicBoolean draining = new AtomicBoolean(); - AtomicInteger emitted = new AtomicInteger(); - queue = buffer(elt -> { - emitted.incrementAndGet(); - if (elt == 0) { - assertEquals(0, drained.get()); - } else if (elt == 6) { - assertEquals(1, drained.get()); - } - }, 5, 5) - .drainHandler(v2 -> { - assertFalse(draining.get()); - draining.set(true); - switch (drained.getAndIncrement()) { - case 0: - // This will trigger a new asynchronous drain - queue.fill(); - queue.fetch(5); - break; - case 1: - assertEquals(10, emitted.get()); - complete(); - break; - } - draining.set(false); - }); - queue.pause(); - queue.fill(); - queue.fetch(5); - complete(); - }); - await(); - } - - @Test - public void testDrainAfter() { - context.runOnContext(v1 -> { - AtomicInteger events = new AtomicInteger(); - AtomicBoolean receiving = new AtomicBoolean(); - queue = buffer(elt -> { - checkContext(); - assertFalse(receiving.getAndSet(true)); - events.incrementAndGet(); - if (elt == 0) { - queue.emit(5); - } - receiving.set(false); - }, 5, 5); - assertFalse(queue.emit()); - assertEquals(6, sequence.get()); - assertEquals(6, events.get()); - testComplete(); - }); - await(); - } - - @Test - public void testPauseInElementHandler() { - context.runOnContext(v1 -> { - AtomicInteger events = new AtomicInteger(); - queue = buffer(elt -> { - events.incrementAndGet(); - if (elt == 0) { - queue.pause(); - queue.fill(); - } - }, 5, 5); - assertFalse(queue.emit()); - assertEquals(1, events.get()); - assertEquals(5 - 1, queue.size()); - testComplete(); - }); - await(); - } - - @Test - public void testAddAllEmitInHandler() { - context.runOnContext(v1 -> { - List emitted = new ArrayList<>(); - queue = buffer(elt -> { - switch (elt) { - case 0: - queue.emit(); - } - emitted.add(elt); - }, 4, 4); - assertFalse(queue.emit(3)); - assertEquals(Arrays.asList(0, 1, 2, 3), emitted); - testComplete(); - }); - await(); - } @Test public void testAddAllWhenPaused() { waitFor(2); - context.runOnContext(v1 -> { - AtomicInteger emitted = new AtomicInteger(); - AtomicInteger emptied = new AtomicInteger(); - AtomicInteger drained = new AtomicInteger(); - queue = buffer(elt -> { - emitted.incrementAndGet(); - assertEquals(0, drained.get()); - assertEquals(0, emptied.get()); - queue.fetch(1); - }, 4, 4) - .drainHandler(v2 -> { - assertEquals(5, emitted.get()); - drained.incrementAndGet(); - complete(); - }); + AtomicInteger emitted = new AtomicInteger(); + AtomicInteger emptied = new AtomicInteger(); + AtomicInteger drained = new AtomicInteger(); + queue = buffer(elt -> { + emitted.incrementAndGet(); + assertEquals(0, drained.get()); + assertEquals(0, emptied.get()); + queue.fetch(1); + }, 4, 4) + .drainHandler(v2 -> { + assertEquals(5, emitted.get()); + drained.incrementAndGet(); + complete(); + }); + producerTask(() -> { queue.pause(); assertFalse(queue.emit(5)); queue.fetch(1); @@ -640,139 +440,95 @@ public void testAddAllWhenPaused() { @Test public void testAddAllWhenFlowing() { - context.runOnContext(v1 -> { - AtomicInteger emitted = new AtomicInteger(); - AtomicInteger drained = new AtomicInteger(); - queue = buffer(elt -> { - emitted.incrementAndGet(); - }, 4, 4) - .drainHandler(v2 -> drained.incrementAndGet()); - assertFalse(queue.emit(4)); - context.runOnContext(v -> { - waitUntilEquals(1, drained::get); - waitUntilEquals(4, emitted::get); - testComplete(); - }); - }); - await(); - } - - @Test - public void testAddAllWhenDelivering() { - context.runOnContext(v1 -> { - List emitted = new ArrayList<>(); - queue = buffer(elt -> { - emitted.add(elt); - if (elt == 2) { - queue.write(Arrays.asList(4, 5)); - // Check that we haven't re-entered the handler - assertEquals(Arrays.asList(0, 1, 2), emitted); - } - }, 4, 4); + AtomicInteger emitted = new AtomicInteger(); + AtomicInteger drained = new AtomicInteger(); + queue = buffer(elt -> { + emitted.incrementAndGet(); + }, 4, 4) + .drainHandler(v2 -> drained.incrementAndGet()); + producerTask(() -> { queue.emit(4); - assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5), emitted); - testComplete(); }); - await(); + waitUntilEquals(1, drained::get); + waitUntilEquals(4, emitted::get); } @Test public void testCheckThatPauseAfterResumeWontDoAnyEmission() { - context.runOnContext(v1 -> { - AtomicInteger emitted = new AtomicInteger(); - queue = buffer(elt -> emitted.incrementAndGet(), 4, 4); + AtomicInteger emitted = new AtomicInteger(); + queue = buffer(elt -> emitted.incrementAndGet(), 4, 4); + producerTask(() -> { queue.pause(); queue.fill(); - // Resume will execute an asynchronous drain operation - queue.resume(); - // Pause just after to ensure that no elements will be delivered to he handler - queue.pause(); - // Give enough time to have elements delivered - vertx.setTimer(20, id -> { - // Check we haven't received anything - assertEquals(0, emitted.get()); - testComplete(); + consumerTask(() -> { + // Resume will execute an asynchronous drain operation + queue.resume(); + // Pause just after to ensure that no elements will be delivered to the handler + queue.pause(); + // Give enough time to have elements delivered + vertx.setTimer(20, id -> { + // Check we haven't received anything + assertEquals(0, emitted.get()); + testComplete(); + }); }); }); await(); } -/* - @Test public void testBufferSignalingFullImmediately() { - context.runOnContext(v1 -> { - buffer = new InboundBuffer<>(context, 0L); - List emitted = new ArrayList<>(); - buffer.drainHandler(v -> { - assertEquals(Arrays.asList(0, 1), emitted); - testComplete(); + List emitted = Collections.synchronizedList(new ArrayList<>()); + AtomicInteger drained = new AtomicInteger(); + queue = buffer(emitted::add, 1, 1); + producerTask(() -> { + queue.drainHandler(v -> { + switch (drained.getAndIncrement()) { + case 0: + producerTask(() -> { + assertFalse(queue.emit()); + queue.resume(); + }); + break; + case 1: + assertEquals(Arrays.asList(0, 1), emitted); + testComplete(); + break; + } }); - buffer.handler(emitted::add); - assertTrue(emit()); + queue.emit(); + assertWaitUntil(() -> emitted.size() == 1); assertEquals(Collections.singletonList(0), emitted); - buffer.pause(); - assertFalse(emit()); - buffer.resume(); + queue.pause(); }); await(); } @Test - public void testPauseInHandlerSignalsFullImmediately() { - context.runOnContext(v -> { - buffer = new InboundBuffer<>(context, 0); - buffer.handler(elt -> { - checkContext(); - buffer.pause(); - }); - assertFalse(emit()); - testComplete(); - }); - await(); - } + public void testClose() { + List emitted = Collections.synchronizedList(new ArrayList<>()); + List disposed = Collections.synchronizedList(new ArrayList<>()); + queue = new TestChannel(emitted::add, 1, 1) { + @Override + protected void handleDispose(Integer msg) { + disposed.add(msg); + } + }; - @Test - public void testFetchWhenNotEmittingWithNoPendingElements() { - context.runOnContext(v1 -> { - buffer = new InboundBuffer<>(context, 0); - AtomicInteger drained = new AtomicInteger(); - buffer.drainHandler(v2 -> { - context.runOnContext(v -> { - assertEquals(0, drained.getAndIncrement()); + producerTask(() -> { + queue.pause(); + queue.emit(5); + queue.closeProducer(); + consumerTask(() -> { + queue.closeConsumer(); + assertEquals(Collections.emptyList(), emitted); + assertEquals(Arrays.asList(0, 1, 2, 3, 4), disposed); + producerTask(() -> { + queue.write(5); testComplete(); }); }); - buffer.emptyHandler(v -> { - fail(); - }); - buffer.handler(elt -> { - checkContext(); - switch (elt) { - case 0: - buffer.pause(); - break; - } - }); - assertFalse(emit()); - buffer.fetch(1); }); await(); } - - @Test - public void testRejectWrongThread() { - buffer = new InboundBuffer<>(context); - try { - buffer.write(0); - fail(); - } catch (IllegalStateException ignore) { - } - try { - buffer.write(Arrays.asList(0, 1, 2)); - fail(); - } catch (IllegalStateException ignore) { - } - } -*/ }