转载

ThreadPoolExecutor

ThreadPoolExecutor

ThreadPoolExecutor的创建

ThreadPoolExecutor提供了4种构造方法,以最多参数的为例

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
序号 参数名 类型 含义
1 corePoolSize int 核心线程数
2 maximumPoolSize int 最大线程数
3 keepAliveTime long 线程最大空闲时间
4 unit TimeUnit 空闲时间单位
5 workQueue BlockingQueue<Runnable> 工作队列
6 threadFactory ThreadFactory 线程工厂
7 handler RejectedExecutionHandler 拒绝策略

workQueue

移步

threadFactory

新的线程通过指定的ThreadFactory创建。 如果未指定,则使用Executors.defaultThreadFactory默认工厂,创建的线程将全部属于同一个ThreadGroup中。

DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

所有的线程具有相同的NORM_PRIORITY优先级和非守护进程状态。

public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }

通过指定的ThreadFactory,可以控制线程的名称,线程组,优先级,守护进程状态等。

handler

ThreadPoolExecutor.AbortPolicy 抛出java.util.concurrent.RejectedExecutionException异常

ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的execute 本身运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务

ThreadPoolExecutor.DiscardPolicy 默认情况下将丢弃被拒绝的任务

ThreadPoolExecutor.DiscardOldestPolicy 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序

ThreadPoolExecutor的工作流程

ThreadPoolExecutor

当一个任务通过execute(Runnable)方法欲添加到线程池时

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,就要通过 handler所指定的策略来处理被拒绝的任务。

当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

Executors提供的预定义线程池

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

corePoolSize与maximumPoolSize相等,所有线程都是核心线程,线程池大小固定;

keepAliveTime = 0 该参数无效,因为FixedThreadPool全部为核心线程;

workQueue = LinkedBlockingQueue,缺省初始化大小时队列最大长度为Integer.MAX_VALUE,实际上有内存大小控制队列实际长度,(JVM线程分配内存 -Xss)如果任务提交速度持续大余任务处理速度,大量线程阻塞在队列中,可能在拒绝策略前OOM;

LinkedBlockingQueue 中的putLock和takeLock皆为非公平锁,因此FixedThreadPool的任务执行是无序的;

newSingleThreadPool

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

特殊的FixedThreadPool,线程池大小固定1,单线程执行

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,执行线程数无限制(实际数量由虚拟机内存限制);

keepAliveTime = 60s,线程空闲60s后回收。

workQueue = SynchronousQueue,同步队列,此队列入队必须出队必须同时传递,队列内部不会存储元素,因此CachedThreadPool线程实际上不会有队列等待;

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

DelayedWorkQueue保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取

newWorkStealingPool

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

newWorkStealingPool是一个并行的线程池,参数中传入的是一个线程并发的数量,和之前4种线程池不同,这个线程池不会保证任务的顺序执行,也就是 WorkStealing(工作窃取)的意思,抢占式的工作会创建一个含有足够多线程的线程池,来维持相应的并行级别,它会通过工作窃取的方式使多核的CPU不会闲置,总会有活着的线程让CPU运行

原文  https://segmentfault.com/a/1190000023025846
正文到此结束
Loading...