转载

JAVA并发之多线程基础(6)

JAVA并发之多线程基础(6)

在并发相关,不仅仅依靠之前介绍的各种锁或者队列操作,同时我们也需要考虑到资源的消耗情况(力扣上各种题目比消耗与时间。。)。这个时候我们就引入了线程池。

针对于大家熟悉的 Executors 进行入手,我们经常性的使用里面的线程池。当然,根据阿里巴巴的规范手册上来说,不建议我们直接通过这个类去创建一个线程池,需要通过 ThreadPoolExecutor 自行去创建,这样会让我们懂得线程池中的线程各个时间的状态变化,以防止线程池中的线程异常。

Executors

newFixedThreadPool(int nThreads)
newSingleThreadExecutor()
newCachedThreadPool()
newScheduledThreadPool(int corePoolSize)

前三个里面的实现都是利用了 ThreadPoolExecutor ,只不过传入的参数是不同的,然后造就了不同的线程池,接下来就看看 ThreadPoolExecutor 里面参数的各个含义。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
复制代码
corePoolSize
maximumPoolSize
keepAliveTime
unit
workQueue

默认会帮我们填充的两个参数:

  • threadFactory 线程工厂,用于产生线程放入到线程池中
  • handler 拒绝策略处理器,用于当前线程池中拒绝多余的任务等,默认是 AbortPolicy 拒绝策略

线程工厂里面就会帮我们产生一个个线程放入到线程池中:

public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())//是否为守护线程
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)//是否为默认的优先级
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
复制代码

在线程池中的 execute(Runnable command) 方法是会去帮助我们去执行我们所需要的任务:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();//ctl是一个包装的原子类,里面包含了线程的数量以及状态
        if (workerCountOf(c) < corePoolSize) {//工作线程数量小于当前的核心线程数
            if (addWorker(command, true))//加入到队列中,并且true代表使用核心线程
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//判断线程池是否在运行,同时将任务加入到队列中
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//如果线程池不是运行状态则进行拒绝任务操作
                reject(command);
            else if (workerCountOf(recheck) == 0)//如果线程数为0,则开辟一个线程去执行,不采用核心线程
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//加入队列失败则进行线程数增加,这里采用的线程是大于核心线程数小于最大线程数。
            reject(command);
    }
复制代码

ForkJoinTask

这是一个比较特殊的线程池,可以将一个很大的任务进行分解成为若干个小的任务去执行。执行完成之后再将每个任务的结果进行整合返回。底下派生出两个抽象类: RecursiveAction 是没有返回值的,只是将任务划分去执行。 RecursiveTask 是有返回值的,将任务执行完成之后结果整合进行返回。

package com.montos.lock;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Long> {
	private static final long serialVersionUID = 1L;
	private static final int THRESHOLD = 10000;
	private long start;
	private long end;

	public CountTask(long start, long end) {
		super();
		this.start = start;
		this.end = end;
	}

	@Override
	protected Long compute() {
		long sum = 0;
		boolean canCompute = (end - start) < THRESHOLD;//阈值判断
		if (canCompute) {
			for (long i = start; i <= end; i++) {
				sum += i;
			}
		} else {
			long step = (start + end) / 100;
			ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
			long pos = start;
			for (int i = 0; i < 100; i++) {
				long lastOne = pos + step;
				if (lastOne > end)
					lastOne = end;
				CountTask subTask = new CountTask(pos, lastOne);
				pos += step + 1;
				subTasks.add(subTask);
				subTask.fork();//子线程进行求解
			}
			for (CountTask t : subTasks) {
				sum += t.join();//返回所有的结果集进行求和返回
			}
		}
		return sum;
	}
	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		CountTask task = new CountTask(0, 200000l);
		ForkJoinTask<Long> result = forkJoinPool.submit(task);
		try {
			Long res = result.get();
			System.out.println("result is :" + res);
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}

	}
}

复制代码

上面是有返回值的demo,执行完之后控制台会会出现 result is :20000100000 这个返回结果。

有些小伙伴可能会遇到两个错误(修改里面的参数值,而导致出现的问题,这里我就遇到了!!):

  • 1. java.util.concurrent.ExecutionException:java.lang.StackOverflowError 。原因是因为 ForkJoin 不会对堆栈进行控制,编写代码时注意方法递归不能超过jvm的内存,如果必要需要调整jvm的内存:在Eclipse中JDK的配置中加上 -XX:MaxDirectMemorySize=128 (默认是64M)。改为128后不报栈溢出,但是报下一个错。
  • 2. java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node 。这个导致的原因是因为子任务的处理长度不平衡。我们需要对原来的长度进行计算处理。

至此 JDK 中大部分的并发类都谈及到用法,对于底层代码的描述和处理,这块期待我之后的文章。

原文  https://juejin.im/post/5cf8608f6fb9a07ed911b15d
正文到此结束
Loading...