转载

java网络编程实战 - 原生NIO非阻塞式通讯网络编程实战 荐

前言

上次提到要改进我们的RPC框架,这周花时间研究一下JDK提供给我们的原生NIO非阻塞式网络编程思想。NIO 库是在 JDK 1.4 中引入的。NIO 弥补了原来的 I/O 的不足,它在标准 Java 代码中提供了高速的、面向块的 I/O。

java网络编程实战 - 原生NIO非阻塞式通讯网络编程实战 荐

BIO与NIO的主要区别

1. 面向流和面向缓冲

java NIO和BIO之间第一个最大的区别是,BIO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

2. 阻塞与非阻塞

Java BIO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。

Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

3. NIO特有的Selector选择器机制

Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。

今天我们就基于以上的理解,实现一个端对端的非阻塞式IO的网络编程。

实战设计

客户端部分

/**
 * @author andychen https://blog.51cto.com/14815984
 * @description:NIO客户端核心处理器
 */
public class NioClientHandler implements Runnable {
    //服务端主机
    private final String host;
    //服务端口
    private final int port;
    /**定义NIO选择器:用于注册和监听事件
     * 选择监听的事件类型: OP_READ 读事件 / OP_WRITE 写事件
     * OP_CONNECT 客户端连接事件 / OP_ACCEPT 服务端接收通道连接事件
     */
    private Selector selector = null;
    //定义客户端连接通道
    private SocketChannel channel = null;
    //运行状态是否被激活
    private volatile boolean activated=false;
    public NioClientHandler(String host, int port) {
        this.port = port;
        this.host = host;
        this.init();
    }

    /**
     * 处理器初始化
     * 负责建立连接准备工作
     */
    private void init(){
        try {
            //创建并打开选择器
            this.selector =  Selector.open();
            //建立并打开监听通道
            this.channel = SocketChannel.open();
            /**
             * 设置通道通讯模式为非阻塞,NIO默认为阻塞式的
             */
            this.channel.configureBlocking(false);
            //激活运行状态
            this.activated = true;
        } catch (IOException e) {
            e.printStackTrace();
            this.stop();
        }
    }

    /**
     * 连接服务器
     */
    private void connect(){
        try {
            /**
             * 连接服务端:因为之前设置了通讯模式为非阻塞
             * 这里会立即返回TCP握手是否已建立
             */
            if(this.channel.connect(new InetSocketAddress(this.host, this.port))){
                //连接建立后,在通道上注册读事件关注,客户端一接收到数据立即触发处理
                this.channel.register(this.selector, SelectionKey.OP_READ);
            }
            else{
                //若连接握手未建立,则在通道上继续关注连接事件,一旦连接建立继续进行后续的处理逻辑
                this.channel.register(this.selector, SelectionKey.OP_CONNECT);
            }
        } catch (IOException e) {
            e.printStackTrace();
            this.stop();
        }
    }

    /**
     * 选择器事件迭代处理
     * @param keys 选择器事件KEY
     */
    private void eventIterator(Set<SelectionKey> keys){
        SelectionKey key = null;
        //这里采用迭代器,因为需要迭代时对key进行移除操作
        Iterator<SelectionKey> it = keys.iterator();
        while (it.hasNext()){
            key = it.next();
            //这里先移除事件key,避免多次处理
            it.remove();
            //处理迭代事件
            this.proccessEvent(key);
        }
    }

    /**
     * 处理发生的事件
     * @param key 选择器事件KEY
     */
    private void proccessEvent(SelectionKey key){
        //只对有效的事件类型进行处理
        if(key.isValid()){
            try {
                //在事件通道上处理
                SocketChannel socketChannel = (SocketChannel) key.channel();
                /**处理连接就绪事件
                * */
                if(key.isConnectable()){
                    //检测连接是否完成,避免发生导致NotYetConnectedException异常
                    if(socketChannel.finishConnect()){
                        System.out.println("Has completed connection with server..");
                        /**
                         * 在通道上关注读事件,NO的写事件一般不特别关注,
                         * 原因:写缓冲区大部分时间被认为是空闲的,会频繁被选择器选择(会浪费CPU资源),
                         *       所以不应该频繁被注册;
                         * 只有在写的数据超过写缓冲区可用空间时,把一部分数据刷出缓冲区后,
                         * 有空间时再通知应用程序进行写;
                         * 且应用程序写完后,应立即关闭写事件
                         */
                         socketChannel.register(this.selector, SelectionKey.OP_READ);
                    }else{//这里若连接仍未建立一般视为网络或其他原因,暂时退出
                        this.stop();
                    }
                }
                /**
                 * 处理读事件
                 */
                if(key.isReadable()){
                    //开辟内存缓冲区,这里用JVM堆内存
                    ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
                    //将通道中的数据读到缓冲区
                    int length = socketChannel.read(buffer);
                    if(0 < length){
                        /**
                         * 进行读写转换,NIO固定范式
                         */
                        buffer.flip();
                        //获取buffer可用空间
                        int size = buffer.remaining();
                        byte[] bytes = new byte[size];
                        //读Buffer
                        buffer.get(bytes);
                        //获取缓冲区数据
                        String result = new String(bytes,"utf-8");
                        System.out.println("Recevied server message: "+result);
                    }else if(0 > length){
                        //取消关注当前事件,关闭通道
                        key.cancel();
                        socketChannel.close();
                    }
                }
            } catch (Exception e) {
                key.cancel();
                if(null != key.channel()){
                    try {
                        key.channel().close();
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }
                e.printStackTrace();
            }
        }
    }

    /**
     * 写数据到对端
     * @param data
     */
    public void write(String data){
        try {
            byte[] bytes = data.getBytes();
            ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
            //将数据放入写缓冲区
            buffer.put(bytes);
            buffer.flip();
            this.channel.write(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 停止运行
     */
    public void stop(){
        this.activated = false;
        System.exit(-1);
    }

    /**
     * 客户端通讯业务核实现
     */
    @Override
    public void run() {
        //建立服务器连接
        this.connect();
        //持续监听各种事件的发生
        while (this.activated){
            try {
                //监听事件是否发生,若发生直接返回;反之阻塞至事件发生
                this.selector.select();
            } catch (IOException e) {
                e.printStackTrace();
                this.stop();
            }
            //获取发生事件的类型
            Set<SelectionKey> keys = this.selector.selectedKeys();
            //迭代处理事件
            this.eventIterator(keys);
        }
        //关闭选择器
        if(null != this.selector){
            try {
                this.selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        this.stop();
    }
}
/**
 * @author andychen https://blog.51cto.com/14815984
 * @description:NIO客户端启动器
 */
public class NioClientStarter {
    private static NioClientHandler clientHandler = null;

    /*启动运行客户端*/
    public static void main(String[] args) {
        try {
            clientHandler = new NioClientHandler(Constant.SERV_HOST, Constant.SERV_PORT);
            new Thread(clientHandler).start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        /**
         * 在控制台发实时数据到对端
         */
        Scanner scanner = new Scanner(System.in);
        while (true){
            String data = scanner.next();
            if(null != data && !"".equals(data)){
                clientHandler.write(data);
            }
        }
    }
}

服务端部分

/**
 * @author andychen https://blog.51cto.com/14815984
 * @description:NIO服务端核心处理器
 */
public class NioServerHandler  implements Runnable{
    private final int port;
    //定义选择器
    private Selector selector = null;
    /**
     * 定义服务端通道: 与客户端类似的思路
     */
    private ServerSocketChannel channel = null;
    //服务器运行是否被激活
    private volatile boolean activated = false;
    public NioServerHandler(int port) {
        this.port = port;
        this.init();
    }

    /**
     * 初始化处理器
     * 负责做好运行监听和接收之前的准备
     */
    private void init(){
        try {
            //创建并打开选择器
            this.selector = Selector.open();
            //创建并打开监听通道
            this.channel = ServerSocketChannel.open();
            /**
             * 设置通道通讯模式为非阻塞(NIO默认为阻塞)
             */
            this.channel.configureBlocking(false);
            //绑定监听的服务端口
            this.channel.socket().bind(new InetSocketAddress(this.port));
            /**
             * 注册在服务端通道上,首先关注的事件
             */
            this.channel.register(this.selector, SelectionKey.OP_ACCEPT);
            //设置运行状态激活
            this.activated = true;
        } catch (IOException e) {
            e.printStackTrace();
            this.stop();
        }
    }

    /**
     * 停止服务
     */
    public void stop(){
        this.activated = false;
        try {
            //关闭选择器
            if(null != this.selector){
                if(this.selector.isOpen()){
                    this.selector.close();
                }
                this.selector = null;
            }
            //关闭通道
            if(null != this.channel){
                if(this.channel.isOpen()){
                    this.channel.close();
                }
                this.channel = null;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.exit(-1);
    }

    /**
     * 在迭代处理发生的事件
     * @param keys 发生的事件类型
     */
    private void eventIterator(Set<SelectionKey> keys){
        //SelectionKey key = null;
        Iterator<SelectionKey> it = keys.iterator();
        while (it.hasNext()){
            SelectionKey key = it.next();
            /**
             * 这里先从迭代器移除,避免后面重复执行
             */
            it.remove();
            //处理事件
            this.proccessEvent(key);
        }
    }

    /**
     *
     * @param key 选择执行的事件KEY
     */
    private void proccessEvent(SelectionKey key){
        //只对有效的事件KEY执行处理
        if(key.isValid()){
            try {
                /**
                 * 处理通道接收数据事件
                 */
                if(key.isAcceptable()){
                    /**
                     * 注意这里接收事件的通道是服务端通道
                     */
                    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                    //接收客户端Socket
                    SocketChannel channel = serverChannel.accept();
                    //设置其为非阻塞
                    channel.configureBlocking(false);
                    //然后注册此通道的读事件
                    channel.register(this.selector, SelectionKey.OP_READ);
                    System.out.println("Build connection with client..");
                }
                /**
                 * 处理读事件
                 */
                if(key.isReadable()){
                    System.out.println("Reading client data...");
                    SocketChannel channel = (SocketChannel) key.channel();
                    //开辟内存空间,接收数据
                    ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
                    //将数据读入缓冲区
                    int length = channel.read(buffer);
                    if(0 < length){
                        //读写切换
                        buffer.flip();
                        //更具缓冲区数据建立转换的字节数组
                        byte[] bytes = new byte[buffer.remaining()];
                        //从缓冲区读取字节数据
                        buffer.get(bytes);
                        //解码数据
                        String data = new String(bytes, "utf-8");
                        System.out.println("Recevied data: "+data);
                        //向对端发送接收应答
                        String answer = "Server has recevied data:"+data;
                        this.reply(channel, answer);
                    }else if(0 > length){
                        //取消处理的事件
                        key.cancel();
                        channel.close();
                    }
                }
                /**
                 * 处理写事件
                 */
                if(key.isWritable()){
                    SocketChannel channel = (SocketChannel) key.channel();
                    //拿到写事件的buffer
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    //若buffer中有数据,则刷到对端
                    if(buffer.hasRemaining()){
                         int length = channel.write(buffer);
                         System.out.println("Write data "+length+" byte to client.");
                    }else{
                        //若没有数据,则继续监听读事件
                        key.interestOps(SelectionKey.OP_READ);
                    }
                }
            } catch (IOException e) {
                key.cancel();
                e.printStackTrace();
            }
        }
    }

    /**
     * 应答对端
     * @param msg 应答消息
     */
    private void reply(SocketChannel channel, String msg){
        //消息编码
        byte[] bytes = msg.getBytes();
        //开启写缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
        //将数据写入缓冲区
        buffer.put(bytes);
        //切换到读事件
        buffer.flip();
        /**
         * 这里为了不出现写空或写溢出缓冲区情况,建立写事件监听同时保留之前的读监听
         * 作为监听的附件传入写操作的buffer
         */
        try {
            channel.register(this.selector, SelectionKey.OP_WRITE |SelectionKey.OP_READ, buffer);
        } catch (ClosedChannelException e) {
            e.printStackTrace();
        }
    }
    /**
     * 服务端监听运行核心业务实现
     */
    @Override
    public void run() {
        while (this.activated){
            try {
                /**
                 * 运行到此方法阻塞,直到有事件发生再返回
                * */
                this.selector.select();
                //获取被监听的事件
                Set<SelectionKey> keys = this.selector.selectedKeys();
                //在迭代器中,处理不同的事件
                this.eventIterator(keys);
            } catch (IOException e) {
                e.printStackTrace();
                this.stop();
            }
        }
    }
}
/**
 * @author andychen https://blog.51cto.com/14815984
 * @description:NIO网络编程服务端启动类
 */
public class NioServerStart {

    /**
     * 运行服务端监听
     * @param args
     */
    public static void main(String[] args) {
        String serverTag = "server: "+Constant.SERV_PORT;
        NioServerHandler serverHandler = null;
        try {
            serverHandler = new NioServerHandler(Constant.SERV_PORT);
            new Thread(serverHandler, serverTag).start();
            System.out.println("Starting "+serverTag+" listening...");
        } catch (Exception e) {
            e.printStackTrace();
            if(null != serverHandler){
                serverHandler.stop();
            }
        }
    }
}

多次验证结果

java网络编程实战 - 原生NIO非阻塞式通讯网络编程实战 荐

java网络编程实战 - 原生NIO非阻塞式通讯网络编程实战 荐

总结

通过以上的实战,我们看到NIO网络编程实现比BIO稍微要复杂一些。面向缓冲的机制确实比面向流的机制要灵活很多;服务运行的体验也比阻塞式IO更加流畅;独有的选择器机制也让NIO可以支撑较大并发数,但学习和开发的成本稍微高一些,项目当中可以有选择地使用。

目前网络编程这块用得比较多的优秀IO框架非Netty莫属了,很多优秀的RPC框架的底层也基于Netty扩展和开发。下次我们就顺带给大家展示一下Netty的网络编程之美。

原文  https://blog.51cto.com/14815984/2506261
正文到此结束
Loading...