示例
从客户端发送字母A,经过解码器ByteToMessageDecoder、ReplayingDecoder、MessageToMessageDecoder以及编码器MessageToMessageEncoder、MessageToByteEncoder,把A解码为a、b、c,再编码成d、e
服务端
EchoServer
public class EchoServer { public static void main(String[] args) throws InterruptedException { // 创建NioEventLoopGroup类型的EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); try { // 创建ServerBootstrap ServerBootstrap sbs = new ServerBootstrap(); sbs.group(group) // 设置Channel为NIO的服务端Channel .channel(NioServerSocketChannel.class) // 绑定本地端口 .localAddress(new InetSocketAddress(Const.PORT)) // 新连接被接受时,会创建一个Channel // 再把把echoServerHandler加入到这个Channel的ChannelPipeline中 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new MyMessageToByteEncoder()); socketChannel.pipeline().addLast(new MyMessageToMessageEncoder()); socketChannel.pipeline().addLast(new MyByteToMessageDecoder()); socketChannel.pipeline().addLast(new MyReplayingDecoder()); socketChannel.pipeline().addLast(new MyMessageToMessageDecoder()); } }); // 异步绑定服务器,阻塞到服务器绑定完成 ChannelFuture sync = sbs.bind().sync(); // 获取channel的closeFuture,阻塞到关闭 sync.channel().closeFuture().sync(); } finally { // 优雅的关掉group并释放所有的资源 group.shutdownGracefully().sync(); } } }
MyByteToMessageDecoder
public class MyByteToMessageDecoder extends ByteToMessageDecoder { /** * @param ctx * @param in 传过来的ByteBuf * @param out 添加解码消息的List * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { String str = ""; str += (char) in.readByte(); System.out.println("MyByteToMessageDecoder receive:" + str); str = "a"; ByteBuf byteBuf = Unpooled.buffer(); byteBuf.writeBytes(str.getBytes()); out.add(byteBuf); System.out.println("MyByteToMessageDecoder send:" + str); } }
MyReplayingDecoder
public class MyReplayingDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { String str = ""; str += (char) in.readByte(); System.out.println("MyReplayingDecoder receive:" + str); str = "b"; out.add(str); System.out.println("MyReplayingDecoder send:" + str); } }
MyMessageToMessageDecoder
public class MyMessageToMessageDecoder extends MessageToMessageDecoder<String> { @Override protected void decode(ChannelHandlerContext ctx, String msg, List out) throws Exception { System.out.println("MyMessageToMessageDecoder receive:" + msg); //out.add(msg); msg = "c"; System.out.println("MyMessageToMessageDecoder send:" + msg); ctx.write(msg); } }
MyMessageToByteEncoder
public class MyMessageToByteEncoder extends MessageToByteEncoder<String> { @Override protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception { System.out.println("MyMessageToByteEncoder receive:" + msg); msg = "e"; System.out.println("MyMessageToByteEncoder send:" + msg); out.writeBytes(msg.getBytes()); ctx.writeAndFlush(out); } }
MessageToMessageEncoder
public class MyMessageToMessageEncoder extends MessageToMessageEncoder<String> { @Override protected void encode(ChannelHandlerContext ctx, String msg, List out) throws Exception { System.out.println("MyMessageToMessageEncoder receive:" + msg); msg = "d"; System.out.println("MyMessageToMessageEncoder send:" + msg); out.add(msg); } }
客户端
Client
public class Client { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { // 创建Bootstrap Bootstrap bs = new Bootstrap(); bs.group(group) // 设置Channel为NIO的客户端Channel .channel(NioSocketChannel.class) // 设置服务器的地址端口信息 .remoteAddress(new InetSocketAddress(Const.IP, Const.PORT)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientHandler()); } }); // 连接远程服务器,阻塞到连接完成 ChannelFuture cf = bs.connect().sync(); // 获取channel的closeFuture,阻塞到关闭 cf.channel().closeFuture().sync(); } finally { // 优雅的关掉group并释放所有的资源 group.shutdownGracefully().sync(); } } }
ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 客户端收到服务器消息后调用 * * @param channelHandlerContext * @param byteBuf * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // 处理收到的消息 System.out.println("客户端收到信息:" + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 客户端与服务器连接后调用 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 当channel是活跃的时候,往服务端发送一条消息 ctx.writeAndFlush(Unpooled.copiedBuffer("A", CharsetUtil.UTF_8)); } /** * 客户端处理消息过程中,对异常的处理 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
运行结果
服务端运行结果
客户端运行结果
解码器
在netty中,解码器包括两种:
- 将字节解码为消息:ByteToMessageDecoder 和 ReplayingDecoder
- 将一种消息解码成另外一种消息:MessageToMessageDecoder
ByteToMessageDecoder
decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
ByteBuf,是调用这个方法时,传入的数据,List是用来存放解码信息的。如果ByteBuf没有读取完,这个List有值,会传入ChannelPipeline的下一个ChannelInboundHandler。
除了decode,这个类还有decodeLast方法,当Channel的状态变为非活动时会被调用。
MessageToMessageDecoder
MessageToMessageDecoder把一种类型的消息转为另外一种,上面例子中,是把String转String,我们简单的做了字符串替换。msg的类型,是需要传入的类型,解析后存入List。
decode(ChannelHandlerContext ctx, String msg, List out)
编码器
编码器也有两种方式:
- 将消息编码为字节:MessageToByteEncoder
- 将消息编码为消息:MessageToMessageEncoder
MessageToByteEncoder
msg,是传入消息的类型及数据,out是会被传到ChannelPipeline的下一个ChannelOutboundHandler。
encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
MessageToMessageEncoder
msg,是传入消息的类型及数据,out是存放编码后的消息,也会被传到ChannelPipeline的下一个ChannelOutboundHandler。
encode(ChannelHandlerContext ctx, String msg, List out)
其他内置的编解码器
- ssl协议:SslHandler
- http:HttpRequestEncoder、HttpResponseEncoder、HttpRequestDecoder、HttpResponseDecoder、FullHttpRequest、FullHttpResponse等
- 空闲的连接和超时:IdleStateHandler、ReadTimeoutHandler、WriteTimeoutHandler
- 半包粘包解决:DelimiterBasedFrameDecoder、LineBasedFrameDecoder、FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder
- 写大型数据:ChunkedWriteHandler
- 序列号反序列:CompatibleObjectDecoder、CompatibleObjectEncoder、ObjectDecoder、ObjectEncoder等
原文
https://segmentfault.com/a/1190000022215044
本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 网络编程 – Netty(编解码器)