转载

Netty 学习系列六:服务端 ServerSocketChannel 绑定

零、 整体流程

Netty 学习系列六:服务端 ServerSocketChannel 绑定

1、用户在main线程启动执行ServerSocketChannel的初始化

  • 1)初始化一个NioServerSocketChannel channel 包括初始化Channel关联的ch、pipeline、unsafe、config。
nio.ServerSocketChannel ch = nio.SelectorProvider.openServerSocketChannel();
ch.configBlocking(false);
  • 2)将channel与ServerBootstrap的EventLoopGroup中的某个EventLoop child绑定
  • 3)向child的任务执行队列中添加channel的register0事件
  • 4)监听register0事件的完成状态,完成时向child的任务执行队列中添加channel的bind事件

2、EventLoop线程中执行register0事件

  • 1)将NioServerSocketChannel的ch注册到NioEventLoop child的selector上
SelectionKey selectionKey = ch.register(eventLoop().selector, 0, this);
  • 2)标识ChannelPromise状态为success,触发Listener将bind任务添加到EventLoop的执行任务队列
  • 3)ServerSocketChannel channel产生ChannelRegisted事件
pipeline.fireChannelRegistered();

会导致ChannelInitialzer.channelRegisted()执行,将ServerBootstrapAcceptor添加到channel的pipeline中。

3、EventLoop线程中执行Channel.bind事件

  • 1)ServerSocketChannel绑定对应服务端口,监听新的客户端连接
javaChannel().socket().bind(localAddress, config.getBacklog());
  • 2)ServerSocketChannel channel产生ChannelActive事件
pipeline.fireChannelActive();

ChannelActive事件由HeadContext处理,向ch中添加OP_ACCEPT事件监听。

selectionKey.interestOps(OP_ACCEPT);

一、代码入口

ServerBootstrap b = new ServerBootstrap();
// Start the server.
ChannelFuture f = b.bind(port).sync();

服务端启动时都会执行上面的代码,用来启动ServerSocketChannel监听对应端口。

最终由AbstractBootstrap.doBind方法处理。

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
               
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

二、用户线程初始化NioServerSocketChannel

final Channel channel = channelFactory.newChannel();

从此行代码开始,ReflectiveChannelFactory通过反射的方式调用NioServerSocketChannel的构造方法进行channel的初始化。

1、NioServerSocketChannel的实例化

  • 1)创建一个nio.ServerSocketChannel
newSocket(SelectorProvider provider);

使用nio.SelectorProvider.openServerSocketChannel()创建一个nio.ServerSocketChannel ch。

  • 2)通过构造方法实例化NioServerSocketChannel,并初始化相关的field
parent = null
unsafe = new NioMessageUnsafe()
pipeline = new DefaultChannelPipeline(this) ->此处初始化一个DefaultChannelPipeline,并将pipeline和channel互相绑定
ch = ch ->同时将ch设置为非阻塞模式
readInterestOp = OP_ACCEPT
config = new ServerSocketChannelConfig(this, javaChannel().socket())
  • 3)向channel的pipeline中添加Inbound处理器ChannelInitializer
init(channel);

由其父类ServerBootstrap直接实现。

  • 设置channel的attr和options
  • 添加ChannelInitializer到pipeline中
p.addLast(new ChannelInitializer<Channel>() {
           
            public void initChannel(Channel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                pipeline.addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        }

该处理器的 channelRegisted事件 回调方法会将 ServerBootstrapAcceptor 加入channel的pipeline中,并将自己从pipeline中移除。

  • 4)ServerBootstrapAcceptor

ServerBootstrapAcceptor也是一个Inbound处理器,用于在Server端accept新的客户端连接时,向新生成的socketChannel中添加用户定义的业务处理器。

其channelRead事件回调方法会将业务方往ServerBootstrap中添加的childHandler添加到socketChannel对应的pipeline中。

对于Server端,channelRead事件被定义为server端accept到了新的socket连接。

2、将NioServerSocketChannel注册到ServerBootstrap的EventLoopGroup上

ChannelFuture regFuture = group().register(channel);

  • 1)NioEventLoopGroup的chooser从其children中选出一个NioEventLoop child,调用其register()方法进行channel注册;

  • 2)实际将NioEventLoop child传给NioServerSocketChannel的unsafe,调用其register(EventLoop eventLoop, final ChannelPromise promise)方法完成注册

  • 将channel.eventLoop绑定为当前NioEventLoop child;

  • 将AbstractUnsafe.register0(DefaultChannelPromise promise)任务加入EventLoop child的执行任务队列;

3、给标识“register0任务”完成状态ChannelFuture添加一个Listener

doBind0(regFuture, channel, localAddress, promise);

当ChannelFuture完成时,将NioServerSocketChannel.bind(SocketAddress localAddress, ChannelPromise promise)任务加入EventLoop child的执行任务队列

三、EventLoop线程中执行register0事件

AbstractUnsafe.register0(ChannelPromise promise);

1、将NioServerSocketChannel的ch注册到NioEventLoop child的selector上,同时将注册得到的SelectionKey绑定为NioServerSocketChannel的selectionKey

doRegister();
selectionKey = javaChannel().register(eventLoop().selector, 0, this);

netty的轮询注册机制其实是将AbstractNioChannel内部的jdk类对象SelectableChannel ch注册到jdk类对象Selector selector上去,并且将AbstractNioChannel channel作为SelectableChannel对象ch的一个attachment附属上,这样在jdk轮询出某个SelectableChannel有IO事件发生时,就可以直接取出AbstractNioChannel进行后续操作。

2、标识ChannelPromise状态为success,触发Listener将bind任务添加到EventLoop的执行任务队列

safeSetSuccess(promise);

3、ServerSocketChannel channel产生ChannelRegisted事件

pipeline.fireChannelRegistered();

会导致ChannelInitialzer.channelRegisted()执行,将ServerBootstrapAcceptor添加到channel的pipeline中。

四、EventLoop线程中执行Channel.bind事件

AbstractUnsafe.bind(final SocketAddress localAddress, final ChannelPromise promise);

1、ServerSocketChannel绑定对应服务端口,监听新的客户端连接

javaChannel().socket().bind(localAddress, config.getBacklog());

2、ServerSocketChannel channel产生ChannelActive事件

pipeline.fireChannelActive();

ChannelActive事件由HeadContext处理,最终调用了AbstractNioUnsafe.doBeginRead()方法,向ch中添加OP_READ事件监听。

selectionKey.interestOps(interestOps | readInterestOp);
原文  https://xiaozhuanlan.com/topic/7863219054
正文到此结束
Loading...