转载

Java多线程-线程池的创建使用与源码拓展

多线程的设计方法确实可以最大限度的发挥多核处理器的计算能力,提高吞吐量和性能。但是如果不加控制随意使用线程,对系统的性能反而会产生不利。

和进程相比,线程虽然是一种轻量级的,但是创建和关闭依然需要花费时间,如果每一个小的任务都创建一个线程,则会很有可能出现创建和销毁线程占用的时间大于该线程任务所消耗的时间。其次线程本身也是需要占用内存空间的,大量的线程会抢占宝贵的内存资源。

因此线程的使用需要掌握一个度,再有限的范围内增加线程的数量可以提高系统的性能,一旦超过这个范围,大量的线程只会拖垮整个系统。

1 什么是线程池

为了避免系统频繁的创建和销毁线程,我们可以让创建的线程复用。我们可以使用一个线程池维护一些线程,当你需要使用线程的时候,可以从池子中随便拿一个空闲线程,当完成工作时,并不急着关闭线程,而是将这些线程退回到线程池中,方便下次使用。

简而言之,再使用线程池后,创建线程编程了从线程池中获得空闲线程,关闭线程变为想线程池归还线程。

2 线程池的创建

线程池的成员都在java.util.concurrent包中,是JDK并发包的核心。其中ThreadPoolExecutor表示一个线程池。Executors类则是一个线程工厂的角色,通过Executors可以取得一个拥有特定功能的线程池,通过Executors可以取得一个特定功能的线程池。

2.1 newFixedThreadPool()方法

该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有则新的任务会暂存在一个任务队列中,待有线程空闲时,便处理任务队列中的队列。

2.2 newSingleThreadExecutor()方法

该方法返回一个只有一个线程的线程池。若有多余的任务被提交到线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。

2.3 newCachedThreadPool()方法

该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有的线程都在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

2.4 newSingleThreadScheduledExecutor()方法

该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口上扩展了在给定时间执行某任务的功能,如在某个固定的延时后执行,或者周期性执行某个任务。

2.5 newScheduledThreadPool()方法

该方法会返回一个ScheduledExecutorService对象,但该线程池可以执行线程数量。

创建固定大小的线程池

public class ThreadPoolThread {

    public static class MyTask implements Runnable{

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID: " + Thread.currentThread().getId());
            try{
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            MyTask myTask = new MyTask();
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            for (int i = 0;i< 10 ;i++){
                executorService.submit(myTask);
            }
        }
    }

}
复制代码
1562554721820:Thread ID: 12
1562554721820:Thread ID: 15
1562554721820:Thread ID: 16
1562554721820:Thread ID: 13
1562554721820:Thread ID: 14
1562554722821:Thread ID: 15
1562554722821:Thread ID: 16
1562554722821:Thread ID: 12
1562554722821:Thread ID: 13
1562554722821:Thread ID: 14
复制代码

计划执行任务

newScheduledThreadPool()方法返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。主要方法如下

public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,         long period,TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
复制代码

和其他线程不同,ScheduledExecutorService不一定会立即安排执行任务。他其实是起到了计划任务的作用,会在指定的时间对任务进行调度。

schedule()会在给定时间对任务进行一次调度。scheduleAtFixedRate()和scheduleWithFixedDelay()方法会对任务进行周期性调度,但是二者还是有区别的。scheduleAtFixedRate()方法的任务调度频率是一定的,它是以上一个任务开始执行的时间为起点,再在规定的时间调度下一次任务。而scheduleWithFixedDelay()方法是以上一个任务的结束后再经过规定时间进行任务调度。

public class ScheduleExecutorServiceDemo {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(System.currentTimeMillis());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}
复制代码
1562555518798
1562555520798
1562555522798
1562555524799
1562555526800
复制代码

可以看出任务每两秒被调度一次。

如果任务的执行时间大于调度时间,则任务就会在上一个任务结束后立即被调用。

将代码修改为8秒

Thread.sleep(8000);
复制代码
1562555680333
1562555688333
1562555696333
1562555704333
复制代码

调度程序实际上并不保证任务会无限期的持续调用,如果任务本身抛出异常,那么后续所有执行都会中断。

3 线程池的内部实现

对于几个核心的线程池,虽然看着创建的线程池有着不同的功能特点,但是其内部都是使用了ThreadPoolExecutor类。

看几个线程池的创建源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }
    
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    }
复制代码

可以看出他们都是ThreadPoolExecutor类的封装,来看一下ThreadPoolExecutor类的构造:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
复制代码
  • corePoolSize:指定了线程池中的线程数量。
  • maximumPoolSize:指定了线程池中的最大线程数。
  • keepAliveTime:当线程池中的线程数量超过corePoolSize,多余的空闲线程的存活时间,即超过corePoolSize的空闲线程在多长时间内被销毁。
  • unit:keepAliveTime的单位。
  • workQueue:任务队列,被提交但尚未被执行的任务的存放队列。
  • threadFacotry:线程工厂,用于创建线程。
  • handler:拒绝策略。当任务太多来不及处理时如何拒绝任务。

3.1 workQueue-任务队列

参数workQueue指被提交但未执行的任务队列,他是一个BlockingQueue接口的对象,仅用于存放Runnable对象。在ThreadPoolExecutor构造中可以使用一下几种BlockingQueue接口:

  • 直接提交队列:有SynchronousQueue对象提供。SynchronousQueue没有容量,每插入一个操作都要等待一个相应的删除操作,反之每一个删除操作都要等待对应的插入操作。使用SynchronousQueue如果总有新的任务提交给线程执行,如果没有空闲进程会尝试创建新的线程,如果线程数已经到达最大值则执行拒绝策略。使用SynchronousQueue通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。
  • 有界的任务队列:有界的任务队列使用ArrayBlockingQueue类实现,ArrayBlockingQueue类的构造函数必须带一个容量参数,表示该队列的最大容量。使用有界的任务队列时,若有新的任务要执行,当线程池的实际线程小于corePoolSize则优先创建线程,若大于corePoolSize则将任务加入等待队列。若等待队列已满,则在总线程不大于maximumPoolSize的情况下创建新的线程执行任务,若大于maximumPoolSize则执行拒绝策略。
  • 无界的任务队列:无界的任务队列使用LinkedBlockingQueue类实现。与有界的任务队列相比,无界的任务队列不会出现入队失败的情况。使用LinkedBlockingQueue当有新的任务需要线程执行时,若线程数小于corePoolSize,则会创建新的线程,但当线程数达到corePoolSize后就不会继续增长了。后续若有新的任务加入有没有空闲线程则直接进入队列等待。
  • 优先任务队列:优先任务队列时带有执行优先级的队列。通过PriorityBlockingQueue类实现,可以控制任务的执行先后顺序。它是一个特殊的无界队列。PriorityBlockingQueue类可以根据任务自身的优先级顺序进行执行。

4 拒绝策略

拒绝策略是当任务数量超过系统实际承载能力时执行的策略。拒绝策略可以说时系统超负荷运行时的补救措施。

JDK内置了四种拒绝策略:

  • AbortPolicy策略:该策略会直接抛出异常,阻止正常工作。
  • CallerRunsPolicy策略:只要线程池未关闭,该策略直接在当前调用者线程中运行当前被丢弃的任务。这样不会真的丢弃线程,但是会使任务提交线程性能下降。
  • DiscardOldestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
  • DiscardPolicy策略:该策略丢弃无法处理的任务,不进行任何处理。

上面的策略都实现了RejectedExecutionHandler接口,如果以上策略无法满足实际开发,可以自己扩展。

RejectedExecutionHandler接口构造:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
复制代码

自定义拒绝策略:

//拒绝策略demo
public class RejectThreadPoolDemo {

    public static class MyTask implements Runnable{

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ": Thread ID : " + Thread.currentThread().getId());
            try{
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask myTask = new MyTask();
        ThreadPoolExecutor es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.privilegedThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString() + "被拒绝");
            }
        });
        for (int i = 0;i<Integer.MAX_VALUE;i++){
           es.submit(myTask);
           Thread.sleep(10);
        }
    }

}
复制代码
1562575292467: Thread ID : 14
1562575292478: Thread ID : 15
1562575292489: Thread ID : 16
java.util.concurrent.FutureTask@b4c966a被拒绝
java.util.concurrent.FutureTask@2f4d3709被拒绝
java.util.concurrent.FutureTask@4e50df2e被拒绝
复制代码

5 自定义线程创建:ThreadFactory

ThreadFactory是一个接口,他只有一个用来创建线程的方法。

Thread newThread(Runnable r);
复制代码

通过自定义线程创建我们可以跟踪线程池在何时创建了多少线程,自定义线程名等。

public class ThreadFactoryDemo {

    static volatile int i = 0;

    public static class TestTask implements Runnable{

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) {
        TestTask testTask = new TestTask();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r,"test--" + i);
                i++;
                return thread;
            }
        });
        for (int i = 0;i<5;i++){
            threadPoolExecutor.submit(testTask);
        }
    }

}
复制代码
test--0
test--1
test--4
test--2
test--3
复制代码

6 扩展线程池

虽然JDK已经帮我们实现了稳定的线程池,但如果我们想要对线程池进行一些扩展,比如监控任务执行的开始和结束时间怎么办呢。

ThreadPoolExecutor是一个可以扩展的线程池,它提供了beforExecutor(),afterExecutor()和terminated()三个接口来对其进行扩展。

public class ThreadFactoryDemo {

    static volatile int i = 0;

    public static class TestTask implements Runnable{

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) {
        TestTask testTask = new TestTask();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r,"test--" + i);
                i++;
                return thread;
            }
        }){
            @Override
            protected void beforeExecute(Thread t,Runnable r){
                System.out.println("task-----准备执行");
            }
        };
        for (int i = 0;i<5;i++){
            threadPoolExecutor.submit(testTask);
        }
    }

}
复制代码
task-----准备执行
task-----准备执行
test--2
task-----准备执行
test--1
task-----准备执行
test--4
task-----准备执行
test--3
test--0
复制代码

7 submit和execute的区别

7.1 execute()方法

execute提交的方式只能提交一个Runnable的对象,且该方法的返回值是void,也即是提交后如果线程运行后,和主线程就脱离了关系了,当然可以设置一些变量来获取到线程的运行结果。并且当线程的执行过程中抛出了异常通常来说主线程也无法获取到异常的信息的,只有通过ThreadFactory主动设置线程的异常处理类才能感知到提交的线程中的异常。

7.2 sumbit()方法

sumbit()方法有三种形式:

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
复制代码

sumbit方法会返回一个Future对象,这个Future对象代表这线程的执行结果,当主线程调用Future的get方法的时候会获取到从线程中返回的结果数据。如果在线程的执行过程中发生了异常,get会获取到异常的信息。

原文  https://juejin.im/post/5d23083ff265da1b971a9823
正文到此结束
Loading...