Skip to content

Commit

Permalink
Add inbound testing for SPSC / SingleThread and improve close protoco…
Browse files Browse the repository at this point in the history
…l of InboundMessageChannel.
  • Loading branch information
vietj committed Feb 6, 2025
1 parent 5a38f45 commit 17cb669
Show file tree
Hide file tree
Showing 4 changed files with 667 additions and 496 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.vertx.core.internal.EventExecutor;
import io.vertx.core.streams.impl.MessageChannel;

import java.util.List;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Predicate;

Expand All @@ -28,9 +29,13 @@ public class InboundMessageChannel<M> implements Predicate<M>, Runnable {
private final EventExecutor producer;
private final MessageChannel<M> messageChannel;

// Accessed by context thread
private boolean needsDrain;
// Accessed by produced thread
private boolean producerClosed;

// Accessed by consumer thread
private boolean draining;
private boolean needsDrain;
private boolean consumerClosed;

// Any thread
private volatile long demand = Long.MAX_VALUE;
Expand Down Expand Up @@ -62,7 +67,7 @@ public InboundMessageChannel(EventExecutor producer, EventExecutor consumer, int
}
this.messageChannel = messageChannelFactory.create(this, lowWaterMark, highWaterMark);
this.consumer = consumer;
this.producer = consumer;
this.producer = producer;
}

@Override
Expand All @@ -79,26 +84,6 @@ public final boolean test(M msg) {
return true;
}

/**
* Handle resume, executed on the producer executor.
*/
protected void handleResume() {
}

/**
* Handler pause, executed on the producer executor.
*/
protected void handlePause() {
}

/**
* Handle a message, executed on the consumer executor.
*
* @param msg the message
*/
protected void handleMessage(M msg) {
}

/**
* Add a message to the channel
*
Expand All @@ -107,6 +92,10 @@ protected void handleMessage(M msg) {
*/
public final boolean add(M msg) {
assert producer.inThread();
if (producerClosed) {
handleDispose(msg);
return false;
}
int res = messageChannel.add(msg);
if ((res & MessageChannel.UNWRITABLE_MASK) != 0) {
handlePause();
Expand Down Expand Up @@ -141,10 +130,13 @@ public final void write(M msg) {
}

/**
* Schedule a drain operation on the context thread.
* Schedule a drain operation on the consumer thread, this method assumes a consumer thread.
*/
public final void drain() {
assert producer.inThread();
if (producerClosed) {
return;
}
if (consumer.inThread()) {
drainInternal();
} else {
Expand All @@ -153,7 +145,7 @@ public final void drain() {
}

/**
* Task executed from context thread.
* Task executed from context thread, this should not be called directly.
*/
@Override
public void run() {
Expand All @@ -164,12 +156,19 @@ public void run() {
}

private void drainInternal() {
if (consumerClosed) {
return;
}
draining = true;
try {
int res = messageChannel.drain();
needsDrain = (res & MessageChannel.DRAIN_REQUIRED_MASK) != 0;
if ((res & MessageChannel.WRITABLE_MASK) != 0) {
producer.execute(this::handleResume);
if (producer.inThread()) {
handleResume();
} else {
producer.execute(this::handleResume);
}
}
} finally {
draining = false;
Expand Down Expand Up @@ -203,8 +202,83 @@ public final void fetch(long amount) {
break;
}
}
consumer
.execute(this);
consumer.execute(this);
}
}

/**
* Close the channel.
*/
public final void close() {
if (!producer.inThread()) {
producer.execute(this::close);
return;
}
closeProducer();
if (consumer.inThread()) {
closeConsumer();
} else {
consumer.execute(this::closeConsumer);
}
}

/**
* Close the producer side, this must be called from the producer thread
*/
public final void closeProducer() {
assert producer.inThread();
if (producerClosed) {
return;
}
producerClosed = true;
}

/**
* Close the consumer side, this must be called from the consumer thread
*/
public final void closeConsumer() {
assert consumer.inThread();
if (consumerClosed) {
return;
}
consumerClosed = true;
releaseMessages();
}

private void releaseMessages() {
List<M> messages = messageChannel.clear();
for (M elt : messages) {
handleDispose(elt);
}
}

/**
* Handle resume, executed on a producer thread.
*/
protected void handleResume() {
}

/**
* Handler pause, executed on a producer thread.
*/
protected void handlePause() {
}

/**
* Handle a message, executed on a consumer thread.
*
* @param msg the message
*/
protected void handleMessage(M msg) {
}

/**
* Dispose a message, this is called when the channel has been closed and message resource cleanup. No specific
* thread assumption can be made on this callback.
*
* @param msg the message to dispose
*/
// Todo : try remove this
protected void handleDispose(M msg) {
}
}
Loading

0 comments on commit 17cb669

Please sign in to comment.