转载

FutureTask是怎样获取到异步执行结果的?

所谓异步任务,就是不在当前线程中进行执行,而是另外起一个线程让其执行。那么当前线程如果想拿到其执行结果,该怎么办呢?

如果我们使用一个公共变量作为结果容器,两个线程共用这个值,那么应该是可以拿到结果的,但是这样一来,对业务就会造成侵入干扰了,因为你始终得考虑将这个共享变量传入到这个异步线程中去且要维持其安全性。

我们知道,Future.get() 可以获取异步执行的结果,那么它是怎么做到的呢?

要实现线程的数据交换,我们按照进程间的通信方式可知有: 管道、共享内存、Socket套接字。而同一个jvm的两个线程通信,所有线程共享内存区域,则一定是通过共享内存再简单不过了。

本文将以 ThreadPoolExecutor 线程池 来解释这个过程。

首先,如果想要获取一个线程的执行结果,需要调用  ThreadPoolExecutor.submit(Callable); 方法。然后该方法会返回一个 Future 对象,通过 Future.get(); 即可获取结果了。

它具体是怎么实现的呢?

一、首先,我们来看一下 submit 过程

仅为返回了一个 Future<?> 的对象供下游调用!

    // AbstractExecutorService
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 包装一层结果,RunnableFuture, 也实现了 Runnable 接口
        // 实际上就是 FutureTask 
        RunnableFuture<T> ftask = newTaskFor(task);
        // 然后交由 线程池进行调用任务了,即由 jvm 调用执行 Thread
        // 具体执行逻辑,在我之前的文章中也已经阐述,自行搜索
        execute(ftask);
        // 最后,把包装对象返回即可
        return ftask;
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @param <T> the type of the callable's result
     * @return a {@code RunnableFuture} which, when run, will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    
    // FutureTask 实例化
    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

二、异步线程如何执行?

通过上面的分析,我们可以看到,异步线程的执行被包装成了 FutureTask, 而java的异步线程执行都是由jvm调用Thread.run()进行, 所以异步起点也应该从这里去找:

    // FutureTask.run()
    public void run() {
        // 不允许多次执行
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 直接调用 call() 方法,获取返回结果
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 执行异常,包装异常信息
                    setException(ex);
                }
                // 将结果设置到当前的 FutureTask 实例变量 outcome 中,这样当前线程就可以获取了
                // 设置结果时,会将 state 同时变更
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        // 设置结果时,还不代表可以直接获取了,还有后续工作,所以设置为 COMPLETING 中间态
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            // 通知线程执行完成等后续工作
            finishCompletion();
        }
    }
    
    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        // 外部看起来是一个 for, 实际上只会执行一次, 目的是为了保证内部的锁获取成功
        // 如果有其他线程成功后, waiters也就会为null, 从而自身也一起退出了
        for (WaitNode q; (q = waiters) != null;) {
            // 保证更新的线程安全性
            // 只要锁获取成功,就会一次性更新完成,不会失败
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    // 依次唤醒等待的线程
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    // 只有把所有 wait 线程都通知完后,才可以退出
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        // 完成后钩子方法,默认为空,如果需要做特殊操作可以自行复写即可
        done();

        callable = null;        // to reduce footprint
    }
    // 简单看一下异常信息的包装,与 正常结束方法类似,只是将 outcome 设置为了异常信息,完成状态设置为 EXCEPTIONAL
    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

在上面这些实现中,我们也会有点迷糊,我干啥来了?

不管怎么样,你明白一点,所有的执行结果都被放到 FutureTask 的 outcome 变量中了,我们如果想要知道结果,那么,只需要获取这个变量就可以了。

当然,也不可能这么简单了,起码你得知道什么时候获取该变量是合适的才行!接下来!

三、如何获取异步执行结果?

当然是用户调用 future.get() 获取了!

    // Future.get()
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 只要状态值小于 COMPLETING, 就说明任务还未完成, 去等待完成
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        // 只要等待完成, 再去把结果取回即可
        return report(s);
    }
    // 我们先看一下结果的取回逻辑 report(), 果然不出意外的简单, 只管取 outcome 即可
    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 正常执行完成, 直接返回
        if (s == NORMAL)
            return (V)x;
        // 此处会包含 CANCELLED/INTERRUPTING/INTERRUPTED
        if (s >= CANCELLED)
            throw new CancellationException();
        // 业务异常则会被包装成 ExecutionException
        throw new ExecutionException((Throwable)x);
    }
    // 看到取结果这么简单,那么 等待结束的逻辑的呢?看起来好像没那么简单了
    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            // 中断则退出
            if (Thread.interrupted()) {
                // 因 q 是链表的头,所以会移除所有的等待队列,即中断是对所有线程的
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 执行完成后,将线程置空即可,删除工作会有其他地方完成
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 正在处理结果,稍作等待即可
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 其他情况,先创建自己的等待标识,以便在下一次循环中进行入队等待
            else if (q == null)
                q = new WaitNode();
            // 进行一次入队等待,将 q 作为头节点
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 限时的等待,等待超时后,直接返回当前状态即可
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 最长等待预定时间
                LockSupport.parkNanos(this, nanos);
            }
            // 此处进行无限期等待,但当被唤醒时,一定有状态变更的时候,应该会在下一个周期结束循环
            else
                LockSupport.park(this);
        }
    }

可以看到,等待逻辑还是有点多的,毕竟场景多。至此,我们已经完全看到了一个,如何获取异步线程的执行结果实现了。总结下:

1. 实现Runnable接口,由jvm进行线程调用;

2. 包装 Callable.call()方法,带返回值,当线程被调起时,转给 call() 方法执行,并返回结果;

3. 将结果封装到当前future实例中,以备查;

4. 当用户调用get()方法时,保证状态完成情况下,最快速地返回结果;

四、扩展: Future.get() vs Thread.join()

Future.get()方法,一方面是为了获取异步线程的执行结果,另一方面也做到了等待线程执行完成的效果。

而 Thread.join() 则纯粹是为了等待异步线程执行完成,那它们有什么异曲同工之妙吗?来看下

    // Thread.join(), 通过 isAlive() 判断是否完成
    /**
     * Waits for this thread to die.
     *
     * <p> An invocation of this method behaves in exactly the same
     * way as the invocation
     *
     * <blockquote>
     * {@linkplain #join(long) join}{@code (0)}
     * </blockquote>
     *
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
     */
    public final void join() throws InterruptedException {
        join(0);
    }

    /**
     * Waits at most {@code millis} milliseconds for this thread to
     * die. A timeout of {@code 0} means to wait forever.
     *
     * <p> This implementation uses a loop of {@code this.wait} calls
     * conditioned on {@code this.isAlive}. As a thread terminates the
     * {@code this.notifyAll} method is invoked. It is recommended that
     * applications not use {@code wait}, {@code notify}, or
     * {@code notifyAll} on {@code Thread} instances.
     *
     * @param  millis
     *         the time to wait in milliseconds
     *
     * @throws  IllegalArgumentException
     *          if the value of {@code millis} is negative
     *
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
     */
    public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        // 无限期等待
        if (millis == 0) {
            while (isAlive()) {
                // 这是个 native 方法,即由jvm进行控制
                // Thread 任务执行完成后,将进行 notifyAll()
                // 同理下面的限时等待
                wait(0);
            }
        } else {
            // 限时等待
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

可以看到, Thread.join() 的等待逻辑是依赖于 jvm 的调度的, 通过 wait/notify 机制实现。与 Future.get() 相比,它是在 之后的,且无法获取结果。

五、Runnable如何包装成Callable ?

Callable 其实就只是实现了一个 call() 方法而已,如果我们只实现了 Runnable, 是否就拿不到返回值呢?并不是,我们可以直接指定返回值对象或者不指定,使用Runnable进行submit();

    // 不指定返回值的 Runnable, 此处的返回值一定 void
    public Future<?> submit(Runnable task);
    // 指定返回值的 Runnable, 由 T 进行返回值接收
    public <T> Future<T> submit(Runnable task, T result);

但是 Runnable 是怎么变成 Callable 的呢?其实就是一个 适配器模式的应用,我们来看一下!

    // AbstractExecutorService.submit()
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 明确返回值为 Void
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    // 同样使用 FutureTask 进行封装,只是调用了不同的构造器
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    // FutureTask, 使用 Executors 工具类生成一个 callable, 屏蔽掉 Callable 与 Runnable 的差异 
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    // Executors 使用一个适合器类将 Runnable 封装成 Callable
    /**
     * Returns a {@link Callable} object that, when
     * called, runs the given task and returns the given result.  This
     * can be useful when applying methods requiring a
     * {@code Callable} to an otherwise resultless action.
     * @param task the task to run
     * @param result the result to return
     * @param <T> the type of the result
     * @return a callable object
     * @throws NullPointerException if task null
     */
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    // 而 RunnableAdapter 也是很简单, 仅将 call() 转而调用 run() 方法即可
    /**
     * A callable that runs given task and returns given result
     */
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

简单不失优雅!这就是,大巧不工啊!

但是有一个点我们可以看到,那就是 result 的获取,其实就是传入什么值,就返回值。而如果想在想要改变其结果,唯一的办法是使 result 变量 对 Runnable.run() 可见,从而在 run() 方法中改变其值。这就看你怎么用了!

原文  http://www.cnblogs.com/yougewe/p/11666284.html
正文到此结束
Loading...