From 48ae530aad6bffc2d8a893b1f89cb114803f1418 Mon Sep 17 00:00:00 2001 From: guohao1 Date: Mon, 28 Aug 2023 20:17:08 +0800 Subject: [PATCH] RATIS-1880. Optimize Stream client&server side channel pipeline Create --- .../netty/client/NettyClientStreamRpc.java | 23 ++++++++++++++----- .../netty/server/NettyServerStreamRpc.java | 18 +++++++++++---- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index f815bcffe3..37de190c10 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -256,9 +256,16 @@ public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties pro final InetSocketAddress address = NetUtils.createSocketAddr(server.getDataStreamAddress()); final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf); + + final MessageToMessageEncoder encoder = newEncoder(); + final MessageToMessageEncoder + filePositionCountEncoder = newEncoderDataStreamRequestFilePositionCount(); + final MessageToMessageEncoder byteBufferEncoder = newEncoderByteBuffer(); + final ByteToMessageDecoder decoder = newDecoder(); this.connection = new Connection(address, new WorkerGroupGetter(properties), - () -> newChannelInitializer(address, sslContext, getClientHandler())); + () -> newChannelInitializer(address, sslContext, encoder, filePositionCountEncoder, byteBufferEncoder, + decoder, getClientHandler())); } private ChannelInboundHandler getClientHandler(){ @@ -305,7 +312,11 @@ public void channelInactive(ChannelHandlerContext ctx) { } static ChannelInitializer newChannelInitializer( - InetSocketAddress address, SslContext sslContext, ChannelInboundHandler handler) { + InetSocketAddress address, SslContext sslContext, + MessageToMessageEncoder encoder, + MessageToMessageEncoder filePositionCountEncoder, + MessageToMessageEncoder byteBufferEncoder, + ByteToMessageDecoder decoder, ChannelInboundHandler handler) { return new ChannelInitializer(){ @Override public void initChannel(SocketChannel ch) { @@ -313,10 +324,10 @@ public void initChannel(SocketChannel ch) { if (sslContext != null) { p.addLast("ssl", sslContext.newHandler(ch.alloc(), address.getHostName(), address.getPort())); } - p.addLast(newEncoder()); - p.addLast(newEncoderDataStreamRequestFilePositionCount()); - p.addLast(newEncoderByteBuffer()); - p.addLast(newDecoder()); + p.addLast(encoder); + p.addLast(filePositionCountEncoder); + p.addLast(byteBufferEncoder); + p.addLast(decoder); p.addLast(handler); } }; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java index 8cb34d897b..dd29d6f369 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java @@ -158,6 +158,13 @@ void close() { private final TimeDuration channelInactiveGracePeriod; + + + + + + + public NettyServerStreamRpc(RaftServer server, Parameters parameters) { this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass()); this.metrics = new NettyServerStreamRpcMetrics(this.name); @@ -182,12 +189,14 @@ public NettyServerStreamRpc(RaftServer server, Parameters parameters) { final int port = NettyConfigKeys.DataStream.port(properties); InetSocketAddress socketAddress = host == null || host.isEmpty() ? new InetSocketAddress(port) : new InetSocketAddress(host, port); + final ByteToMessageDecoder decoder = newDecoder(); + final MessageToMessageEncoder encoder = newEncoder(); this.channelFuture = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(bossGroup instanceof EpollEventLoopGroup ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(newChannelInitializer(sslContext)) + .childHandler(newChannelInitializer(sslContext, decoder, encoder)) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .bind(socketAddress); @@ -253,7 +262,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) { }; } - private ChannelInitializer newChannelInitializer(SslContext sslContext){ + private ChannelInitializer newChannelInitializer(SslContext sslContext, + ByteToMessageDecoder decoder, MessageToMessageEncoder encoder){ return new ChannelInitializer(){ @Override public void initChannel(SocketChannel ch) { @@ -261,8 +271,8 @@ public void initChannel(SocketChannel ch) { if (sslContext != null) { p.addLast("ssl", sslContext.newHandler(ch.alloc())); } - p.addLast(newDecoder()); - p.addLast(newEncoder()); + p.addLast(decoder); + p.addLast(encoder); p.addLast(newChannelInboundHandlerAdapter()); } };