转载

Java并发类库的线程池

  • 频繁创建线程和销毁线程需要时间,大大降低系统的效率
  • 缺乏统一管理,可能无限制的新建对象,相互竞争,造成系统资源占用过多
  • 缺少更多执行,定期执行,线程中断等更多功能

线程池的好处

  • 减少了创建和销毁线程的次数,线程可被重复利用,可执行多个任务
  • 控制最大并发数,提高系统资源利用率
  • 避免过多资源竞争,避免阻塞
  • 提供定期执行,单线程,并发数控制等功能

线程池的生命周期

Java并发类库的线程池

ThreadPoolExecutor 核心类

public class ThreadPoolExecutor extends AbstractExecutorService {

    // 包含 4 个构造方法。 其他 3 个通过调用该构造方法。

    public ThreadPoolExecutor(
        int corePoolSize,                       // 核心线程数量
        int maximumPoolSize,                   // 最大线程数
        long keepAliveTime,                    // 线程没有执行任务时最大保持多久终止
        TimeUnit unit,                         // keepAliveTime的时间单位
        BlockingQueue<Runnable> workQueue,     // 阻塞队列,存储等待执行的任务
        ThreadFactory threadFactory,           // 线程工厂,用来创建线程
        RejectedExecutionHandler handler       // 当拒绝处理任务时的策略
    ) { }

}
复制代码

构造方法核心参数

int corePoolSize

核心线程数量。

默认情况下,创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列(workQueue)当中。

int maximumPoolSize

最大线程数。在线程池中最多能创建多少个线程。

int keepAliveTime

线程没有执行任务时最大保持多久终止。

默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用。

调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

TimeUnit unit

参数 keepAliveTime 的时间单位

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒
复制代码

BlockingQueue workQueue

阻塞队列,存储等待执行的任务,对线程池运行过程产生重大影响。

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

ThreadFactory threadFactory

线程工厂,主要用来创建线程。

RejectedExecutionHandler handler

当拒绝处理任务时的策略。

// 丢弃任务,并抛出 RejectedExecutionException
ThreadPoolExecutor.AbortPolicy
// 丢弃任务,不抛出异常
ThreadPoolExecutor.DiscardPolicy
// 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.DiscardOldestPolicy
// 调用线程处理该任务
ThreadPoolExecutor.CallerRunsPolicy
复制代码

线程池的交互和线程池的内部工作过程

Java并发类库的线程池

ThreadPoolExecutor常用方法

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 提交任务,交给线程池执行
    public void execute(Runnable command) {}

    // 提交任务,能够返回执行结果 (execute + Future)
    public Future<?> submit(Runnable task) {}
    public <T> Future<T> submit(Runnable task, T result) {}
    public <T> Future<T> submit(Callable<T> task) {}

    // 关闭线程池,等待任务执行完
    public void shutdown() {}
            
    // 立即关闭,不等待任务关闭        
    public void shutdownNow() {}

    // 获得线程池中已执行和未执行的任务总数
    public long getTaskCount() {}

    // 获得已完成任务数量
    public long getCompletedTaskCount() {}

    // 线程池当前的线程数量
    public int getPoolSize() {}

    // 获得当前线程池中正在执行的线程数量
    public int getActiveCount() {
}
复制代码

内置线程池

  • newSingleThreadExecutor()
  • newFixedThreadPooll(int nThreads)
  • newCachedThreadPool()
  • newScheduledThreadPool(int corePoolSize) / newSingleThreadScheduledExecutor()
  • newWorkStealingPool(int parallelism)

利用Executors提供的通用线程池创建方法,创建不同配置的线程池,主要区别在于不同的ExecutorService类型或者不同的初始参数。

Java并发类库的线程池

newSingleThreadExecutor()

  • 创建一个单线程的线程池
  • 只有一个线程在工作,保证所有任务的执行顺序按照任务的提交顺序执行
  • 现行大多数GUI程序都是单线程的
  • Android中单线程可用于数据库操作,文件操作,应用批量安装,应用批量删除等不适合并发但可能IO阻塞性及影响UI线程响应的操作
public static void main(String[] args) {

    ExecutorService pool = Executors.newSingleThreadExecutor();

    // 执行过程将会顺序输出 0 --> 9
    for (int i = 0; i < 10; i++) {
        final int index = i;
        executorService.execute(() -> {
            System.out.println(index);
        })
    }
}
复制代码
// corePoolSize = 1
// maximumPoolSIze = 1
// keyAliveTime = 0L
// unit = TimeUnit.MILLISECONDS
// workQueue = new LinkedBlockingQueue<Runnable>()
// threadFactory = Executors.defaultThreadFactory()
// handler = defaultHandler = new AbortPolicy()
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
复制代码

newFixedThreadPooll(int nThreads)

  • 创建固定大小的线程池,每次提交一个任务就创建一个线程,直到达到nThreads
  • 线程池的大小一旦达到 nThreads 就保持不变,如果某个线程因为执行异常而结束,线程池会补充一个新线程
  • 可控制线程最大并发数,超出的线程会在队列中等待
public static void main(String[] args) {
    // 线程池大小为3
    ExecutorService executorService = Executors.newFixedThreadPool(3);

    // 每隔秒打印三个数字
    for (int i = 0; i < 50; i++) {
        final int index = i;
        executorService.execute(() -> {
            System.out.println(index);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    executorService.shutdown();
}
复制代码
// corePoolSize = nThreads
// maximumPoolSIze = nThreads
// keyAliveTime = 0L
// unit = TimeUnit.MILLISECONDS
// workQueue = new LinkedBlockingQueue<Runnable>()
// threadFactory = Executors.defaultThreadFactory()
// handler = defaultHandler = new AbortPolicy()
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
复制代码

newCachedThreadPool()

  • 创建一个可缓存的线程池
  • 线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60s)的线程
  • 当任务数增加时,此线程池可智能的添加新线程来处理任务
  • 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();

    for (int i = 0; i < 10; i++) {
        final int index = i;
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(index);
            }
        });
    }

    executorService.shuntdown();
}
复制代码
// corePoolSize = 0
// maximumPoolSIze = Integer.MAX_VALUE
// keyAliveTime = 60L
// unit = TimeUnit.MILLISECONDS
// workQueue = new LinkedBlockingQueue<Runnable>()
// threadFactory = Executors.defaultThreadFactory()
// handler = defaultHandler = new AbortPolicy()
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}
复制代码

newScheduledThreadPool(int corePoolSize) / newSingleThreadScheduledExecutor()

  • 创建一个大小无限的线程池
  • 支持定时以及周期性执行任务的需求
public static void main(String[] args) {
    // new ScheduledThreadPoolExecutor(corePoolSize)
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

    // executorService 执行具有调度含义
    // delay: 3  SECONDS 后执行
	executorService.schedule(() -> System.out.println("schedule running"), 2, TimeUnit.SECONDS);

    executorService.shutdown();
}
复制代码
public static void main(String[] args) {
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
    
    // executorService 执行具有调度含义
    // scheduleAtFixedRate 以指定的速率运行 每隔一段时候就触发
    // 1: initalDelay 延迟1秒
    // 3: period 每格3秒
    executorService.scheduleAtFixedRate(() -> System.out.println(System.nanoTime()), 1, 3, TimeUnit.SECONDS);

    // 不适用关闭线程池
    // 若需要关闭线程池,可通过提供关闭信息,再调用该方法
    // executorService.shutdown();
}
复制代码
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

// --- 

// DEFAULT_KEEPALIVE_MILLIS = 10L
// MILLISECONDS = TimeUnit.MILLI_SCALE
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
            DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
            new DelayedWorkQueue());
}
复制代码

newWorkStealingPool(int parallelism)

Java 8才加入这个创建方法,其内部会构建ForkJoinPool,利用Work-Stealing算法,并行地处 理任务(默认为主机CPU的可用核心数),不保证处理顺序

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}

// --- 

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {

    this(checkParallelism(parallelism),
            checkFactory(factory),
            handler,
            asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
            "ForkJoinPool-" + nextPoolId() + "-worker-");

    checkPermission();
}
复制代码
原文  https://juejin.im/post/5d4d289051882515fd6bd519
正文到此结束
Loading...