转载

行到水穷处,坐看云起时 - JUC(一)

  1. JUC(一)
    • 线程池
    • ThreadLocal
  2. JUC(二)
    • CAS
  3. JUC(三)
    • 并发容器
    • 并发流程
  4. JUC(四)
    • AQS
    • 治理线程

线程池

一、相同的线程是如何执行不同的任务的?超过核心线程数的线程是如何被回收的?

先看一下线程池里面很重要的两个成员变量

/**
 * The queue used for holding tasks and handing off to worker
 * threads.
 */
private final BlockingQueue<Runnable> workQueue;
/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();
复制代码

这里的Worker即持有具体thread的工作线程,workQueue(阻塞队列)则是我们传入的一个个需要被执行的任务(run方法里面的内容)。其他的结构性的说明可以瞅瞅这篇美团技术团队的文章,写的特别好: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

那相同的线程是如何执行不同的任务的?

来看看线程池的runWorker方法

行到水穷处,坐看云起时 - JUC(一)

工作线程在执行完成一个任务以后,就会去阻塞队列获取新的任务,执行任务的run方法。

有一个挺有意思的地方:线程池在新任务进来时候,如果核心线程数没有满,则会去再开一个线程,而不是复用已存在的空闲的核心线程,因为runWork的方法,在getTask的地方会阻塞,但是他是阻塞在了获取队列中的task。

回收?保留?

上面说到,getTask会阻塞线程。这和回收保留有什么关系呢?我们来看看这段代码

行到水穷处,坐看云起时 - JUC(一)

可以很清晰的看到:核心线程永久阻塞,非核心线程则计时阻塞

ThreadLocal

二、ThreadLocal用在哪些场景?

  1. 每个需要一个独享的对象(通常是工具类,典型的有SimpleDateFormat和Random)
  2. 每个线程内需要保存的全局变量,例如拦截器中获取的用户信息

ThreadLocal是什么?

我们先看看它的存在方式,在Thread类里面有一个成员变量:

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
复制代码

它被一个线程所携带,存放在一个map里面,这个map的key为ThreadLocal对象,value为你需要设置的值。

当我们调用ThreadLocal的set方法时,其实是把当前的threadlocal作为key,加上你的value,放入了当前线程的那个Map里面。

为什么需要使用ThreadLocal?

我们来一步步看一下如下场景(模拟大量请求得到服务的情况,在这条请求链路中,我们都需要使用相同的格式进行打印时间:从0-999):

  1. 最暴力的方式,我们直接在每一个任务运行时候创建一个SimpleDateFormat对象,这样虽然没什么问题,但是1000个“相同”对象的创建和销毁,着实没有必要
public class ThreadLocalNormalUsage {

    public static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            threadPool.submit(() -> {
                String date = new ThreadLocalNormalUsage().date(finalI);
                System.out.println(date);
                // 更多的service层任务
            });
        }
        threadPool.shutdown();
    }

    public String date(int seconds) {
        //参数的单位是毫秒,从1970.1.1 00:00:00 GMT计时
        Date date = new Date(1000 * seconds);
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return dateFormat.format(date);
    }
}
复制代码
  1. 那既然你说是相同的,直接使用全局的静态变量怎么样呢?是不是就解决问题了?
public class ThreadLocalNormalUsage {

    public static ExecutorService threadPool = Executors.newFixedThreadPool(10);
    static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            threadPool.submit(() -> {
                String date = new ThreadLocalNormalUsage().date(finalI);
                System.out.println(date);
                // 更多的service层任务
            });
        }
        threadPool.shutdown();
    }

    public String date(int seconds) {
        //参数的单位是毫秒,从1970.1.1 00:00:00 GMT计时
        Date date = new Date(1000 * seconds);
        return dateFormat.format(date);
    }
}
复制代码
行到水穷处,坐看云起时 - JUC(一)

现在问题就来了,居然出现了相同的时间打印,显然这是不应该的呀。

因为SimpleDateFormat在多线程访问下就会出现问题,因为他本身并不是线程安全的类。

  1. 这样的话,我们使用synchronized把关键操作保护起来如何?
public class ThreadLocalNormalUsage {

    public static ExecutorService threadPool = Executors.newFixedThreadPool(10);
    static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            threadPool.submit(() -> {
                String date = new ThreadLocalNormalUsage().date(finalI);
                System.out.println(date);
                // 更多的service层任务
            });
        }
        threadPool.shutdown();
    }

    public String date(int seconds) {
        //参数的单位是毫秒,从1970.1.1 00:00:00 GMT计时
        Date date = new Date(1000 * seconds);
        String s;
        synchronized (ThreadLocalNormalUsage.class) {
            s = dateFormat.format(date);
        }
        return s;
    }
}
复制代码

没错,这次结果正常了。

但是这样不行呀,使用同步保护后,所有并发的线程都在这排队,性能损耗岂不是很严重,这还得了。

  1. 好了,主角闪亮登场:对应线程池中的10个线程,我们为每一个线程创建一个SimpleDateFormat对象,这样既保证了并发安全,又节省了对象的开销
public class ThreadLocalNormalUsage {

    public static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            threadPool.submit(() -> {
                String date = new ThreadLocalNormalUsage().date(finalI);
                System.out.println(date);
                // 更多的service层任务
            });
        }
        threadPool.shutdown();
    }

    public String date(int seconds) {
        //参数的单位是毫秒,从1970.1.1 00:00:00 GMT计时
        Date date = new Date(1000 * seconds);
        SimpleDateFormat dateFormat = ThreadSafeFormatter.dateFormatThreadLocal2.get();
        return dateFormat.format(date);
    }
}

/**
 * 两种效果一样的写法,都是重写initialValue方法
 * initialValue方法会延迟加载,在使用get方法时候才会触发
 */
class ThreadSafeFormatter {
    

    public static ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        }
    };

    public static ThreadLocal<SimpleDateFormat> dateFormatThreadLocal2 = ThreadLocal
            .withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
复制代码

再一种场景就是需要在整条链路传参(用户信息)了,我们虽然可以使用方法参数的方式,但是并不优雅,ThreadLocal的set、get了解一下

public class ThreadLocalNormalUsage {

    public static void main(String[] args) {
        new Service1().process("mrhe");

    }
}

class Service1 {

    public void process(String name) {
        User user = new User(name);
        UserContextHolder.holder.set(user);
        new Service2().process();
    }
}

class Service2 {

    public void process() {
        User user = UserContextHolder.holder.get();
        System.out.println("Service2拿到用户名:" + user.name);
        new Service3().process();
    }
}

class Service3 {

    public void process() {
        User user = UserContextHolder.holder.get();
        System.out.println("Service3拿到用户名:" + user.name);
        UserContextHolder.holder.remove();
    }
}

class UserContextHolder {
    public static ThreadLocal<User> holder = new ThreadLocal<>();
}

class User {

    String name;

    public User(String name) {
        this.name = name;
    }
}
复制代码

相关知识点

ThreadLocal导致内存泄露

为什么会内存泄露呢?会发生在哪儿?

我们先来分析一下ThreadLocal里面的那个Map

/**
 * The entries in this hash map extend WeakReference, using
 * its main ref field as the key (which is always a
 * ThreadLocal object).  Note that null keys (i.e. entry.get()
 * == null) mean that the key is no longer referenced, so the
 * entry can be expunged from table.  Such entries are referred to
 * as "stale entries" in the code that follows.
 */
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}
复制代码

显然,这个key使用的是弱引用,既然是弱引用,那内存泄露应该不是发生在这里(那就只有value咯~)。

正常情况下,当线程终止,保存在ThreadLocal里的value就会被垃圾回收,因为没有强引用了。但是,如果线程不终止(比如线程池中反复使用并保持的线程),那么key对应的value就不能被回收,因为有如下的调用链:

Thread -> ThreadLocalMap -> Entry(key为null) -> Value

因为这个强引用链路还存在,所以value就无法被回收,就可能出现OOM。JDK已经考虑到这个问题,所以在set、remove和rehash方法中会扫描key为null的Entry,进而把value置为null:

/**
 * Double the capacity of the table.
 */
private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }

    setThreshold(newLen);
    size = count;
    table = newTab;
}
复制代码

但是如果一个ThreadLocal不被使用,那么实际上set、rehash等方法也不再被调用,这时线程又不停止的话,就会内存泄漏了。也就是说,需要我们手动去remove。

话说回来,我们一般使用的是static的ThreadLocal,那JDK这个机制也就无效了。而且我们在使用线程池的时候,线程是会复用的,那这个时候为了防止无用value不断堆积又该怎么办呢?

那我们最好在每个任务执行完成的时候做一下必要的清理工作:

/**
 * 重写线程池中的方法
 */
protected void afterExecute(Runnable r, Throwable t) { 
    Thread.currentThread().threadLocals = null;
}
复制代码
原文  https://juejin.im/post/5eaa8e97e51d454db74367fd
正文到此结束
Loading...