Skip to content

Commit

Permalink
#NettyDemo #增加Netty对序列化的支持
Browse files Browse the repository at this point in the history
  • Loading branch information
JMCuixy committed Aug 27, 2018
1 parent 41d395f commit a76af81
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 3 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.0.Beta2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.netty.demo.protocol;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.io.File;
import java.io.FileInputStream;

/**
* Created by XiuYin.Cui on 2018/8/27.
*/
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
private final File file;
private final SslContext sslCtx;

public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
this.file = file;
this.sslCtx = sslCtx;
}


@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new SslHandler(sslCtx.newEngine(ch.alloc())),
//1、要使用你自己的 ChunkedInput 实现,请在 ChannelPipeline 中安装一个 ChunkedWriteHandler
//2、添加 ChunkedWriteHandler 以处理作为 ChunkedInput 传入的数据
new ChunkedWriteHandler(),
new WriteStreamHandler()
);

}


private final class WriteStreamHandler extends ChannelHandlerAdapter {

/**
* 当连接建立时,channelActive() 方法将使用 ChunkedInput 写文件数据
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
}
}
}
26 changes: 23 additions & 3 deletions src/main/java/org/netty/demo/protocol/CmdHandlerInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.LineBasedFrameDecoder;

/**
Expand All @@ -13,6 +14,10 @@ public class CmdHandlerInitializer extends ChannelInitializer<Channel> {

@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new CmdDecoder(64 * 1024),
new CmdHandler()
);

}

Expand All @@ -37,6 +42,9 @@ public ByteBuf getArgs() {
}
}

/**
* 解码成 Cmd 对象
*/
public static final class CmdDecoder extends LineBasedFrameDecoder {

private byte SPACE = ' ';
Expand All @@ -48,12 +56,24 @@ public CmdDecoder(int maxLength) {
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
//1、提取由行尾符序列分隔的桢
ByteBuf frame =(ByteBuf) super.decode(ctx, buffer);
ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
//如果输入没有桢,则返回null
if (frame == null){
if (frame == null) {
return null;
}
frame.indexOf(frame.readerIndex(), frame.writerIndex(), SPACE);
int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), SPACE);
return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index + 1, frame.writerIndex()));
}
}

/**
* 业务ChannelHandler
*/
public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {

@Override
protected void messageReceived(ChannelHandlerContext ctx, Cmd msg) throws Exception {
//Do something with Cmd
}
}
}
41 changes: 41 additions & 0 deletions src/main/java/org/netty/demo/protocol/FileRegionHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.netty.demo.protocol;

import io.netty.channel.*;

import java.io.FileInputStream;
import java.nio.channels.FileChannel;

/**
* Created by XiuYin.Cui on 2018/8/27.
*/
public class FileRegionHandler extends ChannelHandlerAdapter {


/**
* 这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据
* 从文件系统复制到用户内存中时,可以使用 ChunkedWriteHandler, 它支持异步写大型数据
* 流,而又不会导致大量的内存消耗
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String file = "";
FileInputStream fileInputStream = new FileInputStream(file);
FileChannel channel = fileInputStream.getChannel();
//以该文件的完整长度创建一个新的 DefaultFileRegion
DefaultFileRegion defaultFileRegion = new DefaultFileRegion(channel, 0, file.length());
//发送该 DefaultFileRegion, 并注册一个 ChannelFutureListener
ctx.writeAndFlush(defaultFileRegion).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Throwable cause = future.cause();
}
}
}
);
}
}
31 changes: 31 additions & 0 deletions src/main/java/org/netty/demo/protocol/LengthBasedInitializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.netty.demo.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

/**
* Created by XiuYin.Cui on 2018/8/27.
*/
public class LengthBasedInitializer extends ChannelInitializer<Channel> {


@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8),
new FrameHandler()
);
}

public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {

@Override
protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
//处理桢的数据
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Created by XiuYin.Cui on 2018/8/26.
*/
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {

@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/org/netty/demo/protocol/MarshallingInitializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.netty.demo.protocol;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import java.io.Serializable;

/**
* Created by XiuYin.Cui on 2018/8/27.
*/
public class MarshallingInitializer extends ChannelInitializer<Channel> {
private final MarshallerProvider marshallerProvider;
private final UnmarshallerProvider unmarshallerProvider;

public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) {
this.marshallerProvider = marshallerProvider;
this.unmarshallerProvider = unmarshallerProvider;
}


@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new MarshallingDecoder(unmarshallerProvider),
new MarshallingEncoder(marshallerProvider),
new ObjectHandler()
);
}

public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {

@Override
protected void messageReceived(ChannelHandlerContext ctx, Serializable msg) throws Exception {

}
}
}
89 changes: 89 additions & 0 deletions src/main/java/org/netty/demo/protocol/serializable/Client.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.netty.demo.protocol.serializable;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;

public class Client {


public void connect(int port, String host) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new MyChannelHandler());
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();

} finally {
group.shutdownGracefully();
System.out.println("客户端优雅的释放了线程资源...");
}

}

/**
* 网络事件处理器
*/
private static final class MyChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("MyChannelHandler");
// 添加Jboss的序列化,编解码工具
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
// 处理网络IO
ch.pipeline().addLast(new ClientHandler());
}
}

/**
* 业务处理器
*/
public static final class ClientHandler extends ChannelHandlerAdapter {

// 客户端与服务端,连接成功的售后
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 发送消息
Message request1 = new Message("666");
ctx.writeAndFlush(request1).addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("成功发送到服务端消息");
} else {
System.out.println("失败服务端消息失败:" + future.cause().getMessage());
future.cause().printStackTrace();
}
}
});
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Message response = (Message) msg;
System.out.println(response);
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

public static void main(String[] args) throws Exception {
new Client().connect(8765, "127.0.0.1");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.netty.demo.protocol.serializable;


import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
* Marshalling工厂
*/
public final class MarshallingCodeCFactory {

/**
* 创建Jboss Marshalling解码器MarshallingDecoder
*
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
// 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
// 创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
// 根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
// 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}

/**
* 创建Jboss Marshalling编码器MarshallingEncoder
*
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
// 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
Loading

0 comments on commit a76af81

Please sign in to comment.