Netty系列(一):NioEventLoopGroup源码解析

前言

对于 NioEventLoopGroup 这个对象,在我的理解里面它就和 ThreadGroup 类似, NioEventLoopGroup 中有一堆 NioEventLoop 小弟, ThreadGroup 中有一堆 Thread 小弟,真正意义上干活的都是 NioEventLoopThread 这两个小弟。下面的文章大家可以类比下进行阅读,应该会很容易弄懂的。(本文基于netty-4.1.32.Final)

NioEventLoopGroup

这里咱们可以从 NioEventLoopGroup 最简单的无参构造函数开始。

1    public NioEventLoopGroup() {
2        this(0);
3    }
复制代码

一步步往下走,可以发现最终调用到构造函数:

1    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
2                             final SelectStrategyFactory selectStrategyFactory) {
3        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
4    }
复制代码

参数说明:

  1. nThreads:在整个方法链的调用过程中,其值到这里为止一直为0,在没有主动配置的情况下后面会进行设置。若配置 io.netty.eventLoopThreads 系统环境变量,则优先考虑,否则设置成为 CPU核心数*2
  2. executor: 到目前为止是 null
  3. selectorProvider: 这里为JDK的默认实现 SelectorProvider.provider()
  4. selectStrategyFactory:这里的值是DefaultSelectStrategyFactory的一个实例 SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory()
  5. RejectedExecutionHandlers:这里是个拒绝策略,这里默认的实现是队列溢出时抛出 RejectedExecutionException 异常。

MultithreadEventLoopGroup

继续往下面走,调用父类 MultithreadEventLoopGroup 中的构造函数:

1    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
2        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
3    }
复制代码

这里可以看到判断 nThreads == 0 后就会给其附上一个默认值。继续走,调用父类 MultithreadEventExecutorGroup 中的构造方法

1    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
2        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
3    }
复制代码

DefaultEventExecutorChooserFactory

这里有个关注的点, DefaultEventExecutorChooserFactory 。这是一个chooserFactory,用来生产 EventExecutorChooser 选择器的。而 EventExecutorChooser 的功能是用来选择哪个 EventExecutor 去执行咱们的任务。咱们从下面的代码中可以观察到 DefaultEventExecutorChooserFactory 一共给咱们提供了两种策略。

1    public EventExecutorChooser newChooser(EventExecutor[] executors) {
2        if (isPowerOfTwo(executors.length)) {
3            return new PowerOfTwoEventExecutorChooser(executors);
4        } else {
5            return new GenericEventExecutorChooser(executors);
6        }
7    }
复制代码

这里的策略也很简单:

  1. 如果给的线程数是2^n个,那么选择 PowerOfTwoEventExecutorChooser 这个选择器,因为这样可以采用位运算去获取执行任务的 EventExecutor
1        public EventExecutor next() {
2            return executors[idx.getAndIncrement() & executors.length - 1];
3        }
复制代码
  1. GenericEventExecutorChooser 选择器,这里采用的是取模的方式去获取执行任务的 EventExecutor
1        public EventExecutor next() {
2            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
3        }
复制代码

相比而言, 位运算的效率要比取模的效率高 ,所以咱们在自定义线程数的时候,最好设置成为2^n个线程数。

干正事

到达最终调用的函数

 1    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
 2                                            EventExecutorChooserFactory chooserFactory, Object... args) {
 3        if (nThreads <= 0) {
 4            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
 5        }
 6
 7        if (executor == null) {
 8            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
 9        }
10
11        children = new EventExecutor[nThreads];
12
13        for (int i = 0; i < nThreads; i ++) {
14            boolean success = false;
15            try {
16                children[i] = newChild(executor, args);
17                success = true;
18            } catch (Exception e) {
19                // TODO: Think about if this is a good exception type
20                throw new IllegalStateException("failed to create a child event loop", e);
21            } finally {
22                if (!success) {
23                    for (int j = 0; j < i; j ++) {
24                        //创建NioEventLoop失败后进行资源的一些释放
25                        children[j].shutdownGracefully();
26                    }
27
28                    for (int j = 0; j < i; j ++) {
29                        EventExecutor e = children[j];
30                        try {
31                            while (!e.isTerminated()) {
32                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
33                            }
34                        } catch (InterruptedException interrupted) {
35                            // Let the caller handle the interruption.
36                            Thread.currentThread().interrupt();
37                            break;
38                        }
39                    }
40                }
41            }
42        }
43       //这里可以去看下上面对于 DefaultEventExecutorChooserFactory的一些介绍
44        chooser = chooserFactory.newChooser(children);
45
46        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
47            @Override
48            public void operationComplete(Future<Object> future) throws Exception {
49                if (terminatedChildren.incrementAndGet() == children.length) {
50                    terminationFuture.setSuccess(null);
51                }
52            }
53        };
54
55        for (EventExecutor e: children) {
56            // 给每一个成功创建的EventExecutor 绑定一个监听终止事件
57            e.terminationFuture().addListener(terminationListener);
58        }
59
60        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
61        Collections.addAll(childrenSet, children);
62        // 弄一个只读的EventExecutor数组,方便后面快速迭代,不会抛出并发修改异常
63        readonlyChildren = Collections.unmodifiableSet(childrenSet);
64    }
复制代码

从上面的代码可以观察到,等了很久的executor 在这里终于给其赋值了,其值为 ThreadPerTaskExecutor 的一个实例对象,这一块的初始化赋值都是很简单的,干活调用的是如下方法:

1    public void execute(Runnable command) {
2        threadFactory.newThread(command).start();
3    }
复制代码

对这一块不是很了解的可以去查阅下线程池有关的资料,咱们重点关注一下 newChild 这个方法,可以说是上面整个流程中的重点:

newChild

newChild 这个方法在 NioEventLoopGroup 中被重写了:

1    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
2        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
3            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
4    }
复制代码

细心的小伙伴可以观察到,这里有用到SelectorProvider,SelectStrategyFactory以及RejectedExecutionHandler这个三个参数,实际上就是本文最开始初始化的三个实例对象(可以翻阅到顶部查看一下)。

继续往下走流程:

 1    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
 2                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
 3        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
 4        if (selectorProvider == null) {
 5            throw new NullPointerException("selectorProvider");
 6        }
 7        if (strategy == null) {
 8            throw new NullPointerException("selectStrategy");
 9        }
10        provider = selectorProvider;
11        final SelectorTuple selectorTuple = openSelector();
12        selector = selectorTuple.selector;
13        unwrappedSelector = selectorTuple.unwrappedSelector;
14        selectStrategy = strategy;
15    }
复制代码

在上面的代码片段中除了调用父类的构造器之外就进行了参数的判空和简单的赋值。这里 openSelector 方法调用后返回 SelectorTuple 实例主要是为了能同时得到包装前后的 selectorunwrappedSelector

 1    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
 2                                        boolean addTaskWakesUp, int maxPendingTasks,
 3                                        RejectedExecutionHandler rejectedHandler) {
 4        super(parent);
 5        this.addTaskWakesUp = addTaskWakesUp;
 6        this.maxPendingTasks = Math.max(16, maxPendingTasks);
 7        this.executor = ObjectUtil.checkNotNull(executor, "executor");
 8        taskQueue = newTaskQueue(this.maxPendingTasks);
 9        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
10    }
复制代码

这里会有一个 taskQueue 队列的初始化( Queue<Runnable> taskQueue ),看名字就知道,这个队列里面放着的是咱们要去执行的任务。这里的初始化方法 newTaskQueueNioEventLoop 中重写了的。具体如下:

1    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
2        // This event loop never calls takeTask()
3        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
4                                                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
5    }
复制代码

这里生成的是一个 MPSC队列(Multi Producer Single Consumer) ,这是一个多生产者单消费的无队列,支持并发。从字面意思上就可以观察到这个队列效率应该是蛮高的。这里的 maxPendingTasks 值为 Integer.MAX_VALUE 。然后最终生成的是 MpscUnboundedArrayQueue 这样一个无边界的队列。

这样 newChild 这个方法到这里就走完了。

terminationListener

简单介绍下这个环节,在上面的创建 NioEventLoopGroup 有个环节是给每个 NioEventLoop 儿子绑定一个terminationListener监听事件

1        for (EventExecutor e: children) {
2            e.terminationFuture().addListener(terminationListener);
3        }
复制代码

这个事件的回调方法是:

1            @Override
2            public void operationComplete(Future<Object> future) throws Exception {
3                if (terminatedChildren.incrementAndGet() == children.length) {
4                    terminationFuture.setSuccess(null);
5                }
6            }
复制代码

在每一个 NioEventLoop 关闭后,就会回调这个方法,然后给 NioEventLoopGroup 实例中的 terminatedChildren 字段自增1,并与初始化成功的 NioEventLoop 的总个数进行比较,如果

terminatedChildren 的值与 NioEventLoop 的总个数相等,则调用 bossGroup.terminationFuture().get() 方法就不会阻塞,并正常返回 null

同样, future.channel().closeFuture().sync() 这段代码也将不会阻塞住了,调用 sync.get() 也会返回 null

下面给一段测试代码,完整示例大家可以到我的 github 中去获取:

Netty系列(一):NioEventLoopGroup源码解析

terminationListener_test

上面的代码只是一个简单的测试,后面还有别的发现的话会继续在 github 中与大家一起分享~

End

原文 

https://juejin.im/post/5c3b4e43e51d45501c5664de

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Netty系列(一):NioEventLoopGroup源码解析

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址