Skip to content

Commit

Permalink
Basic stream impl
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 30, 2023
1 parent 5aeb871 commit 6180fcc
Show file tree
Hide file tree
Showing 13 changed files with 414 additions and 4 deletions.
5 changes: 4 additions & 1 deletion src/main/java/io/vertx/core/eventbus/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.impl.DefaultSerializableChecker;
Expand Down Expand Up @@ -199,6 +198,10 @@ default <T> Future<Message<T>> request(String address, @Nullable Object message)
*/
<T> MessageProducer<T> publisher(String address, DeliveryOptions options);

Future<Void> bindStream(String address, Handler<MessageStream> handler);

Future<MessageStream> connectStream(String address);

/**
* Register a message codec.
* <p>
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/core/eventbus/MessageStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.eventbus;

import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;

@VertxGen
public interface MessageStream {

void handler(Handler<Message<String>> handler);

void endHandler(Handler<Void> handler);

void write(String msg);

void end();

}
51 changes: 51 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/ClientStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageStream;
import io.vertx.core.impl.ContextInternal;

import java.util.concurrent.TimeoutException;

class ClientStream extends StreamBase implements Handler<Long> {

private final Promise<MessageStream> promise2;
private final long timeoutID;

public ClientStream(EventBusImpl eventBus, String sourceAddress, ContextInternal ctx, Promise<MessageStream> promise2) {
super(sourceAddress, ctx, eventBus, sourceAddress, true);
this.promise2 = promise2;
this.timeoutID = ctx.setTimer(3_000, this);
}

@Override
public void handle(Long event) {
unregister();
promise2.fail(new TimeoutException());
}

@Override
protected boolean doReceive(Message msg) {
if (msg.body() instanceof SynFrame) {
if (context.owner().cancelTimer(timeoutID)) {
return true;
}
base = (MessageImpl) msg;
SynFrame syn = (SynFrame) msg.body();
remoteAddress = syn.src;
promise2.complete(this);
return true;
} else {
if (base != null) {
return super.doReceive(msg);
} else {
if (context.owner().cancelTimer(timeoutID)) {
unregister();
promise2.fail(new IllegalStateException());
}
return true;
}
}
}
}
6 changes: 5 additions & 1 deletion src/main/java/io/vertx/core/eventbus/impl/CodecManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public CodecManager() {
this.systemCodecs = codecs(NULL_MESSAGE_CODEC, PING_MESSAGE_CODEC, STRING_MESSAGE_CODEC, BUFFER_MESSAGE_CODEC, JSON_OBJECT_MESSAGE_CODEC, JSON_ARRAY_MESSAGE_CODEC,
BYTE_ARRAY_MESSAGE_CODEC, INT_MESSAGE_CODEC, LONG_MESSAGE_CODEC, FLOAT_MESSAGE_CODEC, DOUBLE_MESSAGE_CODEC,
BOOLEAN_MESSAGE_CODEC, SHORT_MESSAGE_CODEC, CHAR_MESSAGE_CODEC, BYTE_MESSAGE_CODEC, REPLY_EXCEPTION_MESSAGE_CODEC,
clusterSerializableCodec, serializableCodec);
clusterSerializableCodec, serializableCodec, SynFrame.CODEC, FinFrame.CODEC);
}

public MessageCodec lookupCodec(Object body, String codecName, boolean local) {
Expand Down Expand Up @@ -98,6 +98,10 @@ public MessageCodec lookupCodec(Object body, String codecName, boolean local) {
codec = CHAR_MESSAGE_CODEC;
} else if (body instanceof Byte) {
codec = BYTE_MESSAGE_CODEC;
} else if (body instanceof SynFrame) {
codec = SynFrame.CODEC;
} else if (body instanceof FinFrame) {
codec = FinFrame.CODEC;
} else if (body instanceof ReplyException) {
codec = defaultCodecMap.get(body.getClass());
if (codec == null) {
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,33 @@ public <T> MessageProducer<T> publisher(String address, DeliveryOptions options)
return new MessageProducerImpl<>(vertx, address, false, options);
}

@Override
public Future<Void> bindStream(String address, Handler<MessageStream> handler) {
ContextInternal ctx = vertx.getOrCreateContext();
HandlerRegistration reg = new StreamServer(this, ctx, address, handler);
Promise<Void> promise = ctx.promise();
reg.register(true, false, promise);
return promise.future();
}

@Override
public Future<MessageStream> connectStream(String address) {
ContextInternal ctx = vertx.getOrCreateContext();
String sourceAddress = generateReplyAddress();
Promise<MessageStream> promise2 = ctx.promise();
StreamBase reg = new ClientStream(this, sourceAddress, ctx, promise2);
Promise<Void> promise = ctx.promise();
reg.register(false, false, promise);
promise.future().onComplete(ar -> {
if (ar.succeeded()) {
MessageImpl msg = createMessage(true, address, MultiMap.caseInsensitiveMultiMap(), new SynFrame(sourceAddress, address), null);
msg.setReplyAddress(sourceAddress);
sendOrPub(ctx, msg, new DeliveryOptions(), ctx.promise());
}
});
return promise2.future();
}

@Override
public EventBus publish(String address, Object message) {
return publish(address, message, new DeliveryOptions());
Expand Down
47 changes: 47 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/FinFrame.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.vertx.core.eventbus.impl;

import io.netty.util.CharsetUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;

class FinFrame implements Frame {

public static final MessageCodec<FinFrame, FinFrame> CODEC = new MessageCodec<>() {
@Override
public void encodeToWire(Buffer buffer, FinFrame synFrame) {
byte[] strBytes = synFrame.addr.getBytes(CharsetUtil.UTF_8);
buffer.appendInt(strBytes.length);
buffer.appendBytes(strBytes);
}

@Override
public FinFrame decodeFromWire(int pos, Buffer buffer) {
int length = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + length);
String src = new String(bytes, CharsetUtil.UTF_8);
return new FinFrame(src);
}

@Override
public FinFrame transform(FinFrame finFrame) {
return finFrame;
}

@Override
public String name() {
return "frame.fin";
}

@Override
public byte systemCodecID() {
return 19;
}
};

final String addr;

public FinFrame(String addr) {
this.addr = addr;
}
}
4 changes: 4 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/Frame.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.vertx.core.eventbus.impl;

public interface Frame {
}
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions op
}
}

protected MessageImpl createReply(Object message, DeliveryOptions options) {
public MessageImpl createReply(Object message, DeliveryOptions options) {
MessageImpl reply = bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName());
reply.trace = trace;
return reply;
Expand Down
77 changes: 77 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/StreamBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageStream;
import io.vertx.core.impl.ContextInternal;

class StreamBase extends HandlerRegistration implements MessageStream {

MessageImpl base;
private Handler<Message<String>> handler;
private Handler<Void> endHandler;
final String localAddress;
String remoteAddress;
private boolean halfClosed;

StreamBase(String localAddress, ContextInternal context, EventBusImpl bus, String address, boolean src) {
super(context, bus, address, src);
this.localAddress = localAddress;
}

@Override
protected boolean doReceive(Message msg) {
if (msg.body() instanceof FinFrame) {
Handler<Void> h = endHandler;
if (h != null) {
h.handle(null);
}
if (halfClosed) {
unregister();
} else {
halfClosed = true;
}
return true;
} else {
Handler<Message<String>> h = handler;
if (h != null) {
h.handle(msg);
}
return true;
}
}

@Override
protected void dispatch(Message msg, ContextInternal context, Handler handler) {

}

@Override
public void handler(Handler<Message<String>> handler) {
this.handler = handler;
}

@Override
public void endHandler(Handler<Void> handler) {
this.endHandler = handler;
}

@Override
public void write(String body) {
MessageImpl msg = base.createReply(body, new DeliveryOptions());
bus.sendOrPub(context, msg, new DeliveryOptions(), context.promise());
}

@Override
public void end() {
MessageImpl msg = base.createReply(new FinFrame(remoteAddress), new DeliveryOptions());
bus.sendOrPub(context, msg, new DeliveryOptions(), context.promise());
if (halfClosed) {
unregister();
} else {
halfClosed = true;
}
}
}
46 changes: 46 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/StreamServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageStream;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;

class StreamServer extends HandlerRegistration {
private final EventBusImpl eventBus;
private final Handler<MessageStream> handler;

public StreamServer(EventBusImpl eventBus, ContextInternal ctx, String address, Handler<MessageStream> handler) {
super(ctx, eventBus, address, false);
this.eventBus = eventBus;
this.handler = handler;
}

@Override
protected boolean doReceive(Message msg) {
if (msg.body() instanceof SynFrame) {
SynFrame syn = (SynFrame) msg.body();
String localAddress = eventBus.generateReplyAddress();
StreamBase ss = new StreamBase(localAddress, context, eventBus, localAddress, false);
ss.remoteAddress = syn.src;
ss.base = (MessageImpl) msg;
PromiseInternal<Void> p = context.promise();
ss.register(false, false, p);
p.onComplete(ar -> {
if (ar.succeeded()) {
MessageImpl reply = ((MessageImpl)msg).createReply(new SynFrame(localAddress, syn.src), new DeliveryOptions());
reply.setReplyAddress(localAddress);
eventBus.sendOrPub(context, reply, new DeliveryOptions(), context.promise());
handler.handle(ss);
}
});
}
return true;
}

@Override
protected void dispatch(Message msg, ContextInternal context, Handler handler) {
}
}
58 changes: 58 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/SynFrame.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.vertx.core.eventbus.impl;

import io.netty.util.CharsetUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;

public class SynFrame implements Frame {

public static final MessageCodec<SynFrame, SynFrame> CODEC = new MessageCodec<>() {
@Override
public void encodeToWire(Buffer buffer, SynFrame synFrame) {
byte[] strBytes = synFrame.src.getBytes(CharsetUtil.UTF_8);
buffer.appendInt(strBytes.length);
buffer.appendBytes(strBytes);
byte[] dstBytes = synFrame.dst.getBytes(CharsetUtil.UTF_8);
buffer.appendInt(dstBytes.length);
buffer.appendBytes(dstBytes);
}

@Override
public SynFrame decodeFromWire(int pos, Buffer buffer) {
int length = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + length);
String src = new String(bytes, CharsetUtil.UTF_8);
pos += length;
length = buffer.getInt(pos);
pos += 4;
bytes = buffer.getBytes(pos, pos + length);
String dst = new String(bytes, CharsetUtil.UTF_8);
pos += length;
return new SynFrame(src, dst);
}

@Override
public SynFrame transform(SynFrame synFrame) {
return synFrame;
}

@Override
public String name() {
return "frame.syn";
}

@Override
public byte systemCodecID() {
return 18;
}
};

final String src;
final String dst;

public SynFrame(String src, String dst) {
this.src = src;
this.dst = dst;
}
}
Loading

0 comments on commit 6180fcc

Please sign in to comment.