转载

CompletableFuture 也没有那么废柴嘛!

我们知道,Java 里把 Promise 叫作 CompletableFuture,相比那个只能用于线程同步的 Future,CompletableFuture 新增了很多方法用于串联异步事件,比如常用的一些:

  • thenApply :拿到结果后对其 apply 一个函数,返回一个新的值 thenApply(T -> R)
  • thenCompose :拿到结果后其 apply 一个函数,返回一个新的 CompletableFuture thenCompose(T -> CompletableFuture<R>)
  • thenAccept :拿到结果后消费它,不需要返回结果 thenAccept(Consumer<T>)

如果不引入任何第三方库,CompletableFuture 仍是目前 Java 上最好的异步编程方式。之前一直觉得这个东西难用,直到我想明白一件事,证明了 CompletableFuture 虽然麻烦了点但是能做任何事情,然后用它的时候心里就没有这么膈应了。

本文会以一个例子来讲解:如何把 任意函数 转换成异步调用风格。用不用 CompletableFuture 倒是其次的,这项技术更多的是在于编程本身。

这篇文章不会谈论太多 CompletableFuture 的用法,你可以参考 Javadoc 或者 这篇文章 。

证明 CompletableFuture 是足够的

首先来(极不严谨地)说明一件事情, 为什么 CompletableFuture 是足够用的 ,换句话说,证明 CompletableFuture 能表达一切计算流程

如果你有一些函数式编程的基础,比如会一点 Haskell,这就是一句话的事情:CompletableFuture 其实是一个 Monad —— 因为它的 thenCompose 实现了 Monad 的 >>= 操作符。既然 Monad 能用来表示任何计算过程,CompletableFuture 当然也能。

class Applicative m => Monad (m :: * -> *) where
  (>>=) :: m a -> (a -> m b) -> m b  -- thenCompose 实现了它 
  (>>) :: m a -> m b -> m b
  return :: a -> m a
  fail :: String -> m a
  {-# MINIMAL (>>=) #-} -- 这是在说:只要实现 (>>=) 就够了

其实想想也很明白,Monad 表示一个带 context 的计算过程,比如可能抛异常之类的(纯函数是不会抛异常的)。CompletableFuture 也一样,他包裹一串计算过程并且处理异常。

如果看不懂上面的也没关系,我们用另一种方式再说明一下:

任何程序的流程控制都可以用 ifgoto 来组合起来。无论是 for 还是 while 循环,desurge 之后不过就是 ifgoto 的组合。 通过 thenCompose 就可以表达 ifgoto

这里说的不够严谨,其实 if 也是 surge,最终会变成条件跳转指令。

cf.thenCompose(v -> {
    if (v < 100) {
        return doStage1(); // doStage1() 返回一个 CompletableFuture,决定下一步做什么,相当于 goto
    } else {
        return doStage2(); // 同上
    }
})

你看这个例子, ifgoto 都有了,所以无论程序的控制流多复杂,我们都能组合出来。怎么组合?别急,下面我们就来讲这个。

CompletableFuture in Practice

我们从一个普通的函数开始。考虑到复杂性和完整性,我们用 Merge 2 Sorted Streams 作为演示,如果你不清楚这个是干嘛的,可以先做一下 这道算法题 。

下面是最普通的实现,输入两个数组,输出一个数组:

Stream merge(Stream inputA, Stream inputB) {
    List<Integer> results = new ArrayList<>();
    Integer headA = inputA.next();
    Integer headB = inputB.next();
    while (headA != null || headB != null) {
        if (headA == null || headB != null && headA > headB) {
            results.add(headB);
            headB = inputB.next();
        } else {
            results.add(headA);
            headA = inputA.next();
        }
    }
    return new Stream(results);
}

class Stream {
    private final Queue<Integer> numbers;
    public Stream(List<Integer> numbers) { this.numbers = new LinkedList<>(numbers); }
    public Integer { return numbers.poll(); }
}

这个实现有什么问题呢?作为算法足够 OK。但是从工程意义上说,如果输入的 Stream 很大,包含 million 级的元素,那更好的方式是把 Stream 的输入输出作为 Iterator,只在 next() 的时候计算下一个需要的元素。这样内存占用是常数级的,完全不用担心数据量过大呢!

为了看清一步一步的变化过程,我们先假装 Java 有 Generator 语法 。标记为 Generator 的函数不再是一个函数,而是类似一个 Iterator。一旦调用 next() ,“函数”代码运行到 yield 返回一个值,然后函数似乎 停在 了这里。下次 next() ,“函数”又接着刚刚的地方运行。

如果有 Generator 的话,函数应该长下面这样,注意 [yield] :

Stream merge(Stream inputA, Stream inputB) {
    Integer headA = inputA.next();
    Integer headB = inputB.next();
    while (headA != null || headB != null) {
        if (headA == null || headB != null && headA > headB) {
            [yield] headB;
            headB = inputB.next();
        } else {
            [yield] headA;
            headA = inputA.next();
        }
    }
    [yield] null;
}

哇,这个函数几乎没有改动,真是太方便了!(然而并没有卵用)

Function → Iterator

现在我们回到现实:Java 并没有 Generator 语法,所以我们要人肉实现一个 Generator。

为了通用性,首先做一个 desurge,把 while 循环改成 ifgoto 的组合,这太简单了:

Stream merge(Stream inputA, Stream inputB) {
    Integer headA = inputA.next();
    Integer headB = inputB.next();
    WHILE_LOOP:
    if (headA != null || headB != null) {
        if (headA == null || headB != null && headA > headB) {
            [yield] headB;
            headB = inputB.next();
        } else {
            [yield] headA;
            headA = inputA.next();
        }
        goto WHILE_LOOP; // again,假设 Java 也有 goto
    }
    [yield] null;
}

下一步是去掉 yield ,刚刚说到 Generator 的每次 next() 似乎会让函数 停在 一个地方,如何实现 停在 一个地方?记下来呗!加一个标记 状态 的变量,这个状态会告诉我下次 next() 的时候从哪里继续运行。

首先画出函数的控制流图,然后做一件事:想象所有的 yield 之后都有一个断点,我们在断点处切开,标记它为某个 State,这样下次 next() 的时候就能从断点继续。

下图的 S0 ~ S2 是我标记好的断点,S0 就是起始位置,S1 是两个 yield result 之后断下来的地方(恰好是同一个地方),S2 是 yield null 之后断下来的地方。

CompletableFuture 也没有那么废柴嘛!

我们按照图中的 State 标记机械地把它切开,就得到了下面这个类,它就是由 merge() 变换得到的 Generator:

class Merger implements Iterator<Integer> { 
    // Arguments
    final Iterator inputA;
    final Iterator inputB;
    
    // Internal states
    private int state = 0; // 我们加上的状态变量
    private Integer headA; // 变换前的局部变量,因为跨了多次 next() 调用,不能再是局部变量了
    private Integer headB; // 同上

    public Merger(Iterator inputA, Iterator inputB) {
        this.inputA = inputA;
        this.inputB = inputB;
    }

    public Integer next() {
        for (;;) { // 这个循环是有用的,往下看几行
            switch (state) {
            case 0:
                headA = inputA.next();
                headB = inputB.next();
                state = 1;
                break; // 这里就用上了外层的循环
            case 1:
                if (headA != null || headB != null) {
                    if (headA == null || headB != null && headA > headB ) {
                        final int result = headB;
                        headB = inputB.next();
                        state = 1; // 可以省略
                        return result; // 变换前是 yield result
                    } else {
                        final int result = headA;
                        headA = inputA.next();
                        state = 1; // 可以省略
                        return result; // 变换前是 yield result
                    }
                } else {
                    state = 2;
                    return null; // 变换前是 yield null
                }
            case 2:
                // Generator 已经终结了(变换前:函数已经走到底了)
                throw new IllegalStateException("Generator has been exhausted!");
            default:
                throw new AssertionError("Unreachable!");
            }
        }
    }
}

别急,最后我们会简化这些充满废话的代码。

阶段性总结一下:到现在为止,我们做了一件伟大的事情—— 把一个函数变成了 Iterator,函数已经不再是函数,而是一个状态机,这个状态记录了下次调用 next() 需要从哪继续

套用一下术语:“从哪继续”就是 Continuation ,把 Continuation 搞出来的这个过程称为 CPS 变换 。

Iterator → AsyncIterator

呃…… 说好的 CompletableFuture 呢?离 CompletableFuture 只有一步之遥了!

先从接口下手。想象两个 Stream Input 都是从 IO 拿到的数据,所以每次 next() 其实背后都是一次 IO,应该把它用 CompletableFuture 包成异步的,接口大概长这样:

interface AsyncIterator<T> {
    CompletableFuture<T> next();
}

类似刚刚引入 Generator 一样,我们再假装有 await 关键字。 await 关键字表示异步地等待结果返回,有了它,函数就魔法般的暂停在等待异步 IO 的地方:

Stream merge(Stream inputA, Stream inputB) {
    Integer headA = inputA.next();
    Integer headB = inputB.next();
    WHILE_LOOP:
    if (headA != null || headB != null) {
        if (headA == null || headB != null && headA > headB) {
            Integer result = headB;
            headB = [await] inputB.next(); // await 会魔法般地等待 next() 完成再继续运行
            [yield] result;
        } else {
            Integer result = headA;
            headA = [await] inputA.next();
            [yield] result;
        }
        goto WHILE_LOOP;
    }
    [yield] null;
}

因为 await 也会暂停这个“函数”,所以和刚刚对 yield 的处理一样,我们想象 await 这里有一个断点,我们也要为它设置 State 标记:

CompletableFuture 也没有那么废柴嘛!

糟糕!这状态数有点多啊!好在 Java 8 提供了 Lambda 表达式,和 CompletableFuture 搭配食用口味更佳。图中的大多数状态都可以借助 Lambda 表达式来实现,节约了不少代码:

class Merger implements AsyncIterator<Integer> {
    // Arguments
    final Stream inputA;
    final Stream inputB;

    // Internal states
    private int state = 0;
    private Integer headA;
    private Integer headB;

    public Merger(Stream inputA, Stream inputB) {
        this.inputA = inputA;
        this.inputB = inputB;
    }

    public CompletableFuture<Integer> next() {
        switch (state) {
        case 0:
            return inputA.next().thenCompose(a -> { // State 1 在这里!
                headA = a;
                return inputB.next();
            }).thenCompose(b -> { // State 2 在这里!
                headB = b;
                state = 3;
                return next(); // 相当于原来的外层循环
            });
        case 3:
            if (headA != null || headB != null) {
                if (headA == null || headB != null && headA > headB) {
                    final Integer result = headB;
                    return inputB.next().thenCompose(b -> { // State 4 在这里!
                        headB = b;
                        state = 3; // 可以省略
                        return CompletableFuture.completedFuture(result);
                    });
                } else {
                    final Integer result = headA;
                    return inputA.next().thenCompose(a -> { // State 5 在这里!
                        headA = a;
                        state = 3; // 可以省略
                        return CompletableFuture.completedFuture(result);
                    });
                }
            } else {
                state = 6;
                return CompletableFuture.completedFuture(null);
            }
        case 6:
            throw new IllegalStateException("Generator has been exhausted!");
        default:
            throw new AssertionError("Unreachable!");
        }
    }
}

Refinement

上面我们只用了 thenCompose ,理论上这是 OK 的,但是实际上 CompletableFuture 有上百个方法,最合适的才是坠吼的。

  • 如果仅仅是返回一个值(而非阶段),可以用 thenApply
  • thenCombine 等待两个 CompletableFuture 都完成了再去调用 BiFunction (T, U) -> R 来消费。

思考题:有兴趣的读者可以思考一下 thenCombine 的实现。

整理一下上面的代码,比如这样:

static class Merger {
    // States
    enum State { START, ITERATING, DONE }
    
    // Arguments
    final Stream inputA;
    final Stream inputB;
    
    // Internal states
    private State state = State.START;
    private Integer headA;
    private Integer headB;

    public Merger(Stream inputA, Stream inputB) {
        this.inputA = inputA;
        this.inputB = inputB;
    }

    private CompletableFuture<Integer> next() {
        switch (state) {
        case START:
            // 这里做了小小的优化:这两个 next() 可以并行等待
            return inputA.next().thenCombine(inputB.next(), (a, b) -> {
                headA = a;
                headB = b;
                state = State.ITERATING;
                return (Void)null;
            }).thenCompose(__ -> next());
        case ITERATING:
            if (headA != null || headB != null) {
                if (headA == null || headB != null && headA > headB) {
                    final Integer result = headB;
                    return inputB.next().thenApply(b -> { // thenCompose 某个值 <=> thenApply
                        headB = b;
                        return result;
                    });
                } else {
                    final Integer result = headA;
                    return inputA.next().thenApply(a -> { // 同上
                        headA = a;
                        return result;
                    });
                }
            } else {
                state = State.DONE;
                return CompletableFuture.completedFuture(null);
            }
        case DONE:
            throw new IllegalStateException("Generator has been exhausted!");
        default:
            throw new AssertionError("Unreachable!");
        }
    }
}

总结

任何函数都可以用 CompletableFuture 实现异步化,最通用的方式如下:

  1. 在函数里加上 yield (返回下一个结果)和 await (等待输入值)来标记断点;
  2. 画出控制流图,注意要在 yieldawait 处断开,断开处标记为状态;
  3. 实现一个状态机类,把控制流图中的代码块、状态都无脑填进去,搞定。

这一刻,我们都是(人肉)编译器。

原文  https://ericfu.me/completable-future-not-so-bad/
正文到此结束
Loading...