转载

『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)

老铁不管用过hfs,还是dubbo等等RPC框架,对nio,bio,aio多熟悉,咱们一起以初学者的心态,一起学习下netty。大家都知道netty是基于nio,为什么会有nio,是之前的bio有写问题无法解决,所以出现了nio,nio也有自身的问题,例如:堵塞。源码:https://github.com/limingios/netFuture/tree/master/源码/『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)/nio

『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)

(一)传统BIO线程模型/阻塞

  • 网络编程

    1.两个进程之间的通迅 CS模式

    2.服务端需要绑定端口,监听客户端是否有请求。one-by-one

    3.客户端是连接服务端的端口,建立连接三次握手,双方通过套接字进行连接。

传统的BIO方式进行通信堵塞

『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)

1.serversocket 就是建立服务端绑定端口。

2.serverSocket.accept()建立一个accept线程,一直等待客户端的连接事件,连接之后他才会返回,所以说这是阻塞的。

  • 2个阻塞点

    >Socket socket = serverSocket.appept();

    in data = is.read(b);

2个阻塞点,也就是在同一个时间只能为一个客户服务,如果是你餐厅的老板,在同一个时间你只能为一个就餐的人服务,你这个餐厅是不是开不下去。如何改进,就是请多个服务生为客户服务。

传统的BIO方式进行解决堵塞

『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)

每次来一个请求开辟一个线程,互相不影响,主线程一直是空闲的。也就是就餐的人来一个就让一个服务员专门为你服务,是不是海底捞的感觉。

『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)

早期互联网还不太发达的时候可以这么搞,现在都是互联网大爆炸的时候,如果成千上万个,这时候你需要new thread 成千上万个,线程其实是系统的资源,而且是稀有的资源,请求client一直增长,线程也就一直的增长,此时我们操作系统的资源越来越紧张,最后server端无法使用。这跟餐厅是一样的,一个餐厅成百上千个人来吃饭,你就请成白上千个人,最后餐厅就倒闭了。其实很多的程序设计跟现实生活息息相关。小应用其实没问题,多用户肯定不能这样。这时候肯定有老铁说可以用线程池啊。

多线程伪异步IO

『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)

并发编程jdk1.5之后。ExecutorService可以new一个newCachedTheadPool,首先它是一个线程池,它是缓存,无限大是它的特点,既然是无限大,可以回收线程,如果此时有100个请求,也就是有1000线程在哪里,底层依然是new Thread。底层还是m:m的概念。m:m = 请求:线程。最重要点就是复用。

『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)

虽然可以复用,但是还是解决不了m个请求n个线程的问题。需要m要大于n。如果设置 ExecutorService threadPool = Excutors.newCachedThreadPool(100); 本身这个机制是队列,如果1000个请求来了,需要排队,也就是100个人服务1000个人。所以就是伪异步IO。 底层还是堵塞的。

  • NIO

    Non Block IO 非阻塞IO (new io)

    Selector 通道的管理器

    ServerSocketChannel(ServerSocket): 只关心客户端连接事件 SocketChannel(Socket):关心读事件,写事件,读写事件.. SelectionKey 事件集合

    ByteBuffer

『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)

在Java1.4之前的I/O系统中,提供的都是面向流的I/O系统,系统一次一个字节地处理数据,一个输入流产生一个字节的数据,一个输出流消费一个字节的数据,面向流的I/O速度非常慢,而在Java 1.4中推出了NIO,这是一个面向块的I/O系统,系统以块的方式处理处理,每一个操作在一步中产生或者消费一个数据库,按块处理要比按字节处理数据快的多。在NIO中有几个核心对象需要掌握:缓冲区(Buffer)、通道(Channel)、选择器(Selector)。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * nio
 */
public class NioSocketDemo {
    // 通道管理器(选择器),多个用户共用的,所以需要放到这里
    private Selector selector;

    /**
     * 初始化服务端ServerSocketChannel通道,并初始化选择器
     * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
     */
    public void initServer(int port) throws IOException {
        // 获取ServerSocket通道 , 相对于传统的ServerSocket
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置通道为非阻塞
        serverChannel.configureBlocking(false);
        // 将该通道对应的ServerSocket绑定到port端口
        serverChannel.socket().bind(new InetSocketAddress(port));
        // 获得一个通道选择器(管理器)
        this.selector = Selector.open();
        // 将通道选择器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
        // 当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
        // 意思是大门交给selector看着,给我监听是否有accpet事件
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务端启动成功...");
        /*
        ***SelectionKey中定义的4中事件 ***
        OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
        OP_CONNECT —— 连接就绪事件,表示客户与服务器的连接已经建立成功
        OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
        OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)
        */

    }

    /**
     * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
     */
    public void listenSelector() throws IOException {
        // 轮询访问selector
        while (true) {
            // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞
            // 多路复用  Reactor模型
            this.selector.select();
            // 无论是否有读写事件发生,selector每隔1s被唤醒一次  
             //this.selector.select(1000);
             //this.selector.selectNow();
            // 获得selector中选中的项的迭代器,选中的项为注册的事件
            Iterator<?> iteratorKey = this.selector.selectedKeys().iterator();
            while (iteratorKey.hasNext()) {
                SelectionKey selectionKey = (SelectionKey) iteratorKey.next();
                // 删除已选的key,以防重复处理
                iteratorKey.remove();

                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        // 处理请求
                        try {
                            handler(selectionKey);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }
                });

            }
        }
    }

    /**
     * 处理请求
     */
    public void handler(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isAcceptable()) {//处理客户端连接请求事件
            System.out.println("新的客户端连接...");
            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
            // 获得和客户端连接的通道
            // 完成该操作意味着完成TCP三次握手,TCP物理链路正式建立  
            SocketChannel channel = server.accept();
            // 设置成非阻塞
            channel.configureBlocking(false);
            // 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
            channel.register(this.selector, SelectionKey.OP_READ);
        } else if (selectionKey.isReadable()) {// 处理读的事件
            // 服务器可读取消息:得到事件发生的Socket通道
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            // 创建读取的缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(1024);//1kb
            int readData = channel.read(buffer);
            if(readData > 0){
                String msg = new String(buffer.array(),"GBK").trim();// 先讲缓冲区数据转化成byte数组,再转化成String
                System.out.println("服务端收到信息:" + msg);

                //回写数据
                ByteBuffer writeBackBuffer = ByteBuffer.wrap("receive data".getBytes("GBK"));
                channel.write(writeBackBuffer);// 将消息回送给客户端
            }else{
                System.out.println("客户端关闭咯...");
                //SelectionKey对象会失效,这意味着Selector再也不会监控与它相关的事件
                selectionKey.cancel();
            }
        }
    }
    /**
     * 启动服务端测试
     */
    public static void main(String[] args) throws IOException {
        NioSocketDemo server = new NioSocketDemo();
        // 初始化服务端
        server.initServer(8888);
        // 服务器端监听Selector事件
        server.listenSelector();
    }
}

PS:NIO不需要的代码里面根本没有多线程,实际上nio只有一个工作线程,一个线程可以为多个客人服务。

百度未收录

>>原创文章,欢迎转载。转载请注明:转载自IT人故事会,谢谢!

>>原文链接地址:上一篇:

已是最新文章

原文  https://idig8.com/2019/05/17/hulianwangjiagouruanjianjiagou-ioyunioxianchengmoxingreactormoxingshang53/
正文到此结束
Loading...