转载

Java线程的基本使用

首先,这篇文章写的都是一些比较基础的内容,也就是从API层面解释一下我们平时用的比较多的东西,其实我倒是也想写点底层的东西,可是我也不懂啊。虽然比较基础,但可能却比较容易忽略吧,如果有不对的地方,希望大佬嘴下留情。

在Java中使用多线程,本质上还是对Thread对象的操作。线程池只是为了方便对线程的管理,避免频繁的创建和销毁线程带来不必要的系统开销,内部通过指定的线程数和阻塞队列实现。

基本使用

创建一个Thread对象的时候一般会传递一个Runnable对象,任务逻辑就写在Runnable的run方法中。感觉这个Runnable的名字取得不太好,如果叫Task是不是会更好一些呢?

new Thread(()-> doXX() ).start();
复制代码

获取返回值

上面的那种方式使用起来是挺简单,但会遇到一些问题,比如:能获取返回值不?

通过全局变量

像上面这样是没办法获取返回值的,所以我们需要做一些处理,比如,将结果赋值给一个全局变量

private static int result;

public static void main(String[] args) throws InterruptedException {
    new Thread(() -> {
        System.out.println("处理业务逻辑");
        result = 1000;
    }).start();
    Thread.sleep(1000);
    System.out.println(result);
}
复制代码

result 就是一个全局变量,当任务执行完成之后,更新这个值。这其实都不能算是返回值,但有时候也能用:不需要立即知道任务的执行结果,在访问全部变量的时候,只需要获取它的值就好了。比如通过定时任务去更新缓存,不需要关注任务什么时候执行完成,我需要的只是缓存的值,任务执行了就获取最新的值,没有执行就获取旧值。

通过空轮询

那假如我就是想现在获取返回值咋办?因为我要用这个返回值作为下面逻辑的输入。那或许可以通过轮询的方式检测全局变量来达到目的?

while(result == null){
}
复制代码

除了白白浪费CPU,好像也行啊?但我现在考虑的只是两个线程,如果有多个线程该对全局变量修改该怎么办呢?那用ThreadLocal?算了,就此打住吧

通过简单封装

或许可以封装一下?再封装之前,先考虑几个问题

Task
public static void main(String[] args) throws InterruptedException {
    CallableThread callableThread = new CallableThread(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "ccccc";
    });

    callableThread.start();
    System.out.println("开始时间 " + LocalDateTime.now());
    System.out.println(callableThread.get());
    System.out.println("结束时间 " + LocalDateTime.now());
}


class CallableThread<T> extends Thread {
    private Task<T> task;

    private T result;

    private volatile boolean finished = false;

    public CallableThread(Task<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        synchronized (this) {
            result = task.call();
            finished = true;
            notifyAll();
        }
    }

    public T get() {
        synchronized (this) {
            while (!finished) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    }
}

@FunctionalInterface
interface Task<T> {
    T call();
}
复制代码

这样貌似也可以,但是不太好。Thread本来只是用于处理和线程相关的事情,现在将它和逻辑(Task)绑定在一起,如果有多个任务想共用一个Thread,那返回值怎么处理?

是否可以将这部分逻辑抽出来,放到一个新类当中?

public static void main(String[] args) throws InterruptedException {
    MyRunnable<String> myRunnable = new MyRunnable(() -> {
        // 模拟耗时的业务操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "我是结果";
    });
    System.out.println("开始时间 " + LocalDateTime.now());
    new Thread(myRunnable).start();

    System.out.println("result: " + myRunnable.get());
    System.out.println("结束时间 " + LocalDateTime.now());
}


class MyRunnable<T> implements Runnable {
    private Task<T> task;

    private T result;

    private volatile boolean finished = false;

    public MyRunnable(Task<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        synchronized (this) {
            result = task.call();
            finished = true;
            notifyAll();
        }
    }

    public T get() {
        synchronized (this) {
            while (!finished) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    }
}
复制代码

这不是和java里面的Future有点像吗?确实有点像

Future模式

Future 里面有几个比较核心的概念

  1. Future:抽象出 获取任务返回值获取任务执行状态 等常用方法的接口
  2. Callable:类似于上面的 Task
  3. FutureTask:类似于上面的 MyRunnable

下面看一个例子

public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<String> future = new FutureTask<>(() -> {
        Thread.sleep(3000);
        System.out.println(System.currentTimeMillis());
        return "hehehh";
    });
    new Thread(future).start();
    System.out.println("Start Get Result : " + System.currentTimeMillis());
    System.out.println("Get Result : " + future.get() + System.currentTimeMillis());
}
复制代码

Future

Future 接口除了提供获取返回值的接口,还提供了一些其他的接口,根据名字大概也可以猜到什么意思,不过多解释了。实在不行看看源码吧,这样子就很愉快了。

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
复制代码

FutureTask

FutureTask 同时实现了 RunnableFuture 接口,

任务状态

FutureTask 中,任务的不同状态通过 state 变量来表示,状态有以下几种:

/*
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */

private volatile int state;

private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
复制代码

任务执行

因为 FutureTask 本身也实现了 Runnable 接口,所以核心关注它的run方法,执行逻辑其实比较简单:

  1. 先判断状态,如果不为 NEW 或者通过cas更新 runner 失败,则直接返回
  2. 执行 Callable#call 方法,根据执行结果,设置状态, 如果执行成功:先将state设置成 COMPLETING ,然后保存返回的结果保存到属性 outcome ,再将state设置成 NORMAL ,最后通过 LockSupport.unpark(t) 解除阻塞的线程; 如果执行失败:先将state设置成 COMPLETING ,然后异常信息保存到属性 outcome ,再将state设置成 EXCEPTIONAL ,最后通过 LockSupport.unpark(t) 解除阻塞的线程;

如何阻塞

当我们通过 FutureTask#get 方法获取返回值的时候,会阻塞当前线程,那是通过什么方式阻塞当前线程的?是通过 LockSupport 阻塞的,这个推荐看看博客吧。我也是看博客的,自己也解释的没人家好,嗯,就是这样的

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // state > COMPLETING ,说明任务要么正常执行,要么异常结束,所以这里可以直接返回
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;   // 这应该是help GC吧?
            return s;
        }
        // 如果正在收尾阶段,交出CPU, 等下次循环
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        // 通过UNSAFE 设置 waiters
        else if (!queued)
            // 将新的`WaitNode`添加到单向链表的头部,waiters即对应头节点
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                    q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);   // 阻塞当前线程
    }
}
复制代码

上面我们看到了有一个 waiters ,这是用来干嘛的呢?它是一个单向链表结构,主要是为了处理多次调用 FutureTask#get 的情况,每调用一次 FutureTask#get 就会生成一个 WaitNode 节点,然后将它添加到单向链表的头部

那什么时候用到这个链表呢?在任务执行完成的时候,会执行 finishCompletion 方法,主要就是从头节点依次往下遍历,获取节点的 thread 属性,然后执行 LockSupport.unpark(thread) 解除阻塞

回调如何处理

相对之前的那种方式来说, FutureTask 已经很好用了,直接通过 FutureTask#get 方法就可以获取返回值了,确实蛮方便的。

不过方便是方便,但假如我想在获取返回值之后执行一些其他的逻辑该怎么处理呢?其实我最直接的想法就是回调了。比如,我们可以对上面的 MyRunnable 代码再扩展一下,例如

public MyRunnable addListener(Consumer c) {
    // 这里是一个例子,肯定不会每次都new一个线程,一般是使用线程池
        while (!finished) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        c.accept(result);
    }).start();
    return this;
}
复制代码

我们给 MyRunnable 添加了一个 addListener 方法,接收一个 Consumer 作为入参,当任务执行完成之后就执行这段逻辑,如下:

public static void main(String[] args) throws InterruptedException {
    MyRunnable<String> myRunnable = new MyRunnable(() -> {
        // 模拟耗时的业务操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "我是结果";
    });
    System.out.println("开始时间 " + LocalDateTime.now());
    new Thread(myRunnable).start();

    myRunnable.addListener(result -> {
        System.out.println("当xxx执行完成之后,线程:" + Thread.currentThread().getName() + " 执行一些其他的任务");
        result = result + "   ggggg";
        System.out.println(result);
    });
}
复制代码

ListenableFuture

ListenableFutureguava 包里面的,对 Future 进行了增强, ListenableFuture 继承了 Future ,新增了一个添加回调的方法

/**
 * @param listener the listener to run when the computation is complete     回调逻辑
 * @param executor the executor to run the listener in  回调在哪个线程池执行
 */
void addListener(Runnable listener, Executor executor);
复制代码

ListenableFutureTask 继承了 FutureTask 并且是实现了 ListenableFuture 接口,看一个简单例子

public static void main(String[] args) throws InterruptedException {
    ListenableFutureTask futureTask = ListenableFutureTask.create(() -> {
        System.out.println("执行任务开始  " + LocalDateTime.now());
        Thread.sleep(3000);
        System.out.println("执行任务完成  " + LocalDateTime.now());
        return "结果";
    });

    futureTask.addListener(() -> System.out.println("获取结果之后,输出一条日志"), MoreExecutors.directExecutor());
    new Thread(futureTask).start();
}
复制代码

源码分析

原理就是将所有回调维护在一个单向链表中,也就是 ExecutionList ,然后通过重写``FutureTask#done`方法,在任务完成之后执行回调逻辑

// 每个回调就相当于是一个RunnableExecutorPair节点,所有RunnableExecutorPair节点构成一条链表,头插链表
private final ExecutionList executionList = new ExecutionList();

// ListenableFutureTask#addListener
public void addListener(Runnable listener, Executor exec) {
    executionList.add(listener, exec);
}


// ExecutionList#add
public void add(Runnable runnable, Executor executor) {
    // 上锁,因为它的内部属性 executed 可能会被任务逻辑线程更新,即 ListenableFutureTask 实现了 FutureTask 的done方法,然后会在里面更新 executed 的值为true
    // 还有一点,如果不加锁,当多个线程同时添加回调的时候,可能会造成节点丢失
    synchronized (this) {
        // 如果任务还没有执行完成,就将当前节点添加到头节点
        if (!executed) {
            runnables = new RunnableExecutorPair(runnable, executor, runnables);
            return;
        }
    }

    // 如果任务执行完成,就开始执行回调
    executeListener(runnable, executor);
}


// ExecutionList#executeListener
private static void executeListener(Runnable runnable, Executor executor) {
    try {
        // 直接将任务交给线程池
        executor.execute(runnable);
    } catch (RuntimeException e) {
        log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);
    }
}

// ExecutionList.RunnableExecutorPair
private static final class RunnableExecutorPair {
    final Runnable runnable;
    final Executor executor;
    @Nullable RunnableExecutorPair next;

    RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
        this.runnable = runnable;
        this.executor = executor;
        this.next = next;
    }
}
复制代码

ListenableFutureTask 是怎么知道任务是否执行完成了呢? 在 FutureTask#finishCompletion 方法中,解除阻塞的线程之后,还会执行一个 done 方法,不过该方法在 FutureTask 没有任何逻辑,可以把它当作是一个模板方法,而 ListenableFutureTask 实现了该方法,如下:

// ListenableFutureTask#done
protected void done() {
    executionList.execute();
}


// ExecutionList#execute
public void execute() {
    RunnableExecutorPair list;
    synchronized (this) {
        if (executed) {
            return;
        }
        // 首先将executed置为true
        executed = true;
        // runnables代表链表的头节点
        list = runnables;
        runnables = null; // allow GC to free listeners even if this stays around for a while.
    }



    RunnableExecutorPair reversedList = null;
    // 这其实是一个倒置的过程,因为我们添加节点的时候,是插入到头部的,为了保证回调按照我们添加时的顺序执行,即 先添加先执行,所以做了一个倒置
    while (list != null) {
        RunnableExecutorPair tmp = list;
        list = list.next;
        tmp.next = reversedList;
        reversedList = tmp;
    }

    // 遍历链表,依次执行回调逻辑
    while (reversedList != null) {
        executeListener(reversedList.runnable, reversedList.executor);
        reversedList = reversedList.next;
    }
}
复制代码

FutureCallback

通过 ListenableFutureTask ,我们可以在任务执行完成之后执行一些回调逻辑。可是细心的同学会发现, 回调方法无法使用任务的返回值 ,那假如我就是想先获取值然后再用这个返回值做下一步操作怎么办?还是只能先通过get方法阻塞当前线程吗?其实 guava 包中也给了我们相关的接口。先看一个例子:

public static void main(String[] args) throws InterruptedException {
    ListenableFutureTask futureTask = ListenableFutureTask.create(() -> {
        System.out.println("执行任务开始  " + LocalDateTime.now());
        Thread.sleep(3000);
        System.out.println("执行任务完成  " + LocalDateTime.now());
        return "结果";
    });

    Futures.addCallback(futureTask, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println("执行成功: " + result);
        }

        @Override
        public void onFailure(Throwable t) {
            System.out.println("执行失败");
        }
    });

    new Thread(futureTask).start();
}
复制代码

源码分析

FutureCallback 接口里面有两个方法,分别对应任务执行成功逻辑和任务失败逻辑

void onSuccess(@Nullable V result);

void onFailure(Throwable t);
复制代码

Futures 可以堪称是一个门面类,里面封装了一些操作

// Futures#addCallback
public static <V> void addCallback(
    ListenableFuture<V> future, FutureCallback<? super V> callback) {
        // 这里使用了DirectExecutor线程池,即直接在当前线程执行
        addCallback(future, callback, directExecutor());
}

// Futures#addCallback
public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) {
    Runnable callbackListener =
        new Runnable() {
            @Override
            public void run() {
                final V value;
                try {
                    value = getDone(future);
                } catch (ExecutionException e) {
                    callback.onFailure(e.getCause());
                    return;
                } catch (RuntimeException e) {
                    callback.onFailure(e);
                    return;
                } catch (Error e) {
                    callback.onFailure(e);
                    return;
                }
                callback.onSuccess(value);
            }
        };
    // 最终还是将这部分逻辑封装成一个回调,然后在这个回调中获取返回值,根据返回值的结果执行相应的FutureCallback方法
    future.addListener(callbackListener, executor);
}

// Futures#getDone
public static <V> V getDone(Future<V> future) throws ExecutionException {
    checkState(future.isDone(), "Future was expected to be done: %s", future);
    return getUninterruptibly(future);
}
public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
}
复制代码

本质上其实就是将获取返回值的逻辑封装成一个回调,在这个回调中获取返回值,根据返回值的结果执行相应的 FutureCallback 方法,不过在使用上却方便了好多。

与我们直接通过get方法获取返回值然后再执行其他逻辑还是有区别的,因为我们直接调用 Future#get 方法会阻塞当前线程,而 guava 是在回调中执行这部逻辑,类似于一种通知机制,所以不会阻塞当前线程。

ListenableFutureTask

其实Spring里面也有一个 ListenableFutureTask ,实现上和 guava 大同小异,也是继承了 FutureTask 并且实现了自己的 ListenableFuture 接口,通过重写 FutureTask#done 方法,在该方法中获取返回值然后执行回调逻辑

public static void main(String[] args) {
    ListenableFutureTask future = new ListenableFutureTask(() -> "结果");

    future.addCallback(new ListenableFutureCallback() {
        @Override
        public void onSuccess(Object result) {
            System.out.println("callback " + result);
        }

        @Override
        public void onFailure(Throwable ex) {
            System.out.println("执行失败 ");
        }
    });
    new Thread(future).start();
}
复制代码

核心源码

它的Callback是保存在两个Queue里面的: successCallbacksfailureCallbacks ,数据结构是 LinkedList

private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>();

private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>();
复制代码

重写的done方法如下,逻辑很简单,就不解释了

protected void done() {
    Throwable cause;
    try {
        T result = get();
        this.callbacks.success(result);
        return;
    }catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        return;
    }catch (ExecutionException ex) {
        cause = ex.getCause();
        if (cause == null) {
            cause = ex;
        }
    }
    catch (Throwable ex) {
        cause = ex;
    }
    this.callbacks.failure(cause);
}
复制代码
原文  https://juejin.im/post/5dfdf6d5f265da33af51502a
正文到此结束
Loading...