Skip to content

Commit

Permalink
RATIS-1880. Optimize Stream client&server side channel pipeline Create
Browse files Browse the repository at this point in the history
  • Loading branch information
guohao-rosicky committed Aug 28, 2023
1 parent b06f82a commit 48ae530
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,16 @@ public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties pro

final InetSocketAddress address = NetUtils.createSocketAddr(server.getDataStreamAddress());
final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);

final MessageToMessageEncoder<DataStreamRequestByteBuffer> encoder = newEncoder();
final MessageToMessageEncoder<DataStreamRequestFilePositionCount>
filePositionCountEncoder = newEncoderDataStreamRequestFilePositionCount();
final MessageToMessageEncoder<ByteBuffer> byteBufferEncoder = newEncoderByteBuffer();
final ByteToMessageDecoder decoder = newDecoder();
this.connection = new Connection(address,
new WorkerGroupGetter(properties),
() -> newChannelInitializer(address, sslContext, getClientHandler()));
() -> newChannelInitializer(address, sslContext, encoder, filePositionCountEncoder, byteBufferEncoder,
decoder, getClientHandler()));
}

private ChannelInboundHandler getClientHandler(){
Expand Down Expand Up @@ -305,18 +312,22 @@ public void channelInactive(ChannelHandlerContext ctx) {
}

static ChannelInitializer<SocketChannel> newChannelInitializer(
InetSocketAddress address, SslContext sslContext, ChannelInboundHandler handler) {
InetSocketAddress address, SslContext sslContext,
MessageToMessageEncoder<DataStreamRequestByteBuffer> encoder,
MessageToMessageEncoder<DataStreamRequestFilePositionCount> filePositionCountEncoder,
MessageToMessageEncoder<ByteBuffer> byteBufferEncoder,
ByteToMessageDecoder decoder, ChannelInboundHandler handler) {
return new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslContext != null) {
p.addLast("ssl", sslContext.newHandler(ch.alloc(), address.getHostName(), address.getPort()));
}
p.addLast(newEncoder());
p.addLast(newEncoderDataStreamRequestFilePositionCount());
p.addLast(newEncoderByteBuffer());
p.addLast(newDecoder());
p.addLast(encoder);
p.addLast(filePositionCountEncoder);
p.addLast(byteBufferEncoder);
p.addLast(decoder);
p.addLast(handler);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ void close() {

private final TimeDuration channelInactiveGracePeriod;








public NettyServerStreamRpc(RaftServer server, Parameters parameters) {
this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
this.metrics = new NettyServerStreamRpcMetrics(this.name);
Expand All @@ -182,12 +189,14 @@ public NettyServerStreamRpc(RaftServer server, Parameters parameters) {
final int port = NettyConfigKeys.DataStream.port(properties);
InetSocketAddress socketAddress =
host == null || host.isEmpty() ? new InetSocketAddress(port) : new InetSocketAddress(host, port);
final ByteToMessageDecoder decoder = newDecoder();
final MessageToMessageEncoder<DataStreamReplyByteBuffer> encoder = newEncoder();
this.channelFuture = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(bossGroup instanceof EpollEventLoopGroup ?
EpollServerSocketChannel.class : NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(newChannelInitializer(sslContext))
.childHandler(newChannelInitializer(sslContext, decoder, encoder))
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.bind(socketAddress);
Expand Down Expand Up @@ -253,16 +262,17 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
};
}

private ChannelInitializer<SocketChannel> newChannelInitializer(SslContext sslContext){
private ChannelInitializer<SocketChannel> newChannelInitializer(SslContext sslContext,
ByteToMessageDecoder decoder, MessageToMessageEncoder<DataStreamReplyByteBuffer> encoder){
return new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslContext != null) {
p.addLast("ssl", sslContext.newHandler(ch.alloc()));
}
p.addLast(newDecoder());
p.addLast(newEncoder());
p.addLast(decoder);
p.addLast(encoder);
p.addLast(newChannelInboundHandlerAdapter());
}
};
Expand Down

0 comments on commit 48ae530

Please sign in to comment.