From 8cb5aad368841728be0c291161947299b7d903f2 Mon Sep 17 00:00:00 2001 From: cuixiuyin <1099442418@qq.com> Date: Mon, 13 Aug 2018 21:29:16 +0800 Subject: [PATCH] =?UTF-8?q?#NettyDemo=20#=E6=95=B4=E7=90=86=E7=BC=96?= =?UTF-8?q?=E8=A7=A3=E7=A0=81=E5=99=A8=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../demo/codec/MyBytetoMessageCodec.java | 1 - .../demo/codec/WebSocketConvertHandler.java | 104 ++++++++++++++---- .../demo/decoder/IntegerToStringDecoder.java | 10 +- .../org/netty/demo/decoder/MyDecoder.java | 11 +- .../demo/decoder/MyReplayingDecoder.java | 6 +- .../demo/encoder/IntegerToStringEncoder.java | 5 +- .../demo/encoder/ShortToByteEncoder.java | 9 +- 7 files changed, 97 insertions(+), 49 deletions(-) diff --git a/src/main/java/org/netty/demo/codec/MyBytetoMessageCodec.java b/src/main/java/org/netty/demo/codec/MyBytetoMessageCodec.java index c6384f0..240faa0 100644 --- a/src/main/java/org/netty/demo/codec/MyBytetoMessageCodec.java +++ b/src/main/java/org/netty/demo/codec/MyBytetoMessageCodec.java @@ -14,7 +14,6 @@ public class MyBytetoMessageCodec extends ByteToMessageCodec { @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { - } @Override diff --git a/src/main/java/org/netty/demo/codec/WebSocketConvertHandler.java b/src/main/java/org/netty/demo/codec/WebSocketConvertHandler.java index 5212a73..388b383 100644 --- a/src/main/java/org/netty/demo/codec/WebSocketConvertHandler.java +++ b/src/main/java/org/netty/demo/codec/WebSocketConvertHandler.java @@ -3,7 +3,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageCodec; -import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.http.websocketx.*; import java.util.List; @@ -12,35 +12,83 @@ */ public class WebSocketConvertHandler extends MessageToMessageCodec { - private final MyWebSocketFrame.FrameType type; - - private final ByteBuf byteBuf; - - public WebSocketConvertHandler(MyWebSocketFrame.FrameType type, ByteBuf byteBuf) { - this.type = type; - this.byteBuf = byteBuf; - } - - public MyWebSocketFrame.FrameType getType() { - return type; - } - - public ByteBuf getByteBuf() { - return byteBuf; - } - + /** + * 1、对于每个 OUTBOUND_IN 类型的消息,这个方法都会被调用。 + * 2、这个消息将会被编码为 INBOUND_IN 类型的消息。 + * 3、然后被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler + * + * @param ctx + * @param msg + * @param out + * @throws Exception + */ @Override protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List out) throws Exception { - + ByteBuf byteBuf = msg.getByteBuf().duplicate().retain(); + switch (msg.getFrameType()) { + case BINARY: + out.add(new BinaryWebSocketFrame(byteBuf)); + break; + case TEXT: + out.add(new TextWebSocketFrame(byteBuf)); + break; + case CLOSE: + out.add(new CloseWebSocketFrame(true, 0, byteBuf)); + break; + case CONTINUATION: + out.add(new ContinuationWebSocketFrame(byteBuf)); + break; + case PONG: + out.add(new PongWebSocketFrame(byteBuf)); + break; + case PING: + out.add(new PingWebSocketFrame(byteBuf)); + default: + break; + } + + } + /** + * 1、传入 INBOUND_IN 类型的消息,该方法会被调用。 + * 2、这个消息会被解码为 OUTBOUND_IN 类型的消息。 + * 3、然后被转发给 ChannelPipeline 中的下一个 ChannelInboundHandler + * + * @param ctx + * @param msg + * @param out + * @throws Exception + */ @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List out) throws Exception { + ByteBuf byteBuf = msg.content().duplicate().retain(); + if (msg instanceof BinaryWebSocketFrame){ + out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, byteBuf)); + }else + if (msg instanceof CloseWebSocketFrame){ + out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, byteBuf)); + }else + if (msg instanceof TextWebSocketFrame){ + out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, byteBuf)); + }else + if (msg instanceof PingWebSocketFrame){ + out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, byteBuf)); + }else + if (msg instanceof PongWebSocketFrame){ + out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, byteBuf)); + }else + if (msg instanceof ContinuationWebSocketFrame){ + out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, byteBuf)); + }else { + throw new IllegalStateException("Unsupported websocket msg " + msg); + } } - public static final class MyWebSocketFrame{ + public static final class MyWebSocketFrame { + public enum FrameType { BINARY, CLOSE, @@ -49,5 +97,21 @@ public enum FrameType { TEXT, CONTINUATION } + + private final FrameType frameType; + private final ByteBuf byteBuf; + + public MyWebSocketFrame(FrameType frameType, ByteBuf byteBuf) { + this.frameType = frameType; + this.byteBuf = byteBuf; + } + + public FrameType getFrameType() { + return frameType; + } + + public ByteBuf getByteBuf() { + return byteBuf; + } } } diff --git a/src/main/java/org/netty/demo/decoder/IntegerToStringDecoder.java b/src/main/java/org/netty/demo/decoder/IntegerToStringDecoder.java index fe0e7ab..724e8dd 100644 --- a/src/main/java/org/netty/demo/decoder/IntegerToStringDecoder.java +++ b/src/main/java/org/netty/demo/decoder/IntegerToStringDecoder.java @@ -11,15 +11,9 @@ */ public class IntegerToStringDecoder extends MessageToMessageEncoder { - /** - * 对于每个需要被解码为另一种格式的入站消息来说,该方法将会被调用。 - * 解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler - * - * @param channelHandlerContext - * @param integer - * @param list - * @throws Exception + * 1、对于每个需要被解码为另一种格式的入站消息来说,该方法将会被调用。 + * 2、解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler */ @Override protected void encode(ChannelHandlerContext channelHandlerContext, Integer integer, List list) throws Exception { diff --git a/src/main/java/org/netty/demo/decoder/MyDecoder.java b/src/main/java/org/netty/demo/decoder/MyDecoder.java index 53a51e4..f3e58e5 100644 --- a/src/main/java/org/netty/demo/decoder/MyDecoder.java +++ b/src/main/java/org/netty/demo/decoder/MyDecoder.java @@ -15,14 +15,9 @@ public class MyDecoder extends ByteToMessageDecoder { private static final int MAX_FRAME_SIZE = 1024; /** - * 该方法被调用时,将会传入一个包含了传入数据的 ByteBuf,以及一个用来添加解码消息的 List. - * 对该方法的调用将会重复进行,直到确定没有新的元素被添加到该 List,或者Butebuf 没有更多可读取的字节为止。 - * List 的内容将会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。 - * - * @param channelHandlerContext - * @param byteBuf 传入数据 - * @param list 解码后的消息元素被放入该List - * @throws Exception + * 1、该方法被调用时,将会传入一个包含了传入数据的 ByteBuf,以及一个用来添加解码消息的 List. + * 2、对该方法的调用将会重复进行,直到确定没有新的元素被添加到该 List,或者Butebuf 没有更多可读取的字节为止。 + * 3、List 的内容将会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。 */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { diff --git a/src/main/java/org/netty/demo/decoder/MyReplayingDecoder.java b/src/main/java/org/netty/demo/decoder/MyReplayingDecoder.java index 5b42086..81628b3 100644 --- a/src/main/java/org/netty/demo/decoder/MyReplayingDecoder.java +++ b/src/main/java/org/netty/demo/decoder/MyReplayingDecoder.java @@ -7,16 +7,14 @@ import java.util.List; /** - * Created by XiuYin.Cui on 2018/8/12. * * Void 代表不需要进行状态管理 */ public class MyReplayingDecoder extends ReplayingDecoder { /** - * ReplayingDecoder 扩展了 ByteToMessageDecoder。 - * ReplayingDecoder 自定义了 ByteBuf 实现 ReplayingDecoderByteBuf,对要转换的消息的字节数 - * 进行内部管理,如果没有足够的字节使用,将会抛出一个 Signal,由ReplayingDecoder进行处理。 + * 1、ReplayingDecoder 扩展了 ByteToMessageDecoder,并且自定义了 ByteBuf 的实现 ReplayingDecoderByteBuf。 + * 2、ReplayingDecoderByteBuf 对要转换的消息的字节数进行内部管理,如果没有足够的字节使用,将会抛出一个 Signal,由ReplayingDecoder进行处理。 * * @param channelHandlerContext * @param byteBuf 传入的 ByteBuf 实际上是 ReplayingDecoderByteBuf diff --git a/src/main/java/org/netty/demo/encoder/IntegerToStringEncoder.java b/src/main/java/org/netty/demo/encoder/IntegerToStringEncoder.java index c576bf9..bdfe924 100644 --- a/src/main/java/org/netty/demo/encoder/IntegerToStringEncoder.java +++ b/src/main/java/org/netty/demo/encoder/IntegerToStringEncoder.java @@ -10,7 +10,10 @@ */ public class IntegerToStringEncoder extends MessageToMessageEncoder { - + /** + * 1、类型为 I 的出站消息被编码为目标类型 存入List 中 + * 2、该 List 随后将会被转发给 ChannelPipeline中的下一个 ChannelOutboundHandler。 + */ @Override protected void encode(ChannelHandlerContext ctx, Integer msg, List out) throws Exception { out.add(String.valueOf(msg)); diff --git a/src/main/java/org/netty/demo/encoder/ShortToByteEncoder.java b/src/main/java/org/netty/demo/encoder/ShortToByteEncoder.java index 54573ca..86a85c5 100644 --- a/src/main/java/org/netty/demo/encoder/ShortToByteEncoder.java +++ b/src/main/java/org/netty/demo/encoder/ShortToByteEncoder.java @@ -9,14 +9,9 @@ */ public class ShortToByteEncoder extends MessageToByteEncoder { - /** - * 它被调用时将会传入要被该类编码为 ByteBuf 的(类型为 I 的)出站消息。 - * 该 ByteBuf 随后将会被转发给 ChannelPipeline中的下一个 ChannelOutboundHandler。 - * @param channelHandlerContext - * @param aShort - * @param byteBuf - * @throws Exception + * 1、类型为 I 的出站消息被编码为 ByteBuf + * 2、该 ByteBuf 随后将会被转发给 ChannelPipeline中的下一个 ChannelOutboundHandler。 */ @Override protected void encode(ChannelHandlerContext channelHandlerContext, Short aShort, ByteBuf byteBuf) throws Exception {