Java 8 的 CompletableFuture
private static void create() {
// 新建一个已经完成的
CompletableFuture.<em>completedFuture</em>("");
// JDK 9+
// CompletableFuture.failedFuture(new Exception());
// 类似 SettableFuture.create() 新建一个壳子
CompletableFuture<String> hello = new CompletableFuture<>();
// 然后在合适的时机使它完成
hello.complete("");
hello.completeExceptionally(new Exception());
// 异步运行
CompletableFuture.<em>runAsync</em>(() -> System.<em>out</em>.println("CompletableFuture<Void>"));
// 异步计算
CompletableFuture.<em>supplyAsync</em>(() -> "CompletableFuture<String>");
}
private static void then() {
CompletableFuture.<em>completedFuture</em>("")
// 完成后执行一个 Function 进行数据转换 类似 Stream.map
.thenApply(String::length)
// 联合另一个 CompletableFuture,都完成后执行一个 BiFunction 进行数据转换
.thenCombine(CompletableFuture.<em>completedFuture</em>(1), (length, num) -> "len+num=" + length + num)
// 当两个都完成后执行一个动作 返回新的 CompletableFuture<Void>
.thenAcceptBoth(CompletableFuture.<em>completedFuture</em>(2), (s, num) -> System.<em>out</em>.printf("%s,%d/n", s, num))
.thenApply(v -> "s")
// 将完成后的结果转换为另一个 CompletableFuture 类似 Stream.flatMap
.thenCompose(s -> CompletableFuture.<em>completedFuture</em>("compose:" + s))
// 当出现异常时 处理异常并返回异常时的值
.exceptionally(e -> "之前是什么类型这里就需要返回什么类型")
// 完成后执行的动作 返回新的 CompletableFuture<Void>
.thenAccept(System.<em>out</em>::println)
// 类似 Accept 完成后执行的动作,返回新的 CompletableFuture<Void>
.whenComplete((result, ex) -> {
})
// 类似 Apply 完成后执行的动作,返回新的 CompletableFuture<U>
.handle((result, ex) -> "new")
;
}
该 CompletableFuture 在哪个线程完成的,它之后紧接着的 then 操作就在这个线程运行。
private static void thread() {
CompletableFuture<String> f = new CompletableFuture<>();
CompletableFuture<Long> other = new CompletableFuture<>();
System.<em>out</em>.println("out: " + Thread.<em>currentThread</em>().getName());
f
.thenApply(s -> {
// 这里的线程是给 f 设置结果的线程
System.<em>out</em>.println("thenApply: " + Thread.<em>currentThread</em>().getName());
return s;
})
.thenApplyAsync(s -> {
// Async 结尾的方法不传 线程池则是 ForkJoinPoll.commonPoll
System.<em>out</em>.println(Thread.<em>currentThread</em>().getName());
return s;
})
.thenAccept(s -> {
// 和上个操作是同一个线程
System.<em>out</em>.println(s + " " + Thread.<em>currentThread</em>().getName());
})
.thenCompose(v -> other)
.thenApply(num -> {
// 执行线程 取决于哪个 future 后完成
System.<em>out</em>.println(num + " .thenCompose.thenApply: " + Thread.<em>currentThread</em>().getName());
return num + 1;
})
;
new Thread(() -> {
// sleep(50);
f.complete("Ha");
}, "MyThread-1").start();
new Thread(() -> {
<em>sleep</em>(50);
other.complete(1L);
}, "MyThread-2").start();
}
private static void sleep(long mills) {
try {
Thread.<em>sleep</em>(mills);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
如果是已完成的 Future, 调用各种 then 方法,就直接执行了;如果还没完成,则需要把各种 Function/Consumer 等需要执行的回调动作保存起来,待完成后再执行。
private static void debug() {
// 实际开发一般都是一条链走到底,这里为了 Debug 好对比哪个实例是哪个 故分开写变量
CompletableFuture<String> hello = new CompletableFuture<>();
CompletableFuture<Void> print = hello.thenAccept(System.<em>out</em>::println);
CompletableFuture<String> upper = hello.thenApply(String::toUpperCase);
CompletableFuture<Void> v1 = upper.thenAccept(System.<em>out</em>::println);
CompletableFuture<Void> v2 = print.thenCombine(upper, (aVoid, s) -> s.toCharArray())
.thenCompose(chars -> CompletableFuture.<em>completedFuture</em>(chars.length))
.thenAccept(System.<em>out</em>::println);
hello.complete("Hello");
}
参考链接: