转载

Netty异步Future机制

在说JDK的异步Future之前,先简单介绍一下JDK自带的Future机制.

首先先上一段代码

public class JDKFuture {

    static ExecutorService executors = new ThreadPoolExecutor(1,
            1,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(16));
    public static void main(String[] args) throws Exception{
        int cnt = 1;
        Future[] jdkFuture=new Future[cnt];
        Object jdkFutureResult;
        for(int i = 0;i < cnt; i++){
            jdkFuture[i] = executors.submit(new JDKCallable(i));
        }
        System.out.println(String.format("%s 在 %s 即将获取任务执行结果", Thread.currentThread(), new Date()));
        jdkFutureResult = jdkFuture[0].get();
        System.out.println(String.format("%s 在 %s 任务结果获取完毕 %s", Thread.currentThread(), new Date(), jdkFutureResult));
        executors.shutdown();
    }

    static class JDKCallable implements Callable{

        int index;

        JDKCallable(int ind){
            this.index = ind;
        }

        public Object call() throws Exception {
            try {
                System.out.println(String.format("线程 [%s] 提交任务[%s]", Thread.currentThread(), this.index));
              // 耗时2秒,模拟耗时操作
                Thread.sleep(2000);
                System.out.println(String.format("线程 [%s] 执行任务[%s]执行完毕", Thread.currentThread(), this.index));
            }catch(InterruptedException e){
                e.printStackTrace();
            }
            return String.format("任务%s执行结果",this.index);
        }
    }
}
复制代码

输出结果为:

线程 [Thread[pool-1-thread-1,5,main]] 提交任务[0]
Thread[main,5,main] 在 Mon Dec 16 16:40:38 CST 2019 即将获取任务执行结果
线程 [Thread[pool-1-thread-1,5,main]] 执行任务[0]执行完毕
Thread[main,5,main] 在 Mon Dec 16 16:40:40 CST 2019 任务结果获取完毕 任务0执行结果
复制代码

可以看到主线程在使用 future.get() 的时候,因为子线程还未处理完返回结果而导致主线程活生生的等了2秒钟(耗时操作),这也是JDK自带的Future机制不够完善的地方.因为jdk自身的future机制不够完善,所以Netty自实现了一套Future机制.

Netty 异步Future/Promise

Netty的Future是异步的,那他是怎么实现的呢?接下来就从源码开始探究.

先看一下 Netty 的 FuturePromise 这两个接口

Future

/**
 * The result of an asynchronous operation
 * 异步操作的结果
 * 对状态的判断、添加listener、获取结果
 */
public interface Future<V> extends java.util.concurrent.Future<V> {
    boolean isSuccess();
    boolean isCancellable();
    Throwable cause();
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    Future<V> sync() throws InterruptedException;
    Future<V> syncUninterruptibly();
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);
    V getNow();
  
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}
复制代码

Promise

Promise是一个特殊的Future,它可写,可写意味着可以修改里面的结果.

/**
 * Special {@link Future} which is writable.
 * 一个可写的特殊的Future
 * 继承 Future, 继承的方法就不列出
 */
public interface Promise<V> extends Future<V> {

    /**
     * Marks this future as a success and notifies all
     * listeners.
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     * 将这个 future 标记为 success 并且通知所有的 listeners
     * 如果已经成功或者失败将会抛出异常
     */
    Promise<V> setSuccess(V result);

    /**
     * Marks this future as a success and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a success. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     * 尝试设置结果,成功返回true, 失败 false, 上面的方法设置失败会抛出异常
     */
    boolean trySuccess(V result);

  	// 这2个跟上面的差不多
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
     *         without being cancelled.  {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable();
}
复制代码

源码解读

看到这里都同学都默认是用netty写过程序的~,还没写过的话可以看看官方文档或者我的另一篇Netty使用.

接下来就开始源码的解读.

那么从哪里开始呢?

总所周知! ,我们使用Netty开发的时候,写出数据用的是 writeAndFlush(msg) , 至于 write(msg) 嘛, 不就是少了个 flush (没错,是我比较懒).

开始

在大家知道 channel().writectx.write 的区别后, 我们就从 channel().write 开始讲起.

不行,我感觉还是要说一下一些补充的,不然心里不舒服.

Netty中有一个 pipeline ,也就是事件调用链,开发的时候在调用链里面加入自己处理事件的handle,但是在这条 pipeline 中, Netty给我们加上了 Headtail 这两个handle,方便Netty框架处理事件.

先看 DefaultChannelPipeline 的初始化,在初始化代码里给我们添加了2个handle, head 和 tail, 这2个东西很有用,为什么这么说呢?详情看后面解答

protected DefaultChannelPipeline(Channel channel) {
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        // ChannelInboundHandler
        this.tail = new DefaultChannelPipeline.TailContext(this);
	      // ChannelInboundHandler && ChannelOutboundHandler
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }
复制代码

Real 开始

没错,还是从 channel().write(msg) 开始说起(为什么我要用还是).

跟踪代码 channel().write(), 首先会调用到 DefaultChannelPipeline的 writeAndFlush 方法.

###1.DefaultChannelPipeline#writeAndFlush

public final ChannelFuture writeAndFlush(Object msg) {
        return this.tail.writeAndFlush(msg);
    }
复制代码

this.tail 就是上面构造函数里面初始化的 tailHandle , 而 write 是出栈事件, 会从 tailHandle 开始往前传递,最后传递到 headHandle (怎么感觉好像提前剧透了).

public ChannelFuture writeAndFlush(Object msg) {
  	// 这里new了一个 promise, 然后这个promise将会一直传递,一直传递.....
    return this.writeAndFlush(msg, this.newPromise());
}
复制代码

接下来来到了 AbstractChannelHandlerContext 的 writeAndFlush.

/**
     * 执行 write and flush 操作
     * @param msg
     * @param promise
     */
    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        // 这个方法在 ChannelHandler#handlerAdded 调用后,才会返回 true
        if (invokeHandler()) {
            // write 继续传递
            invokeWrite0(msg, promise);
            // flush data
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

	private void write(Object msg, boolean flush, ChannelPromise promise) {
        // 查找下一个 OutboundHandle, 因为是要输出
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        // 下一个 OutboundHandle 所在的线程
        EventExecutor executor = next.executor();
        // 如果在是同一个线程(由于Netty的channel在一个ThreadPool中只绑定一个Thread, 不同线程的话也意味着是不同线程池)
        if (executor.inEventLoop()) {
            // 在同一个线程池(这里意味着同一个线程)中,
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            // 在不同线程池(不同线程池那自然就是不同线程),需要创建一个任务,提交到下一个线程池
            final AbstractWriteTask task;
            if (flush) {
                // 提交给下一个线程池 && flush
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            // 因为是 write 事件, so 接下来提交任务到下一个 OutboundHandle(出栈) 所在的线程, 由它执行
            if (!safeExecute(executor, task, promise, m)) {
                // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                // 任务提交失败,取消任务
                task.cancel();
            }
        }
    }
复制代码

2.HeadContext#write、flush

接下来本篇文章最重要的地方了, **HeadContext **!

HeadContext的write和flush方法 实际上都是调用 unsafe的方法实现.

write

// 如果是 writeAndFlush ,调用 write后会调用flush
				@Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // 这个调用 AbstrachUnsafe.write
            unsafe.write(msg, promise);
        }

				// 这是 unsafe 的 write 方法
        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // outboundBuffer = null 表明 channel已经关闭并且需要将 future 结果设置为 false
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
            // 将 msg添加进 buffer 中
            outboundBuffer.addMessage(msg, size, promise);
        }
复制代码

flush

如果是WriteAndFlush, 则在调用write后,会调用Head的flush方法,同 write是调用AbstractUnsafe的flush

/**
         * write 之后再调用这个 flush
         */
        @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }

            // buffer 标记为可以被 flush
            outboundBuffer.addFlush();
            // 接下来就是真正的 flush
            flush0();
        }
复制代码

ChannelOutboundBuffer 是个啥呢?

ChannelOutboundBuffer 简单来说就是 存储当前channel写出的数据 , 并且在调用flush的时候将他们都写出去.

跟着源码一直走,在flush0之后,最终会调用到 AbstractNioMessageChannel#doWrite 方法.(上面还有doRead方法,是接收数据的时候调用的)

@Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        final SelectionKey key = selectionKey();
        final int interestOps = key.interestOps();

        for (;;) {
            //
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                // 判断写事件
                if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                    key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                }
                break;
            }
            try {
                // 循环写出数据
                boolean done = false;
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
                    // 真正的写出数据
                    // 最终会调用 javaChannel().send(nioData, mi); 
                    // 很眼熟吧,这个是java nio的方法,注册的时候也是javaChannel().register()
                    if (doWriteMessage(msg, in)) {
                        done = true;
                        break;
                    }
                }

                // 成功写出,从 buffer 中移除刚才写出的数据
                if (done) {
                    in.remove();
                } else {
                    // Did not write all messages.
                    // 写出失败
                    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                        key.interestOps(interestOps | SelectionKey.OP_WRITE);
                    }
                    break;
                }
            } catch (Exception e) {
                // 出错后是否继续写出后面的数据
                if (continueOnWriteError()) {
                    in.remove(e);
                } else {
                    throw e;
                }
            }
        }
    }
复制代码

3.Promise

到上面位置,数据是写出去了,那promise的相关作用呢?没看出来啊?

说实话,这个藏得挺深,居然! 放在了 buffer.remove() 里 !

public boolean remove() {
        // 刚写出去数据的Entry
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

      	// 这个就是writeAndFlush 的时候 new 的 DefaultPromise()
        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

	      // buffer 中移除
        removeEntry(e);

        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
	          // !!! 划重点 !!!
						// 这里设置了 promise 的结果, 调用了 trySuccess, 通知所有 listener
            // !!! 划重点 !!!
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }

        // recycle the entry
	      // 重置Entry的信息,方便重用.
        // 跟 Entry entry = Entry.newInstance(msg, size, total(msg), promise); 相对应, newInstance 是获取一个缓存的 Entry
        e.recycle();
        return true;
    }
复制代码

promise 通知所有 listener 是在写数据成功,并且在 buffer.remove() 调用的时候在里面 safeSuccess(promise) , 最终调用 Promise 的 trySuccess() 从而触发 notifyListeners() 通知所有 listeners.

4.NotifyListener

这个是在 Promise#trySuccess的时候调用的,通知所有listeners操作已经完成.

/**
     * 通知监听者,任务已经完成
     */
    private void notifyListeners() {
        // 获取future所属线程(池)
        EventExecutor executor = executor();
        // 执行通知是当前线程 则直接回调信息
        // currentThread == this.executor
        if (executor.inEventLoop()) {
            // 获取 ThreadLocal 变量
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            // listen 的层级数
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    // 通知所有的 listener
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        // 如果 executor 不是当前线程, 则交给 future 所属 executor 去执行
        // 意思是添加通知的 executor 可能是前面的 executor , 然后到后面的 executor 也就是当前线程才执行通知
        // 此时将通知交回给之前的 executor
        // 执行通知的不是当前线程, 封装成一个任务, 由之前提交的线程完成通知(回调)
        // 到这里就是 Netty Future 异步的原因, 任务在其它线程执行完毕后, 封装成任务交还给 创建 Future的线程,由该线程完成回调
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }
复制代码

总结

Netty 的 Future 异步机制是在操作完成后,将通知封装成Task,由Promise所属线程(Executors)执行(回调).

最后,觉得文章有帮助的同学不要忘了点个赞(我就是来骗赞的),你们的每个赞对我来说都非常重要,非常感谢你们能看到这里!!!

原文  https://juejin.im/post/5df771ee6fb9a0161d743069
正文到此结束
Loading...