Java线程池/Runnable/Callable/Future

  • java线程池创建的方式
  • 实例
  • 实例分析
  • 运行结果
  • 补充

java线程池的创建方式

java提供了快速创建线程池的方法,即使用java.util.concurrent包提供的Executors类进行线程池的创建,Executors提供了多种常见线程池的创建方式,其中:

//创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads);

//创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ExecutorService newSingleThreadExecutor()

//创建一个定长线程池,支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

//创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool()
复制代码

以上四种线程池几乎涵盖了我们日常普通编码时遇到的大多数场景,其用法大同小异, 本篇将着重使用newSingleThreadExecutor及newFixedThreadPool两种场景,配合Runnable/Callable/Future进行线程池经典用法的讲解

实例

我们先来看一段代码:

public int doSomething()
{
    try
    {
        System.out.println("do something.");
        Thread.sleep(100);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }

    return (new Random(1000)).nextInt();
}
复制代码

简单来看,我们需要用100ms的时间去模拟一段任务,然后返回一个随机数。以此功能为基础,我们知道引入多线程可以完成任务并发,减少任务耗时,提高系统处理任务的能力。

那么现在思考几个问题:

  • 1、此任务为批量执行任务,一次请求可携带大量需处理任务标识及任务数量
  • 2、要求可以同时并发处理多次请求;
  • 3、要求符合系统实际承载能力。

对以上需求修改:

  • 1、为满足批量执行任务,引入多线程修改方案,按照批量处理个数创建指定数目线程池实现
  • 2、为满足每次请求可以并发处理,引入单线程线程池,每次接受请求时,在单线程线程池中启动批量处理流程
  • 3、为满足系统实际承载能力,将第一步中创建指定数目的线程池的处理方式修改为系统所有请求共享线程资源

实例分析

不说思路,直接看代码:

public class ThreadPool
{

    public static ExecutorService es = Executors.newFixedThreadPool(20);

    public static void main(String[] args)
        throws InterruptedException
    {
        fixedThreadPool(5);
        Thread.sleep(2000); //模拟请求发起时间的不确定性。
        fixedThreadPool(50);
        fixedThreadPool(4);
        fixedThreadPool(6);

        System.out.println("submit task all");
        return;
    }

    private static void fixedThreadPool(int count)
    {
        long startTime = System.currentTimeMillis();
        Executors.newSingleThreadExecutor().execute(new Runnable()
        {
            @Override
            public void run()
            {
                Map<Integer, Future<Integer>> futures = new HashMap<>();
                //加入5个Callable任务,直到返回结果
                for (int i = 1; i <= count; i++)
                {
                    final long time = System.currentTimeMillis();
                    final int task = i;
                    Future<Integer> future = es.submit(new Callable<Integer>()
                    {
                        @Override
                        public Integer call()
                            throws Exception
                        {
                            try
                            {
                                Thread.sleep(100);
                            }
                            catch (InterruptedException e)
                            {
                                e.printStackTrace();
                            }

                            System.out.println("TaskFlag: " + count + ", Install MCS plugins " + task + " is working.");
                            return (new Random(1000)).nextInt();
                        }
                    });
                          //future.get(); //次步骤将阻塞线程的提交流程,但并不会阻塞线程池中的任务。
                    futures.put(task, future);
                }

                for (int i = 1; i <= count; i++)
                {

                    try
                    {

                        System.out.println("taskID: " + i + ", Res: " + futures.get(i).get());
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }

                System.out.println(count + " 个 安装MCS 插件任务submit完成!Time:" + (System.currentTimeMillis() - startTime));


                //虽然shutdown方法是等所有任务跑完后才真正停掉线程池,但该方法不会造成堵塞,也就是这代码运行后,下一行代码会立刻运行
                //es.shutdown();
                System.out.println("Executors.newSingleThreadExecutor().execute : shutdown后退出! Time:" + (System.currentTimeMillis() - startTime));

            }
        });
    }
}
复制代码

代码分为三部分

  • 1、系统线程池定义,目前有20个长度的线程池;
  • 2、main函数中调用fixedThreadPool()进行模拟请求;
  • 3、fixedThreadPool中对请求内容进行定义,即接受前端请求,每次执行用户指定数目的获取随机数操作;

代码实现的功能

  • 1、创建单线程线程池的方式进行异步请求处理,不关心执行结果,直接返回;
  • 2、在请求的单线程线程池中,按照用户输入的批量操作数进行批量提交至系统线程池中,完成提交后等待操作结果,并打印。

代码分析

1、

public static ExecutorService es = Executors.newFixedThreadPool(20);
复制代码

根据系统实现能力创建的共享线程池,数值可以根据系统承载能力进行计算,如CPU个数,这里不进行扩展。

2、

Executors.newSingleThreadExecutor().execute(new Runnable()
    {
        @Override
        public void run()
        {
	//do somethings
        }
    });
复制代码

在每个请求中,通过单线程线程池直接执行一个匿名类的 run 方法,执行操作。 Java java.util.concurrent 包中提供的单线程线程池将返回一个 ExecutorService 对象,我们再来看 ExecutorService 的定义:

Java线程池/Runnable/Callable/Future
ExecutorService 实现了 Executor

接口:

Java线程池/Runnable/Callable/Future

因此可以通过 Executors.newSingleThreadExecutor() 创建的单线程线程池的 execute 方法执行一个线程。

execute 方法接受一个实现了 Runnable 接口的类的对象,可以是实际的类或匿名类,本例中使用了匿名类的方式;

本例中通过单线程池方式实现请求的异步处理。

3、

Future<Integer> future = es.submit(new Callable<Integer>()
{
    @Override
    public Integer call()
        throws Exception
    {
        try
        {
            Thread.sleep(100);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

        System.out.println("TaskFlag: " + count + ", Install MCS plugins " + task + " is working.");
        return (new Random(1000)).nextInt();
    }
});

futures.put(task, future);

复制代码

先看看本例中使用的线程池: es 我们回到java源码:

Java线程池/Runnable/Callable/Future

同理我们发现 newFixedThreadPool 同样返回一个 ExecutorService 对象,java源码中,此方法返回一个 LinkedBlockingQueue<Runnable> 对象。但本篇中不详细讲解,有兴趣可进一步读java源码; 创建定长线程池时在本例中,使用 submit 的方式来执行一个任务:

Future<Integer> future = es.submit(new Callable<Integer>()
{
    @Override
    public Integer call()
        throws Exception
    {
        //do something                            
        return (new Random(1000)).nextInt();
    }
});
复制代码

此时与前一步创建的单线程池中有三点不同,我们分别讲解:

  • 1、单线程池中未使用 Future 对象进行返回结果查询
  • 2、单线程池使用 execute 的方式执行,定长线程池使用 submit 的方式执行;
  • 3、单线程执行时使用实现 Runnable 接口的类,定长线程池使用实现 Callable 接口的类;

我们逐个分析

  • 1、单线程池中未使用 Future 对象进行返回结果查询 现在先了解一下 Future 类: 我们先看看 submit 方法的定义:
Java线程池/Runnable/Callable/Future

我对此注释进行了简要翻译:

提交一个有返回值的任务,方法将返回一个Future类型的结果,使用Future提供的get函数将在线程完全执行成功后,得到线程的执行结果。
注释中对Future类型做了简要解释:表示一个挂起的任务的执行结果。
复制代码

在对 Future 类型的定义进行了查找:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码

官方的注释进行比较详细的说明( 这里是重点

表示异步计算的结果。提供了检查计算是否完成、等待计算完成以及检索计算结果的方法。 只有在计算完成后,才能使用方法@code get检索结果,如有必要,将其阻塞,直到其就绪。 取消由@code cancel方法执行。提供了其他方法来确定任务是否正常完成或被取消。一旦计算完成,就不能取消计算。

我认为不需要过多解释我们就可以明确为什么会使用 Future 的原因: 我们使用 Future 作为任务执行结果的一个获取,而此种获取方式是异步的,因此我们通过 submit 方式提交一个任务时,将返回一个 Future 对象作为当前提交线程的 控制对象

结论:在本例中,需要对执行结果进行操作,因此使用Future对象;

  • 2、单线程池使用 execute 的方式执行,定长线程池使用 submit 的方式执行?

通过对第一个问题的简单分析,我们可以明确到,本例中,需要对执行结果进行处理(虽然在本例中仅仅做了打印),那么我们所执行的线程对象必然要有返回值,那么 Executor 接口提供的 execute 没有返回值;

  • 3、单线程执行时使用实现 Runnable 接口的类,定长线程池使用实现 Callable 接口的类; 分析完以上两个问题,那么第三个问题就迎刃而解了 Runnable 不支持线程返回结果,而 Callable 接口支持;

通过以上三点的简要分析,可以得出以下结论:

  • 1、 RunnableCallable 的核心差别: Runnable 不支持线程返回结果,而 Callable 接口支持;
  • 2、 submitexecute 作用相同,都用于执行线程池中的线程,但 execute 无返回值,但 submit 可返回线程的执行结果, submit 也可以提交 Runnable 接口实现的线程对象
Java线程池/Runnable/Callable/Future
  • 3、Future对象可以获取执行线程的控制权,停止线程、以阻塞的方式获取线程结果。

执行结果

TaskFlag: 5, Install MCS plugins 3 is working.
TaskFlag: 5, Install MCS plugins 2 is working.
TaskFlag: 5, Install MCS plugins 1 is working.
TaskFlag: 5, Install MCS plugins 5 is working.
taskID: 1, Res: -1244746321
taskID: 2, Res: -1244746321
taskID: 3, Res: -1244746321
TaskFlag: 5, Install MCS plugins 4 is working.
taskID: 4, Res: -1244746321
taskID: 5, Res: -1244746321
5 个 安装MCS 插件任务submit完成!Time:114
Executors.newSingleThreadExecutor().execute : shutdown后退出! Time:114
submit task all
TaskFlag: 50, Install MCS plugins 4 is working.
TaskFlag: 50, Install MCS plugins 2 is working.
TaskFlag: 6, Install MCS plugins 1 is working.
TaskFlag: 50, Install MCS plugins 3 is working.
TaskFlag: 6, Install MCS plugins 2 is working.
taskID: 1, Res: -1244746321
taskID: 2, Res: -1244746321
TaskFlag: 6, Install MCS plugins 5 is working.
TaskFlag: 6, Install MCS plugins 3 is working.
taskID: 3, Res: -1244746321
TaskFlag: 6, Install MCS plugins 6 is working.
TaskFlag: 4, Install MCS plugins 3 is working.
TaskFlag: 50, Install MCS plugins 8 is working.
TaskFlag: 4, Install MCS plugins 2 is working.
TaskFlag: 50, Install MCS plugins 7 is working.
TaskFlag: 4, Install MCS plugins 4 is working.
TaskFlag: 4, Install MCS plugins 1 is working.
TaskFlag: 50, Install MCS plugins 9 is working.
taskID: 1, Res: -1244746321
TaskFlag: 50, Install MCS plugins 10 is working.
taskID: 2, Res: -1244746321
taskID: 3, Res: -1244746321
taskID: 4, Res: -1244746321
4 个 安装MCS 插件任务submit完成!Time:104
Executors.newSingleThreadExecutor().execute : shutdown后退出! Time:104
TaskFlag: 50, Install MCS plugins 6 is working.
TaskFlag: 50, Install MCS plugins 5 is working.
TaskFlag: 6, Install MCS plugins 4 is working.
taskID: 4, Res: -1244746321
taskID: 5, Res: -1244746321
taskID: 6, Res: -1244746321
6 个 安装MCS 插件任务submit完成!Time:105
Executors.newSingleThreadExecutor().execute : shutdown后退出! Time:105
TaskFlag: 50, Install MCS plugins 1 is working.
taskID: 1, Res: -1244746321
taskID: 2, Res: -1244746321
taskID: 3, Res: -1244746321
taskID: 4, Res: -1244746321
taskID: 5, Res: -1244746321
taskID: 6, Res: -1244746321
taskID: 7, Res: -1244746321
taskID: 8, Res: -1244746321
taskID: 9, Res: -1244746321
taskID: 10, Res: -1244746321
TaskFlag: 50, Install MCS plugins 11 is working.
taskID: 11, Res: -1244746321
TaskFlag: 50, Install MCS plugins 13 is working.
TaskFlag: 50, Install MCS plugins 12 is working.
taskID: 12, Res: -1244746321
taskID: 13, Res: -1244746321
TaskFlag: 50, Install MCS plugins 16 is working.
TaskFlag: 50, Install MCS plugins 14 is working.
TaskFlag: 50, Install MCS plugins 17 is working.
TaskFlag: 50, Install MCS plugins 19 is working.
TaskFlag: 50, Install MCS plugins 21 is working.
TaskFlag: 50, Install MCS plugins 18 is working.
TaskFlag: 50, Install MCS plugins 15 is working.
TaskFlag: 50, Install MCS plugins 28 is working.
TaskFlag: 50, Install MCS plugins 26 is working.
TaskFlag: 50, Install MCS plugins 24 is working.
TaskFlag: 50, Install MCS plugins 27 is working.
TaskFlag: 50, Install MCS plugins 29 is working.
TaskFlag: 50, Install MCS plugins 20 is working.
taskID: 14, Res: -1244746321
taskID: 15, Res: -1244746321
taskID: 16, Res: -1244746321
taskID: 17, Res: -1244746321
taskID: 18, Res: -1244746321
taskID: 19, Res: -1244746321
taskID: 20, Res: -1244746321
taskID: 21, Res: -1244746321
TaskFlag: 50, Install MCS plugins 25 is working.
TaskFlag: 50, Install MCS plugins 23 is working.
TaskFlag: 50, Install MCS plugins 22 is working.
taskID: 22, Res: -1244746321
taskID: 23, Res: -1244746321
taskID: 24, Res: -1244746321
taskID: 25, Res: -1244746321
taskID: 26, Res: -1244746321
taskID: 27, Res: -1244746321
taskID: 28, Res: -1244746321
taskID: 29, Res: -1244746321
TaskFlag: 50, Install MCS plugins 30 is working.
taskID: 30, Res: -1244746321
TaskFlag: 50, Install MCS plugins 31 is working.
taskID: 31, Res: -1244746321
TaskFlag: 50, Install MCS plugins 33 is working.
TaskFlag: 50, Install MCS plugins 32 is working.
taskID: 32, Res: -1244746321
taskID: 33, Res: -1244746321
TaskFlag: 50, Install MCS plugins 34 is working.
taskID: 34, Res: -1244746321
TaskFlag: 50, Install MCS plugins 37 is working.
TaskFlag: 50, Install MCS plugins 36 is working.
TaskFlag: 50, Install MCS plugins 35 is working.
taskID: 35, Res: -1244746321
taskID: 36, Res: -1244746321
taskID: 37, Res: -1244746321
TaskFlag: 50, Install MCS plugins 45 is working.
TaskFlag: 50, Install MCS plugins 38 is working.
TaskFlag: 50, Install MCS plugins 40 is working.
TaskFlag: 50, Install MCS plugins 42 is working.
taskID: 38, Res: -1244746321
TaskFlag: 50, Install MCS plugins 44 is working.
TaskFlag: 50, Install MCS plugins 43 is working.
TaskFlag: 50, Install MCS plugins 41 is working.
TaskFlag: 50, Install MCS plugins 39 is working.
taskID: 39, Res: -1244746321
taskID: 40, Res: -1244746321
taskID: 41, Res: -1244746321
taskID: 42, Res: -1244746321
taskID: 43, Res: -1244746321
taskID: 44, Res: -1244746321
taskID: 45, Res: -1244746321
TaskFlag: 50, Install MCS plugins 49 is working.
TaskFlag: 50, Install MCS plugins 48 is working.
TaskFlag: 50, Install MCS plugins 47 is working.
TaskFlag: 50, Install MCS plugins 46 is working.
taskID: 46, Res: -1244746321
taskID: 47, Res: -1244746321
taskID: 48, Res: -1244746321
taskID: 49, Res: -1244746321
TaskFlag: 50, Install MCS plugins 50 is working.
taskID: 50, Res: -1244746321
50 个 安装MCS 插件任务submit完成!Time:308
Executors.newSingleThreadExecutor().execute : shutdown后退出! Time:308
复制代码

补充

  • 1、Future提供的get方法是阻塞的,它将阻塞当前运行方法,直至有结果返回 因此,我们这里有一种错误的用法: 批量提交任务时,在提交任务后立即执行future.get() 此操作会阻塞线程的提交,导致线程池的能力不能发挥到最大;

  • 2、ExecutorService提供了shutdown方法和shutdownNow方法,区别在于shutdown将与线程池等待队列中的所有线程执行完成后退出; 而shutdownNow则是强制退出,无论是否有任务执行。

原文  https://juejin.im/post/5cefb4e151882548e438ca8c
正文到此结束
Loading...