转载

Netty编程实战之:Netty基础入门

前面两节我们介绍了学习Netty的准备知识,分别是:

  • Netty编程实战之:掌控NIO
  • Netty编程实战之:Reactor反应器模式

如果大家没有看过这两节的话, 推荐先去看看, 因为这两节是基础知识

Netty入门小案例

Netty 的介绍,优点,特性等已经在第一节中介绍过,这里不再重复,接下来我们先给大家看一个 Netty 的小例子, 直接通过完整的例子让大家对 Netty 有一个清晰的认识:

案例说明:

服务端接收客户端发送的消息

我们先创建项目, 配置环境

配置环境

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.50.Final</version>
</dependency>
复制代码

基于 maven 的方式引入 netty ,相信不用再多说什么

服务端主程序

public class IMNettyServer {
    public static final String HOST = "127.0.0.1";
    public static final int PORT = 45882;

    public static void main(String[] args) {
        new IMNettyServer().start_server();
    }

    private void start_server() {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    // 反应器组
                    .group(boss, work)
                    // 绑定端口
                    .localAddress(PORT)
                    // NIO类型的通道
                    .channel(NioServerSocketChannel.class)
                    // 通道的参数
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    // 装配子通道流水线
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 添加处理器到流水线中
                            socketChannel.pipeline().addLast(new IMNettyServerHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind().sync();
            System.out.println("服务器启动成功, 监听端口:" + future.channel().localAddress());

            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
复制代码

Handler

@ChannelHandler.Sharable
public class IMNettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端通道成功注册");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端监听者:" + ctx.channel().localAddress());
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("读取数据");
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.getBytes(0, bytes);

        System.out.println("客户端:" + new String(bytes));

        super.channelRead(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("通道缓存区已经读完");
        super.channelReadComplete(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端" + ctx.channel().localAddress() + "断开");
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("发生了异常");
        super.exceptionCaught(ctx, cause);
    }
}
复制代码

大家看到上面的代码, 估计很多都会蒙圈, 别着急, 我们看完客户端的代码之后, 一点一点的剖析它们

客户端

public class IMNettyClient {
    public static void main(String[] args) {
        new IMNettyClient().connect();
    }

    private void connect() {
        NioEventLoopGroup work = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap()
                    // 反应器组
                    .group(work)
                    // 通道类
                    .channel(NioSocketChannel.class)
                    // 连接服务端
                    .remoteAddress(IMNettyServer.HOST, IMNettyServer.PORT)
                    // 设置
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    // 装配通道流水线
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 添加处理器到流水线
							socketChannel.pipeline().addLast("read", new ImNettyClientHandler());
                        }
                    });
            ChannelFuture sync = bootstrap
                    .connect()
                    .sync();

            sendMsg(sync);


        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    private void sendMsg(ChannelFuture sync) {
        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()) {
            String msg = scanner.next();

            ByteBuf buf = sync.channel().alloc().buffer();
            buf.writeBytes(msg.getBytes());
            sync.channel().writeAndFlush(buf);
        }
    }
}
复制代码

从代码上, 我们可以看到, 客户端的编码和服务端编码大同小异, 区别就在于 Netty 各个组件, 下面我们就来剖析这个小案例

Netty中Reactor反应器模式的体现

前面已经说过, Netty 是基于 Reactor反应器模式 设计出来的高性能,高可扩展的为网络服务器和客户端程序而提供的异步事件驱动基础框架和工具, 那么我们看看在 Netty 中式如果体现的

回顾 反应器模式 的处理流程

  • 事件注册

通道注册选择器,并指定对应的IO事件

  • 轮询事件

一个反应器负责一个线程,不断轮询,查询选择器中的IO事件

  • 事件分发

将查询到的IO事件,分发给于IO事件有绑定关系的Handler业务处理器

  • 完成IO操做和业务处理

Netty中对应Channel组件

通道是 Netty 中非常重要的组件, 从上面的流程来看,大家肯定也明白:

  • 反应器模式和通道密切相关
  • 反应器查询和分发的IO事件都是来源于通道

所以我们首先来了解 Netty 中的通道组件

Netty 不直接使用 NIO 的通道组件,而是 Netty 针对不同的通信协议,对每一种通信协议都进行了自己的封装,而且 Netty 不仅仅只是支持异步,还对 标准阻塞式IO 进行了封装

所以 Netty 中的每一种协议的通道,都有 异步IO同步IO 两种版本,

不过在 Netty 4.x 中, 同步IO 被标注为过时类,所以就不介绍他们了,大家可以去看代码,同步IO都是以 Oio 开头的类

常见的 Netty通道 如下所示:

  • NioServerSocketChannel

异步非阻塞socket服务端监听通道,在上面服务端代码中已经用到

  • NioSocketChannel

异步非阻塞socket客户端监听通道,在上面客户端代码中已经用到

可以说, 上面两个通道是我们在Netty开发TCP协议最常用的通道

  • NioDatagramChannel

异步非阻塞UDP传输通道

使用方式

可以看到,在 Netty 中通过调用 channel() 方法,传入指定通道类,就可以了

// 指定客户端通道
channel(NioSocketChannel.class);
// 指定服务端通道
channel(NioServerSocketChannel.class);
复制代码

Netty中对应的Reactor反应器

NIO版Reactor反应器模式 中,反应器会负责事件处理线程,不断轮询,通过 Selector选择器 查询注册的IO事件,而在 Netty 中,也是有这样的存在,那就是: EventLoopGroup

该类是一个接口类,我们在 Netty 中主要使用其实现类: NioEventLoopGroup

Netty中的反应器模式肯定是多线程版本的,所以

NioEventLoopGroup是多线程版本中的Reactor反应器模式的实现类,除了包含IO事件外, NioEventLoopGroup 还存在一个重要属性: Thread线程类成员:用于指定内部的线程数 ,类似于线程池的概念,其内部执行思路和之前是一致的:

一个 NioEventLoop 拥有一个Thread线程,负责一个选择器的IO事件轮询

使用方式

在上面的代码中,我们可以看到我们采用的是无参构造方式,

NioEventLoopGroup work = new NioEventLoopGroup();
复制代码

猜一猜,无参的构造函数内部的线程数是多少?

无参的构造函数的内部线程数为最大可用的CPU处理器数量的2倍

服务端代码反应器组解析

查看上面服务端代码,我们为服务端提供了两个 NioEventLoopGroup

NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup work = new NioEventLoopGroup();

group(boss, work)
复制代码

我们可以这样理解,

  • 服务端为了及时接收到新连接,boss反应器负责新连接的监听和接收
  • work反应器负责IO事件的处理

说完了反应器,我们来聊一聊处理器

Handler处理器

之前我们讲到,可供选择器监控的IO事件类型包括:

  • 可读

SelectionKey.OP_READ

  • 可写

SelectionKey.OP_WRITE

  • 接收

SelectionKey.OP_CONNECT

  • 连接

SelectionKey.OP_ACCEPT

NioEventLoop 反应器内部有一个选择器执行以上事件的查询,然后进行事件的分发,目的地就是我们定义的Handler处理器

使用方式

  • 服务端通过 childHandler() 来装配定义的处理器
childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 自定义处理器
        socketChannel.pipeline().addLast(new IMNettyServerHandler());
    }
});
复制代码
  • 客户端通过 handler() 来装配定义的处理器
handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 自定义处理器
        socketChannel.pipeline().addLast("read", new ImNettyClientHandler());
    }
});
复制代码

先聊处理器,随后再聊 流水线 的问题

处理器类型

所有的业务处理都是在处理器中完成,在 Netty 中,处理器分为两大类, 一类是 入站处理器 ,另一类是 出站处理器

ChannelInboundHandler:入站处理器

Netty 中,入站处理器是由 Netty 从通道底层触发,通过层层传递,然后调用 ChannelInboundHandler 进行的逻辑处理。基本上所有的业务处理都是通过入站处理器来进行处理的

处理过程

单一句话估计看不懂,我们通过具体的例子来说明:以 OP_READ 事件为例

  • 在通道中发生了 OP_READ 事件后,会被 NioEventLoop 查询到
  • 然后会分发给 ChannelInboundHandler 入站处理器,调用其中的方法 channelRead()
  • 然后在 channelRead() 中,我们可以从通道中读取到数据,进行业务逻辑操作

从上面的处理过程中我们可以看到, Netty的入站处理器 的触发方向: 从底层通道到ChannelInboundHandler入站处理器

生命周期

ChannelInboundHandler 是一个接口类,在 Netty 中,我们一般使用其子类 ChannelInboundHandlerAdapter

其中几个重要的方法,我们来一一看看

  • channelRegistered

当有客户端连接进来,会触发此方法

  • channelActive

当有客户端连接成功后,会触发此方法,我们可以通过此方法监控客户端连接地址,在线人数等等业务功能

  • channelRead

当通道缓冲区可读,会触发通道可读事件,在此方法中,我们可以获取通道缓冲区的数据

  • channelReadComplete

当通道缓冲区可读完后, Netty 会触发通道读取完成事件

  • channelInactive

当连接被断开或者不可用, Netty 会触发此方法,我们可以在此方法中做用户退出连接等业务功能

  • exceptionCaught

当通道处理过程中发生异常, Netty 会触发异常捕获事件

接下来我们看下各个方法的执行顺序,请看下面的代码,将其中的方法都重写,通过输出查看执行顺序

public class IMNettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端通道成功注册");
        super.channelRegistered(ctx);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端监听者:" + ctx.channel().localAddress());
        super.channelActive(ctx);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("读取数据");
        super.channelRead(ctx, msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("通道缓存区已经读完");
        super.channelReadComplete(ctx);
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端" + ctx.channel().localAddress() + "断开");
        super.channelInactive(ctx);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("发生了异常");
        super.exceptionCaught(ctx, cause);
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("生命周期方法:add");
        super.handlerAdded(ctx);
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("生命周期方法:remove");
        super.handlerRemoved(ctx);
    }
}
复制代码

大家可以猜一猜

  1. 当客户端连接到服务端的时候
生命周期方法:add
客户端通道成功注册
客户端监听者:/127.0.0.1:45882
复制代码
  1. 当客户端给服务端发送消息时
读取数据
客户端:aaa
通道缓存区已经读完
    
读取数据
客户端:asdf
通道缓存区已经读完
复制代码

其中, 这两者方法是会多次执行的,只要有数据发送过来就会执行到

  1. 客户端断开服务端连接
通道缓存区已经读完
发生了异常
客户端/127.0.0.1:45882断开
生命周期方法:remove
复制代码

因为我是强制关掉客户端的,所以会触发异常

了解完了 入站处理器 ,那么我们再来看看出站处理器

ChannelOutboundHandler:出站处理器

出站处理器表示的是 ChannelOutboundHandler 到通道的某次IO操作,也就是说,在应用程序完成业务处理后,可以通过该处理器将处理结果写入底层通道,然后发送给另外一端。最常用的一个方法就是 write()

所以出站处理器的触发方向为: Netty上层的通道,去操作底层Java IO通道

重要方法

ChannelOutboundHandler 是一个接口类,在 Netty 中,我们一般使用其子类 ChannelOutboundHandlerAdapter

其中几个重要的方法,我们来一一看看

  • bind

监听地址绑定:完成底层 Java IO通道的IO地址绑定。如果是TCP传输协议,方法用于服务端

  • connect

连接服务端,完成服务端的连接操作。如果是TCP协议,方法用于客户端

  • write

写数据到底层,完成 Netty 通道向底层Java IO通道的数据写入。 此方法只是触发操作,并不是完成实际的数据写入操作, 后面我们学到编解码器,我们就能真正明白这个意思:一般我们可以将版本号,魔数,消息长度等信息写在这里

  • flush

刷新数据,将缓冲区的数据写到对端

  • read

从底层读取数据

  • disConnect

断开服务端连接,如果TCP协议,方法用于客户端

  • close

主动关闭通道

说明:实在话,我使用出站处理器使用到的地方略少,或者说我对出站处理器的使用也是一知半解,我就不多说,评论区给大家开放,大家可以尽情发言,我们一起来讨论

下面我们来看看绑定通道和Handler处理器之间关系的特殊组件

流水线:Pipeline

我们来梳理下 Netty的反应器模式 中,各个组件之间的关系:

  • NioEventLoopNioChannel 之间是一对多的关系:一个反应器可以查询很多通道的IO事件
  • NioChannelHandler处理器 之间是多对多的关系:一个通道的IO事件可以被多个Handler处理器处理;一个Handler处理器也可以绑定多个通道,处理多个通道的IO事件

在这种情况下,为了能够很好的协调各个组件,保证应用程序的正常运行, Netty 为我们提供了一个特殊的组件: ChannelPipline ,我们叫它 流水线

因为它像一条管道,将绑定到一个通道的多个Handler处理器串在一起,所有被添加进来的Handler处理器都是这条管道上的节点,就好像是工厂里的流水作业

执行顺序

ChannelPipline被设计成一个 双向链表 的结构,可以支持动态添加、删除Handler业务处理器,比如

  • addLast

  • addFirst

  • remove

  • ...

在流水线中,入站处理器和出站处理器的执行顺序是不同的

  • 入站处理器

入站处理器的顺序是从前往后,按照我们在流水线中添加处理器的顺序来执行的, 比如

socketChannel.pipeline().addLast(new InHandlerA());
socketChannel.pipeline().addLast(new InHandlerB());
socketChannel.pipeline().addLast(new InHandlerB());
复制代码

那么,他们在流水线中的结构是 A --> B --> C 的结构,执行顺序也就是 A --> B --> C

  • 出站处理器

出站处理器的顺序是从后往前,按照我们在流水线中添加处理器顺序的倒序来执行,比如还是上面的添加方式,不过是出站处理器

socketChannel.pipeline().addLast(new OutHandlerA());
socketChannel.pipeline().addLast(new OutHandlerB());
socketChannel.pipeline().addLast(new OutHandlerB());
复制代码

他们在流水线中的结构是 A --> B --> C 的结构,但是他们是从后往前来执行的,所以执行顺序是 C --> B --> A

大家可以亲自验证下

每一个来自通道的IO事件,都会进入流水线中处理器,那么处理器在处理的过程中会遇到3中情况

  • 如果后面还有其他的Handler处理器,那么IO事件会交给下一个Handler处理器
  • 如果后面没有其他的Handler处理器,那么IO事件的处理就到此结束
  • 如果在流水线中间需要终止,那么处理器也可以选择不将IO事件继续向下传递,到此终止处理

截断流水线处理器

上面第三种情况说到,我们可以手动截断流水线的传递,那么我们来看看如何截断,

用入站处理器来说明:

public class IMNettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("读取数据");
        // super.channelRead(ctx, msg);
    }
}
复制代码

每一个重写的方法,我们都会调用父类的方法,如果我们 不调用父类的方法 ,那么流水线将终止向下传递

装配

那么,相信大家还记得上面的代码, Netty 是如何向流水线中装配处理器

// 服务端
childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new IMNettyServerHandler());
    }
});

// 客户端
handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 这里的代码和服务端的Handler基本一样, 就不再展示
        socketChannel.pipeline().addLast("read", new ImNettyClientHandler());
    }
});
复制代码

这里涉及到一个类: ChannelInitializer ,也叫 通道初始化处理器 ,这里只要我们实现 initChannel() 方法,得到 新接收的通道(参数) ,我们就可以将处理器装配到流水线之中

启动类

Bootstrap是 Netty 为我们提供的一个便利的工厂类,通过这个类可以来完成 Netty 的客户端或服务端的组件组装以及程序的初始化。

Netty 为我们提供了两个启动类,且提供了非常方便的链式调用的方式

  • Bootstrap

客户端启动类

  • ServerBootstrap

服务端启动类

通道选项配置

从上面的代码中我们可以看到,我们通过 option() 对服务端通道或者客户端通道设置了一系列选项,下面我们来看一些常用的选项

  • SO_RCVBUF,SO_SNDBUF

此为TCP参数,用来设置每个TCP socket在内核中的发送缓冲区和接收缓冲区的大小

  • SO_KEEPALIVE

此为TCP参数,表示底层TCP协议的心跳机制。true为连接保持心跳,默认为false

  • SO_BACKLOG

此为TCP参数,表示服务器端接收连接的队列长度,如果队列满,客户端拒绝连接。

window默认为200,其他操作系统为128.

  • SO_BROADCAST

此为TCP参数,表示设置广播模式

  • ALLOCATOR

定义ByteBuf的实例方式,下一节我们介绍 ByteBuf

到此,有关 Netty 的基础知识就全部完成,代码上其实就是一些固定写法,重要是要理解 Netty 的模式等

写在后面的话

关于上面的点,有什么写的不好的,或者写的有问题的,欢迎大家指正出来,以上都是最基础的知识点,更深入的知识我们一起学习,也欢迎大家来跟我一起讨论。

下面的是我用 Hexo + NexT 新搭建的个人知识体系源,最新内容都会先放到这里, 欢迎大家来访

最基础,什么都没有,O(∩_∩)O哈哈~

我的知识体系总结

原文  https://juejin.im/post/5f13b64d5188252e505c327a
正文到此结束
Loading...