Skip to content

Commit

Permalink
Introduce options to configure the creation of an event-bus consumer …
Browse files Browse the repository at this point in the history
…and deprecate message consumer max buffered message upper bound setter.

Motivation:

The event-bus message consumer has a dynamic max buffered messages upper bound. This dynamic bound makes difficult to implement the message flow and requires a custom implementation. Instead this should be configured when the consumer is created.

Changes:

Introduce MessageConsumerOptions that provides all the necessary details to create a consumer (address, localOnly, maxBufferedMessages). Consumer setMaxBufferedMessages is deprecated in favour of using options.
  • Loading branch information
vietj committed Feb 6, 2025
1 parent 1d44cf7 commit 11c5e2f
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.vertx.core.eventbus;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.impl.JsonUtil;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Base64;

/**
* Converter and mapper for {@link io.vertx.core.eventbus.MessageConsumerOptions}.
* NOTE: This class has been automatically generated from the {@link io.vertx.core.eventbus.MessageConsumerOptions} original class using Vert.x codegen.
*/
public class MessageConsumerOptionsConverter {


private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER;
private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER;

static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, MessageConsumerOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "address":
if (member.getValue() instanceof String) {
obj.setAddress((String)member.getValue());
}
break;
case "localOnly":
if (member.getValue() instanceof Boolean) {
obj.setLocalOnly((Boolean)member.getValue());
}
break;
case "maxBufferedMessages":
if (member.getValue() instanceof Number) {
obj.setMaxBufferedMessages(((Number)member.getValue()).intValue());
}
break;
}
}
}

static void toJson(MessageConsumerOptions obj, JsonObject json) {
toJson(obj, json.getMap());
}

static void toJson(MessageConsumerOptions obj, java.util.Map<String, Object> json) {
if (obj.getAddress() != null) {
json.put("address", obj.getAddress());
}
json.put("localOnly", obj.isLocalOnly());
json.put("maxBufferedMessages", obj.getMaxBufferedMessages());
}
}
22 changes: 22 additions & 0 deletions src/main/java/io/vertx/core/eventbus/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,28 @@ default <T> EventBus request(String address, @Nullable Object message, DeliveryO
@Fluent
EventBus publish(String address, @Nullable Object message, DeliveryOptions options);

/**
* Create a message consumer against the specified options address.
* <p>
* The returned consumer is not yet registered
* at the address, registration will be effective when {@link MessageConsumer#handler(io.vertx.core.Handler)}
* is called.
*
* @param options the consumer options
* @return the event bus message consumer
*/
<T> MessageConsumer<T> consumer(MessageConsumerOptions options);

/**
* Create a consumer and register it against the specified options address.
*
* @param options the consumer options
* @param handler the handler that will process the received messages
*
* @return the event bus message consumer
*/
<T> MessageConsumer<T> consumer(MessageConsumerOptions options, Handler<Message<T>> handler);

/**
* Create a message consumer against the specified address.
* <p>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/vertx/core/eventbus/MessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public interface MessageConsumer<T> extends ReadStream<Message<T>> {
*
* @param maxBufferedMessages the maximum number of messages that can be buffered
* @return this registration
* @deprecated for removal, instead use {@link MessageConsumerOptions#setMaxBufferedMessages(int)}
*/
@Deprecated()
MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages);

/**
Expand Down
100 changes: 100 additions & 0 deletions src/main/java/io/vertx/core/eventbus/MessageConsumerOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package io.vertx.core.eventbus;

import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.json.annotations.JsonGen;
import io.vertx.core.impl.Arguments;
import io.vertx.core.json.JsonObject;

@DataObject
@JsonGen(publicConverter = false)
public class MessageConsumerOptions {

public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
public static final boolean DEFAULT_LOCAL_ONLY = false;

private String address;
private boolean localOnly;
private int maxBufferedMessages;

public MessageConsumerOptions() {
maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
localOnly = DEFAULT_LOCAL_ONLY;
}

public MessageConsumerOptions(MessageConsumerOptions options) {
this();
maxBufferedMessages = options.getMaxBufferedMessages();
localOnly = options.isLocalOnly();
address = options.getAddress();
}

public MessageConsumerOptions(JsonObject json) {
this();
MessageConsumerOptionsConverter.fromJson(json, this);
}

/**
* @return the address the event-bus will register the consumer at
*/
public String getAddress() {
return address;
}

/**
* Set the address the event-bus will register the consumer at.
*
* @param address the consumer address
* @return this options
*/
public MessageConsumerOptions setAddress(String address) {
this.address = address;
return this;
}

/**
* @return whether the consumer is local only
*/
public boolean isLocalOnly() {
return localOnly;
}

/**
* Set whether the consumer is local only.
*
* @param localOnly whether the consumer is local only
* @return this options
*/
public MessageConsumerOptions setLocalOnly(boolean localOnly) {
this.localOnly = localOnly;
return this;
}

/**
* @return the maximum number of messages that can be buffered when this stream is paused
*/
public int getMaxBufferedMessages() {
return maxBufferedMessages;
}

/**
* Set the number of messages this registration will buffer when this stream is paused. The default
* value is <code>1000</code>.
* <p>
* When a new value is set, buffered messages may be discarded to reach the new value. The most recent
* messages will be kept.
*
* @param maxBufferedMessages the maximum number of messages that can be buffered
* @return this options
*/
public MessageConsumerOptions setMaxBufferedMessages(int maxBufferedMessages) {
Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
this.maxBufferedMessages = maxBufferedMessages;
return this;
}

public JsonObject toJson() {
JsonObject json = new JsonObject();
MessageConsumerOptionsConverter.toJson(this, json);
return json;
}
}
21 changes: 19 additions & 2 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import io.vertx.core.*;
import io.vertx.core.eventbus.*;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
Expand Down Expand Up @@ -165,11 +166,27 @@ public EventBus publish(String address, Object message, DeliveryOptions options)
return this;
}

@Override
public <T> MessageConsumer<T> consumer(MessageConsumerOptions options) {
checkStarted();
String address = options.getAddress();
Arguments.require(options.getAddress() != null, "Consumer address must not be null");
return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, options.isLocalOnly(), options.getMaxBufferedMessages());
}

@Override
public <T> MessageConsumer<T> consumer(MessageConsumerOptions options, Handler<Message<T>> handler) {
Objects.requireNonNull(handler, "handler");
MessageConsumer<T> consumer = consumer(options);
consumer.handler(handler);
return consumer;
}

@Override
public <T> MessageConsumer<T> consumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, false);
return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
}

@Override
Expand All @@ -184,7 +201,7 @@ public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handl
public <T> MessageConsumer<T> localConsumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, true);
return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,26 @@ public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements Me

private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class);

private static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;

private final Vertx vertx;
private final ContextInternal context;
private final EventBusImpl eventBus;
private final String address;
private final boolean localOnly;
private Handler<Message<T>> handler;
private Handler<AsyncResult<Void>> completionHandler;
private Handler<Void> endHandler;
private Handler<Message<T>> discardHandler;
private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
private int maxBufferedMessages;
private Queue<Message<T>> pending = new ArrayDeque<>(8);
private long demand = Long.MAX_VALUE;
private Promise<Void> result;

MessageConsumerImpl(Vertx vertx, ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly) {
MessageConsumerImpl(Vertx vertx, ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly, int maxBufferedMessages) {
super(context, eventBus, address, false);
this.vertx = vertx;
this.context = context;
this.eventBus = eventBus;
this.address = address;
this.localOnly = localOnly;
this.maxBufferedMessages = maxBufferedMessages;
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public void testArgumentValidation() throws Exception {
assertNullPointerException(() -> eb.publish(null, ""));
assertNullPointerException(() -> eb.publish(null, "", new DeliveryOptions()));
assertNullPointerException(() -> eb.publish("", "", null));
assertNullPointerException(() -> eb.consumer(null));
assertNullPointerException(() -> eb.consumer(null, msg -> {}));
assertNullPointerException(() -> eb.consumer((String) null));
assertNullPointerException(() -> eb.consumer((String) null, msg -> {}));
assertNullPointerException(() -> eb.consumer(ADDRESS1, null));
assertNullPointerException(() -> eb.localConsumer(null));
assertNullPointerException(() -> eb.localConsumer(null, msg -> {}));
Expand Down

0 comments on commit 11c5e2f

Please sign in to comment.