diff --git a/src/main/java/io/vertx/core/eventbus/EventBus.java b/src/main/java/io/vertx/core/eventbus/EventBus.java index 140808e5c11..3d718093149 100644 --- a/src/main/java/io/vertx/core/eventbus/EventBus.java +++ b/src/main/java/io/vertx/core/eventbus/EventBus.java @@ -15,7 +15,6 @@ import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.Nullable; import io.vertx.codegen.annotations.VertxGen; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.eventbus.impl.DefaultSerializableChecker; @@ -199,6 +198,10 @@ default Future> request(String address, @Nullable Object message) */ MessageProducer publisher(String address, DeliveryOptions options); + Future bindStream(String address, Handler handler); + + Future connectStream(String address); + /** * Register a message codec. *

diff --git a/src/main/java/io/vertx/core/eventbus/MessageStream.java b/src/main/java/io/vertx/core/eventbus/MessageStream.java new file mode 100644 index 00000000000..c1d2952fd9c --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/MessageStream.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2011-2021 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.eventbus; + +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Handler; + +@VertxGen +public interface MessageStream { + + void handler(Handler> handler); + + void endHandler(Handler handler); + + void write(String msg); + + void end(); + +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java b/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java new file mode 100644 index 00000000000..8e29ee3ebf6 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java @@ -0,0 +1,51 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; + +import java.util.concurrent.TimeoutException; + +class ClientStream extends StreamBase implements Handler { + + private final Promise promise2; + private final long timeoutID; + + public ClientStream(EventBusImpl eventBus, String sourceAddress, ContextInternal ctx, Promise promise2) { + super(sourceAddress, ctx, eventBus, sourceAddress, true); + this.promise2 = promise2; + this.timeoutID = ctx.setTimer(3_000, this); + } + + @Override + public void handle(Long event) { + unregister(); + promise2.fail(new TimeoutException()); + } + + @Override + protected boolean doReceive(Message msg) { + if (msg.body() instanceof SynFrame) { + if (context.owner().cancelTimer(timeoutID)) { + return true; + } + base = (MessageImpl) msg; + SynFrame syn = (SynFrame) msg.body(); + remoteAddress = syn.src; + promise2.complete(this); + return true; + } else { + if (base != null) { + return super.doReceive(msg); + } else { + if (context.owner().cancelTimer(timeoutID)) { + unregister(); + promise2.fail(new IllegalStateException()); + } + return true; + } + } + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/CodecManager.java b/src/main/java/io/vertx/core/eventbus/impl/CodecManager.java index e8d501b4331..2aa51ef471f 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/CodecManager.java +++ b/src/main/java/io/vertx/core/eventbus/impl/CodecManager.java @@ -63,7 +63,7 @@ public CodecManager() { this.systemCodecs = codecs(NULL_MESSAGE_CODEC, PING_MESSAGE_CODEC, STRING_MESSAGE_CODEC, BUFFER_MESSAGE_CODEC, JSON_OBJECT_MESSAGE_CODEC, JSON_ARRAY_MESSAGE_CODEC, BYTE_ARRAY_MESSAGE_CODEC, INT_MESSAGE_CODEC, LONG_MESSAGE_CODEC, FLOAT_MESSAGE_CODEC, DOUBLE_MESSAGE_CODEC, BOOLEAN_MESSAGE_CODEC, SHORT_MESSAGE_CODEC, CHAR_MESSAGE_CODEC, BYTE_MESSAGE_CODEC, REPLY_EXCEPTION_MESSAGE_CODEC, - clusterSerializableCodec, serializableCodec); + clusterSerializableCodec, serializableCodec, SynFrame.CODEC, FinFrame.CODEC); } public MessageCodec lookupCodec(Object body, String codecName, boolean local) { @@ -98,6 +98,10 @@ public MessageCodec lookupCodec(Object body, String codecName, boolean local) { codec = CHAR_MESSAGE_CODEC; } else if (body instanceof Byte) { codec = BYTE_MESSAGE_CODEC; + } else if (body instanceof SynFrame) { + codec = SynFrame.CODEC; + } else if (body instanceof FinFrame) { + codec = FinFrame.CODEC; } else if (body instanceof ReplyException) { codec = defaultCodecMap.get(body.getClass()); if (codec == null) { diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index d9194e559b2..815aa2381ac 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -155,6 +155,33 @@ public MessageProducer publisher(String address, DeliveryOptions options) return new MessageProducerImpl<>(vertx, address, false, options); } + @Override + public Future bindStream(String address, Handler handler) { + ContextInternal ctx = vertx.getOrCreateContext(); + HandlerRegistration reg = new StreamServer(this, ctx, address, handler); + Promise promise = ctx.promise(); + reg.register(true, false, promise); + return promise.future(); + } + + @Override + public Future connectStream(String address) { + ContextInternal ctx = vertx.getOrCreateContext(); + String sourceAddress = generateReplyAddress(); + Promise promise2 = ctx.promise(); + StreamBase reg = new ClientStream(this, sourceAddress, ctx, promise2); + Promise promise = ctx.promise(); + reg.register(false, false, promise); + promise.future().onComplete(ar -> { + if (ar.succeeded()) { + MessageImpl msg = createMessage(true, address, MultiMap.caseInsensitiveMultiMap(), new SynFrame(sourceAddress, address), null); + msg.setReplyAddress(sourceAddress); + sendOrPub(ctx, msg, new DeliveryOptions(), ctx.promise()); + } + }); + return promise2.future(); + } + @Override public EventBus publish(String address, Object message) { return publish(address, message, new DeliveryOptions()); diff --git a/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java b/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java new file mode 100644 index 00000000000..08b9b033d82 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java @@ -0,0 +1,47 @@ +package io.vertx.core.eventbus.impl; + +import io.netty.util.CharsetUtil; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; + +class FinFrame implements Frame { + + public static final MessageCodec CODEC = new MessageCodec<>() { + @Override + public void encodeToWire(Buffer buffer, FinFrame synFrame) { + byte[] strBytes = synFrame.addr.getBytes(CharsetUtil.UTF_8); + buffer.appendInt(strBytes.length); + buffer.appendBytes(strBytes); + } + + @Override + public FinFrame decodeFromWire(int pos, Buffer buffer) { + int length = buffer.getInt(pos); + pos += 4; + byte[] bytes = buffer.getBytes(pos, pos + length); + String src = new String(bytes, CharsetUtil.UTF_8); + return new FinFrame(src); + } + + @Override + public FinFrame transform(FinFrame finFrame) { + return finFrame; + } + + @Override + public String name() { + return "frame.fin"; + } + + @Override + public byte systemCodecID() { + return 19; + } + }; + + final String addr; + + public FinFrame(String addr) { + this.addr = addr; + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/Frame.java b/src/main/java/io/vertx/core/eventbus/impl/Frame.java new file mode 100644 index 00000000000..bc6f42c5059 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/Frame.java @@ -0,0 +1,4 @@ +package io.vertx.core.eventbus.impl; + +public interface Frame { +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java index c2919156d20..5a4b571e131 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java @@ -118,7 +118,7 @@ public Future> replyAndRequest(Object message, DeliveryOptions op } } - protected MessageImpl createReply(Object message, DeliveryOptions options) { + public MessageImpl createReply(Object message, DeliveryOptions options) { MessageImpl reply = bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName()); reply.trace = trace; return reply; diff --git a/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java b/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java new file mode 100644 index 00000000000..ecaa2729e33 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java @@ -0,0 +1,77 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; + +class StreamBase extends HandlerRegistration implements MessageStream { + + MessageImpl base; + private Handler> handler; + private Handler endHandler; + final String localAddress; + String remoteAddress; + private boolean halfClosed; + + StreamBase(String localAddress, ContextInternal context, EventBusImpl bus, String address, boolean src) { + super(context, bus, address, src); + this.localAddress = localAddress; + } + + @Override + protected boolean doReceive(Message msg) { + if (msg.body() instanceof FinFrame) { + Handler h = endHandler; + if (h != null) { + h.handle(null); + } + if (halfClosed) { + unregister(); + } else { + halfClosed = true; + } + return true; + } else { + Handler> h = handler; + if (h != null) { + h.handle(msg); + } + return true; + } + } + + @Override + protected void dispatch(Message msg, ContextInternal context, Handler handler) { + + } + + @Override + public void handler(Handler> handler) { + this.handler = handler; + } + + @Override + public void endHandler(Handler handler) { + this.endHandler = handler; + } + + @Override + public void write(String body) { + MessageImpl msg = base.createReply(body, new DeliveryOptions()); + bus.sendOrPub(context, msg, new DeliveryOptions(), context.promise()); + } + + @Override + public void end() { + MessageImpl msg = base.createReply(new FinFrame(remoteAddress), new DeliveryOptions()); + bus.sendOrPub(context, msg, new DeliveryOptions(), context.promise()); + if (halfClosed) { + unregister(); + } else { + halfClosed = true; + } + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java b/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java new file mode 100644 index 00000000000..d24665d24e1 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java @@ -0,0 +1,46 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.future.PromiseInternal; + +class StreamServer extends HandlerRegistration { + private final EventBusImpl eventBus; + private final Handler handler; + + public StreamServer(EventBusImpl eventBus, ContextInternal ctx, String address, Handler handler) { + super(ctx, eventBus, address, false); + this.eventBus = eventBus; + this.handler = handler; + } + + @Override + protected boolean doReceive(Message msg) { + if (msg.body() instanceof SynFrame) { + SynFrame syn = (SynFrame) msg.body(); + String localAddress = eventBus.generateReplyAddress(); + StreamBase ss = new StreamBase(localAddress, context, eventBus, localAddress, false); + ss.remoteAddress = syn.src; + ss.base = (MessageImpl) msg; + PromiseInternal p = context.promise(); + ss.register(false, false, p); + p.onComplete(ar -> { + if (ar.succeeded()) { + MessageImpl reply = ((MessageImpl)msg).createReply(new SynFrame(localAddress, syn.src), new DeliveryOptions()); + reply.setReplyAddress(localAddress); + eventBus.sendOrPub(context, reply, new DeliveryOptions(), context.promise()); + handler.handle(ss); + } + }); + } + return true; + } + + @Override + protected void dispatch(Message msg, ContextInternal context, Handler handler) { + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java b/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java new file mode 100644 index 00000000000..37f3cbaf27b --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java @@ -0,0 +1,58 @@ +package io.vertx.core.eventbus.impl; + +import io.netty.util.CharsetUtil; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; + +public class SynFrame implements Frame { + + public static final MessageCodec CODEC = new MessageCodec<>() { + @Override + public void encodeToWire(Buffer buffer, SynFrame synFrame) { + byte[] strBytes = synFrame.src.getBytes(CharsetUtil.UTF_8); + buffer.appendInt(strBytes.length); + buffer.appendBytes(strBytes); + byte[] dstBytes = synFrame.dst.getBytes(CharsetUtil.UTF_8); + buffer.appendInt(dstBytes.length); + buffer.appendBytes(dstBytes); + } + + @Override + public SynFrame decodeFromWire(int pos, Buffer buffer) { + int length = buffer.getInt(pos); + pos += 4; + byte[] bytes = buffer.getBytes(pos, pos + length); + String src = new String(bytes, CharsetUtil.UTF_8); + pos += length; + length = buffer.getInt(pos); + pos += 4; + bytes = buffer.getBytes(pos, pos + length); + String dst = new String(bytes, CharsetUtil.UTF_8); + pos += length; + return new SynFrame(src, dst); + } + + @Override + public SynFrame transform(SynFrame synFrame) { + return synFrame; + } + + @Override + public String name() { + return "frame.syn"; + } + + @Override + public byte systemCodecID() { + return 18; + } + }; + + final String src; + final String dst; + + public SynFrame(String src, String dst) { + this.src = src; + this.dst = dst; + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java index f0815a42797..74f520c805b 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java @@ -67,7 +67,7 @@ protected ClusteredMessage(ClusteredMessage other) { } @Override - protected MessageImpl createReply(Object message, DeliveryOptions options) { + public MessageImpl createReply(Object message, DeliveryOptions options) { ClusteredMessage reply = (ClusteredMessage) super.createReply(message, options); reply.repliedTo = sender; return reply; diff --git a/src/test/java/io/vertx/core/eventbus/EventBusTestBase.java b/src/test/java/io/vertx/core/eventbus/EventBusTestBase.java index f2fe338f2cc..d293cf2bf58 100644 --- a/src/test/java/io/vertx/core/eventbus/EventBusTestBase.java +++ b/src/test/java/io/vertx/core/eventbus/EventBusTestBase.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static io.vertx.core.eventbus.impl.CodecManager.STRING_MESSAGE_CODEC; @@ -695,6 +697,70 @@ protected void testPublish(T val, Consumer consumer) throws Exception { await(); } + @Test + public void testStream() throws Exception { + Vertx[] vertices = vertices(2); + CountDownLatch latch = new CountDownLatch(1); + vertices[1].eventBus().bindStream(ADDRESS1, stream -> { + stream.handler(msg -> { + assertEquals("ping", msg.body()); + stream.write(msg.body()); + }); + stream.endHandler(v -> { + stream.end(); + }); + }).onComplete(onSuccess(v -> { + latch.countDown(); + })); + awaitLatch(latch); + vertices[0].eventBus().connectStream(ADDRESS1).onComplete(onSuccess(stream -> { + stream.write("ping"); + stream.handler(msg -> { + assertEquals("ping", msg.body()); + stream.end(); + }); + stream.endHandler(v -> { + testComplete(); + }); + })); + await(); + } + + @Test + public void testNoSynAck() throws Exception { + waitFor(2); + Vertx[] vertices = vertices(2); + CountDownLatch latch = new CountDownLatch(1); + vertices[1].eventBus().consumer(ADDRESS1, msg -> { + complete(); + }).completion().onComplete(onSuccess(v -> latch.countDown())); + awaitLatch(latch); + vertices[0].eventBus().connectStream(ADDRESS1).onComplete(onFailure(err -> { + assertEquals(TimeoutException.class, err.getClass()); + complete(); + })); + await(); + } + + @Test + public void testIncorrectAck() throws Exception { + waitFor(2); + Vertx[] vertices = vertices(2); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference senderAddr = new AtomicReference<>(); + vertices[1].eventBus().consumer(ADDRESS1, msg -> { + senderAddr.set(msg.replyAddress()); + msg.reply("incorrect"); + complete(); + }).completion().onComplete(onSuccess(v -> latch.countDown())); + awaitLatch(latch); + vertices[0].eventBus().connectStream(ADDRESS1).onComplete(onFailure(err -> { + assertEquals(IllegalStateException.class, err.getClass()); + complete(); + })); + await(); + } + public static class MySystemDecoder implements MessageCodec { @Override