 
  
今天是猿灯塔“365篇原创计划”第五篇。 接下来的时间灯塔君持续更新Netty系列一共九篇
Netty 源码解析(一): 开始
Netty 源码解析(二): Netty 的 Channel
Netty 源码解析(三): Netty的 Future 和 Promise
Netty 源码解析(四): Netty 的 ChannelPipeline
当前:Netty 源码解析(五): Netty 的线程池分析
Netty 源码解析(六): Channel 的 register 操作
Netty 源码解析(七): NioEventLoop 工作流程
Netty 源码解析(八): 回到 Channel 的 register 操作
Netty 源码解析(九): connect 过程和 bind 过程分析
今天呢!灯塔君跟大家讲:
Netty 的线程池分析
接下来,我们来分析 Netty 中的线程池。Netty 中的线程池比较不好理解,因为它的类比较多,而且它们之间的关系错综复杂。看下图,感受下 NioEventLoop 类和 NioEventLoopGroup 类的继承结构:
   
  
这张图我按照继承关系整理而来,大家仔细看一下就会发现,涉及到的类确实挺多的。本节来给大家理理清楚这部分内容。
首先,我们说的 Netty 的线程池,指的就是 NioEventLoopGroup 的实例;线程池中的单个线程,指的是右边 NioEventLoop 的实例。
回顾下我们第一节介绍的 Echo 例子,客户端和服务端的启动代码中,最开始我们总是先实例化 NioEventLoopGroup:
EventLoopGroup group = new NioEventLoopGroup(); // EchoServer 代码最开始: EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
下面,我们就从 NioEventLoopGroup 的源码开始进行分析。 我们打开 NioEventLoopGroup 的源码,可以看到,NioEventLoopGroup 有多个构造方法用于参数设置,最简单地,我们采用无参构造函数,或仅仅设置线程数量就可以了,其他的参数采用默认值。
比如上面的代码中,我们只在实例化 bossGroup 的时候指定了参数,代表该线程池需要一个线程。
publicNioEventLoopGroup(){
this(0);
}
publicNioEventLoopGroup(intnThreads){
this(nThreads,(Executor)null);
}
...
//参数最全的构造方法
publicNioEventLoopGroup(intnThreads,Executorexecutor,EventExecutorChooserFactorychooserFactory,
finalSelectorProviderselectorProvider,
finalSelectStrategyFactoryselectStrategyFactory,
finalRejectedExecutionHandlerrejectedExecutionHandler){
//调用父类的构造方法
super(nThreads,executor,chooserFactory,selectorProvider,selectStrategyFactory,rejectedExecutionHandler);
}
复制代码 
 我们来稍微看一下构造方法中的各个参数:
这里介绍这些参数是希望大家有个印象而已,大家发现没有,在构造 NioEventLoopGroup 实例时的好几个参数,都是用来构造 NioEventLoop 用的。下面,我们从 NioEventLoopGroup 的无参构造方法开始,跟着源码走:
publicNioEventLoopGroup(){
this(0);
}
publicNioEventLoopGroup(){
this(0);
}
复制代码 
 然后一步步走下去,到这个构造方法:
publicNioEventLoopGroup(intnThreads,ThreadFactorythreadFactory,finalSelectorProviderselectorProvider,finalSelectStrategyFactoryselectStrategyFactory){
super(nThreads,threadFactory,selectorProvider,selectStrategyFactory,RejectedExecutionHandlers.reject());
}复制代码 
 大家自己要去跟一下源码,这样才知道中间设置了哪些默认值,下面这几个参数都被设置了默认值:
跟着源码走,我们会来到父类 MultithreadEventLoopGroup 的构造方法中:
protectedMultithreadEventLoopGroup(intnThreads,ThreadFactorythreadFactory,Object...args){
super(nThreads==0?DEFAULT_EVENT_LOOP_THREADS:nThreads,threadFactory,args);
}复制代码 
 这里我们发现,如果采用无参构造函数,那么到这里的时候,默认地 nThreads 会被设置为 CPU 核心数 *2。大家可以看下 DEFAULT_EVENT_LOOP_THREADS 的默认值,以及 static 代码块的设值逻辑。我们继续往下走:
protectedMultithreadEventExecutorGroup(intnThreads,ThreadFactorythreadFactory,Object...args){
this(nThreads,threadFactory==null?null:newThreadPerTaskExecutor(threadFactory),args);复制代码 
  到这一步的时候, new ThreadPerTaskExecutor(threadFactory) 会构造一个 executor。 
 我们现在还不知道这个 executor 怎么用。这里我们先看下它的源码: public final class ThreadPerTaskExecutor implements Executor { private final `ThreadFactorythreadFactory; pu blic T h readPerTaskExecutor(T hreadFactorythreadFactory) {` `i`f(`th readFactory==null){  th row` n`ewNul`l`Poi nterException("threadFactory"); } this . threa dF acto ry=threadFactory; } @Override` p`u`bl`ic void e`x`ecute(`R`unna`b`lecomma nd) { //为每个任务新建一个线 程 t hreadFactory.n e wThread(command).start(); } } Executor 作为线程池的 最 顶 层 接`口, 我们知道,它只有一个 execute(runnable) 方法,从上面我们可以看到,实现类 ThreadPerTaskExecutor 的逻辑就是每来一个任务,新建一个线程。我们先记住这个,前面也说了,它是给 NioEventLoop 用的,不是给 NioEventLoopGroup 用的。 
.一步设置完了 executor,我们继续往下看:
protectedMultithreadEventExecutorGroup(intnThreads,Executorexecutor,Object...args){
this(nThreads,executor,DefaultEventExecutorChooserFactory.INSTANCE,args);
}
这一步设置了 chooserFactory,用来实现从线程池中选择一个线程的选择策略。
复制代码 
  ChooserFactory 的逻辑比较简单,我们看下 DefaultEventExecutorChooserFactory 的实现: Override public `EventExecutorChooserne wChooser(E ventExecutor[]executors) { i f ( is PowerOfTwo(executors.length)){ ret`ur`n newP`o`wer OfTwoEventExecutorChooser(executors); }else { retu rn` n`ewGene`r`icE ventExecutorChooser(executors); } } 这里设置 的 策 略 也 很简单:1、如果线程池的线程数量是 2^n,采用下面的方式会高效一些:@Override publicEv e ntExec utornext() { re turn ex`e`c`u`tors[i dx.getAndIncrement()&executors.length-1]; } 2、如果不是,用取模的方式 : @ Override public EventExec u tornex t() { returnexe cuto rs`/[`M`a`th.abs (idx.getAndIncrement()%executors.length)]; }` 
走了这么久,我们终于到了一个干实事的构造方法中了:
protectedMultithreadEventExecutorGroup(intnThreads,Executorexecutor,
EventExecutorChooserFactorychooserFactory,Object...args){
if(nThreads<=0){
thrownewIllegalArgumentException(String.format("nThreads:%d(expected:>0)",nThreads));
}
// executor 如果是 null,做一次和前面一样的默认设置。
if(executor==null){
executor=newThreadPerTaskExecutor(newDefaultThreadFactory());
}
//这里的children数组非常重要,它就是线程池中的线程数组,这么说不太严谨,但是就大概这个意思
children=newEventExecutor[nThreads];
//下面这个for循环将实例化children数组中的每一个元素
for(inti=0;i<nThreads;i++){
booleansuccess=false;
try{
//实例化!!!!!!
children[i]=newChild(executor,args);
success=true;
}catch(Exceptione){
//TODO:Thinkaboutifthisisagoodexceptiontype
thrownewIllegalStateException("failedtocreateachildeventloop",e);
}finally{
//如果有一个child实例化失败,那么success就会为false,然后进入下面的失败处理逻辑
if(!success){
//把已经成功实例化的“线程”shutdown,shutdown是异步操作
for(intj=0;j<i;j++){
children[j].shutdownGracefully();
}
//等待这些线程成功shutdown
for(intj=0;j<i;j++){
EventExecutore=children[j];
try{
while(!e.isTerminated()){
e.awaitTermination(Integer.MAX_VALUE,TimeUnit.SECONDS);
}
}catch(InterruptedExceptioninterrupted){
//把中断状态设置回去,交给关心的线程来处理.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//================================================
//===到这里,就是代表上面的实例化所有线程已经成功结束===
//================================================
//通过之前设置的chooserFactory来实例化Chooser,把线程池数组传进去,
//这就不必再说了吧,实现线程选择策略
chooser=chooserFactory.newChooser(children);
//设置一个Listener用来监听该线程池的termination事件
//下面的代码逻辑是:给池中每一个线程都设置这个 listener,当监听到所有线程都 terminate 以后,这个线程池就算真正的 terminate 了。
finalFutureListener<Object>terminationListener=newFutureListener<Object>(){
@Override
publicvoidoperationComplete(Future<Object>future)throwsException{
if(terminatedChildren.incrementAndGet()==children.length){
terminationFuture.setSuccess(null);
}
}
};
for(EventExecutore:children){
e.terminationFuture().addListener(terminationListener);
}
//设置readonlyChildren,它是只读集合,以后用到再说
Set<EventExecutor>childrenSet=newLinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet,children);
readonlyChildren=Collections.unmodifiableSet(childrenSet);
}复制代码 
 上面的代码非常简单吧,没有什么需要特别说的,接下来,我们来看看 newChild() 这个方法,这个方法非常重要,它将创建线程池中的线程。
我上面已经用过很多次"线程"这个词了,它可不是 Thread 的意思,而是指池中的个体,后面我们会看到每个"线程"在什么时候会真正创建 Thread 实例。反正每个 NioEventLoop 实例内部都会有一个自己的 Thread 实例,所以把这两个概念混在一起也无所谓吧。
 newChild(…) 方法在 NioEventLoopGroup 中覆写了,上面说的"线程"其实就是 NioEventLoop: 
@Override
protectedEventLoopnewChild(Executorexecutor,Object...args)throwsException{
returnnewNioEventLoop(this,executor,(SelectorProvider)args[0],
((SelectStrategyFactory)args[1]).newSelectStrategy(),(RejectedExecutionHandler)args[2]);
}
它调用了 NioEventLoop 的构造方法:复制代码 
 NioEventLoop(NioEventLoopGroupparent,Executorexecutor,SelectorProviderselectorProvider,
SelectStrategystrategy,RejectedExecutionHandlerrejectedExecutionHandler){
//调用父类构造器
super(parent,executor,false,DEFAULT_MAX_PENDING_TASKS,rejectedExecutionHandler);
if(selectorProvider==null){
thrownewNullPointerException("selectorProvider");
}
if(strategy==null){
thrownewNullPointerException("selectStrategy");
}
provider=selectorProvider;
//开启 NIO 中最重要的组件:Selector
finalSelectorTupleselectorTuple=openSelector();
selector=selectorTuple.selector;
unwrappedSelector=selectorTuple.unwrappedSelector;
selectStrategy=strategy;
}复制代码 
 我们先粗略观察一下,然后再往下看:
这个时候,我们来看一下 NioEventLoop 类的属性都有哪些,我们先忽略它继承自父类的属性,单单看它自己的:
privateSelectorselector; privateSelectorunwrappedSelector; privateSelectedSelectionKeySetselectedKeys; privatefinalSelectorProviderprovider; privatefinalAtomicBooleanwakenUp=newAtomicBoolean(); privatefinalSelectStrategyselectStrategy; privatevolatileintioRatio=50; privateintcancelledKeys; privatebooleanneedsToSelectAgain;复制代码
结合它的构造方法我们来总结一下:
然后我们继续走它的构造方法,我们看到上面的构造方法调用了父类的构造器,它的父类是 SingleThreadEventLoop。
protectedSingleThreadEventLoop(EventLoopGroupparent,Executorexecutor,
booleanaddTaskWakesUp,intmaxPendingTasks,
RejectedExecutionHandlerrejectedExecutionHandler){
super(parent,executor,addTaskWakesUp,maxPendingTasks,rejectedExecutionHandler);
//我们可以直接忽略这个东西,以后我们也不会再介绍它
tailTasks=newTaskQueue(maxPendingTasks);
}
复制代码 
 SingleThreadEventLoop 这个名字很诡异有没有?然后它的构造方法又调用了父类 SingleThreadEventExecutor 的构造方法:
protectedSingleThreadEventExecutor(EventExecutorGroupparent,Executorexecutor,
booleanaddTaskWakesUp,intmaxPendingTasks,
RejectedExecutionHandlerrejectedHandler){
super(parent);
this.addTaskWakesUp=addTaskWakesUp;
this.maxPendingTasks=Math.max(16,maxPendingTasks);
this.executor=ObjectUtil.checkNotNull(executor,"executor");
//taskQueue,这个东西很重要,提交给NioEventLoop的任务都会进入到这个taskQueue中等待被执行
//这个queue的默认容量是16
taskQueue=newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler=ObjectUtil.checkNotNull(rejectedHandler,"rejectedHandler");
}复制代码 
 到这里就更加诡异了,NioEventLoop 的父类是 SingleThreadEventLoop,而 SingleThreadEventLoop 的父类是 SingleThreadEventExecutor,它的名字告诉我们,它是一个 Executor,是一个线程池,而且是 Single Thread 单线程的。也就是说,线程池 NioEventLoopGroup 中的每一个线程 NioEventLoop 也可以当做一个线程池来用,只不过它只有一个线程。这种设计虽然看上去很巧妙,不过有点反人类的样子。上面这个构造函数比较简单:
 还记得默认策略吗:抛出RejectedExecutionException 异常。 在 NioEventLoopGroup 的默认构造中,它的实现是这样的: private static final `RejectedExecutionHandlerREJECT=newR eje ctedExecutionHandler(){ @Ove rr ide publ ic void r ejec t ed(Runna bletask,SingleThreadEventExecutorexecutor) { throw` `n`ew`Rejec`t`edE xecutionException(); } };` 
然后,我们再回到 NioEventLoop 的构造方法:
NioEventLoop(NioEventLoopGroupparent,Executorexecutor,SelectorProviderselectorProvider,
SelectStrategystrategy,RejectedExecutionHandlerrejectedExecutionHandler){
//我们刚刚说完了这个
super(parent,executor,false,DEFAULT_MAX_PENDING_TASKS,rejectedExecutionHandler);
if(selectorProvider==null){
thrownewNullPointerException("selectorProvider");
}
if(strategy==null){
thrownewNullPointerException("selectStrategy");
}
provider=selectorProvider;
//创建selector实例
finalSelectorTupleselectorTuple=openSelector();
selector=selectorTuple.selector;
unwrappedSelector=selectorTuple.unwrappedSelector;
selectStrategy=strategy;
}复制代码 
 可以看到,最重要的方法其实就是 openSelector() 方法,它将创建 NIO 中最重要的一个组件 Selector。在这个方法中,Netty 也做了一些优化,这部分我们就不去分析它了。到这里,我们的线程池 NioEventLoopGroup 创建完成了,并且实例化了池中的所有 NioEventLoop 实例。同时,大家应该已经看到,上面并没有真正创建 NioEventLoop 中的线程(没有创建 Thread 实例)。提前透露一下,创建线程的时机在第一个任务提交过来的时候,那么第一个任务是什么呢?是我们马上要说的 channel 的 register 操作。
365天干货不断微信搜索「猿灯塔」第一时间阅读,回复【资料】【面试】【简历】有我准备的一线大厂面试资料和简历模板