转载

手把手教你使用 CompletableFuture

背景

在jdk5中,我们通过使用Future和Callable,可以在任务执行完毕后得到任务执行结果。可以使用isDone检测计算是否完成,使用cancle停止执行任务,使用阻塞方法get阻塞住调用线程来获取返回结果,使用阻塞方式获取执行结果,有违异步编程的初衷,而且Future的异常只能自己内部处理。

jdk8中加入了实现类CompletableFuture<T>,用于异步编程。底层做任务使用的是ForkJoin, 顾名思义,是将任务的数据集分为多个子数据集,而每个子集,都可以由独立的子任务来处理,最后将每个子任务的结果汇集起来。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。 从api文档看,它实现了2个接口 CompletionStage<T>, Future<T>,CompletableFuture<T>拥有Future的所有特性。 CompletionStage支持lambda表达式,接口的方法的功能都是在某个阶段得到结果后要做的事情。 CompletableFuture内置lambda表达式,支持异步回调,结果转换等功能,它有以下Future实现不了的功能

  • 合并两个相互独立的异步计算的结果。

  • 等待异步任务的所有任务都完成。

  • 等待异步任务的其中一个任务完成就返回结果。

  • 任务完成后调用回调方法

  • 任务完成的结果可以用于下一个任务。

  • 任务完成时发出通知

  • 提供原生的异常处理api

CompletableFuture的使用方法

首先说下获取结果方式 CompletableFuture获取结果的方式有如下4个方法:

1:get 阻塞获取结果,实现Future的get接口,显式抛出异常

2:getNow(T valueIfAbsent) 获取执行结果,如果当前任务未执行完成,则返回valueIfAbsent

3: join 执行完成后返回执行结果,或者抛出unchecked异常

4: T get(long timeout, TimeUnit unit) 在有限时间内获取数据

以下是CompletableFuture的创建对象以及api的使用

1: 创建CompletableFuture 对象

public static <U> CompletableFuture<U> completedFuture(U value) 

静态方法,返回一个已经计算好的CompletableFuture 比如

@Testpublic void testStatic() {
	CompletableFuture<String> completableFuture =              CompletableFuture.completedFuture("test");	//判断cf是否 执行完毕
	assertTrue(completableFuture.isDone());	//getNow获取结果,如果获取不到,返回默认值null
	assertEquals("test", completableFuture.getNow(null));
 }

completableFuture 还能主动结束运算,并显示处理异常,如下是异步执行的代码

@Test
public void testActive() {
	CompletableFuture<String> completableFuture = new CompletableFuture();	new Thread(() -> {		try {
			String string = null;
			string.length();
			Thread.currentThread().sleep(2000);			
                        // 通知完成计算 ,并将结果complete返回
			completableFuture.complete("complete");
		} catch (Exception e) {
		    // 处理异常 在获取结果地方可以捕获到异常
		    completableFuture.completeExceptionally(e);
		}
	}).start();	
        try {
		// 同步等待返回结果  如果thread内部未发生异常并执行了complete方法,将得到字符串“complete”的结果
		System.out.println(completableFuture.join());
	} catch (Exception e) {
		//捕获线程内部的异常  捕获空指针异常
		System.out.println("发生异常了" + e.getMessage());
	}
}

2: 使用工厂方式创建cf对象

CompletableFuture主要有以下四个工厂方式创建对象的静态方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {  
    return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
  return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {    
   return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {   
   return asyncRunStage(screenExecutor(executor), runnable);
}

Supplier是java8函数式编程的一个接口,是一个生产者,可以不接收参数。只有一个get方法返回一个泛型实例。 很明显,Async结尾的都是可以异步执行,runAsync 接收一个Runnable函数式接口类型参数,不返回结算结果。supplyAsync接收一个函数式接口类型Supplier ,可以返回计算结果。以上方法如果不指定执行任务的线程池Executor ,则默认使用ForkJoinPool.commonPoolcommonPool执行任务。这些接口都支持lambda实现异步的操作。 以下是SupplyAsync异步执行的简单示例

@Testpublic void testSupplyAsync() {
	CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {		//执行耗时任务
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}		return "glz";
	});	//获取结果
	System.out.println(cf.join());
}

3: CompletableFuture 的异步回调功能

上面的方法,执行任务是异步操作。但是调用线程还在等待结果。我们还可以给cf添加回调方法,在任务执行完成后使用cf的结果再做下一步操作,转换。所以 执行以下方法时,cf已经计算完毕。

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

从参数类型可以看到,这是接收一个cf计算的结果T,经过处理后返回参数类型为U的cf。 其中第一个方法是在cf完成的线程中调用。而带Async将在与调用者cf不同的线程中异步调用。

@Test
public void testThenApply() {
	CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {		// 执行耗时任务
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
               return "123";
	});
	// 这里cf的计算结果传个thenApply作为参数,执行字符串转int的方法,并返回一个cf对象cf1
	CompletableFuture<Integer> cf1 = cf.thenApply(Integer::parseInt);
	// cf1的计算结果作为参数x传给thenApply,返回一个心得cf对象 cf2.	CompletableFuture<Double> cf2 = cf1.thenApply(x -> x * 0.01);
	// 获取最终结果
	System.out.println(cf2.join());
	//如果回调函数比较耗时,可以使用异步的方法thenApplyAsync}

4: 运行完成时的代码,即对结果进行消耗

public CompletionStage<Void> thenAccept(Consumer<? super T> action); 
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

入参是Consumer ,执行Consumer 后没有返回结果,所以称为消耗。

@Test
public void thenAccept(){    
    CompletableFuture.supplyAsync(() -> "gong").thenAccept(x -> System.out.println(s+" lz"));
}

结果是 gong lz

5:上一步结果与下一步操作无关系

在执行cf后,如果得到的结果对下一步没有影响,也就是说下一步的操作并不关心上一步的结果,最终也不返回值,可以使用thenRun 参数传递一个Runnable.

public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
@Test
public void thenRun() {
	CompletableFuture.supplyAsync(() -> "hello").thenRun(() -> System.out.println("hello world"));
}

6: 对2个cf的结果进行组合thenCompose

public <U> CompletionStage<U> thenCompose  (Function<? super T, ? extends CompletionStage<U>> fn); 
 public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn); 
 public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);

thenCompose方法,可以将2个独立的任务进行流水线操作 。将当前cf的计算结果作为参数传递给后面的cf

@Test
public void testCompose() {
	CompletableFuture<String> cf = CompletableFuture.completedFuture("hello")
			.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
				System.out.println(result);				
                                return "result";
			}));	
        System.out.println(cf.join());
}

7: 结合2个cf的结果 thenCombine

可以将2个完全不相干的对象的结果整合起来,2项任务可以同时执行,比如一个对外的接口服务,既查询数据库中要查询数据的总量,也要返回具体某一页的数据,可以一个cf负责执行查询总条数count的sql,一个查询一页数据。BiFunction是合并结果数据的函数

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

其中T是调用thenCombine的cf的结果数据,U是other的结果,v就是合并的结果类型。

@Test
public void testCombine() {
	CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {		try {
			Thread.sleep(3000);
			System.out.println("cf1 is doning");
		} catch (InterruptedException e) {
			e.printStackTrace();
		} // 返回结果
		return "hello";
	});
	CompletableFuture<String> result = cf1.thenCombine(CompletableFuture.supplyAsync(() -> {		try {
			Thread.sleep(500);
			System.out.println("cf2 is doning");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}	
return "world"; }), (x, y) -> x + y);//合并2个操作结果
S
ystem.out.println(result.join());
}

8:消耗两个cf的结果,不返回结果

<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)

对于2个cf,我们只想在他们执行完成时,消耗执行结果,但是不做数据返回,,我们只是希望当完成时得到通知. 此方法与thenCombine相似,只不过返回 CompletableFuture<Void> ,只做消耗处理

@Test
public void testAcceptBoth(){
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
    CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i)));

    try {
        future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

9:取计算速度最快的结果

针对两个CompletionStage,将计算最快的那个CompletionStage的结果用来作为下一步的消耗。 此方法接受Consumer只对结果进行消耗.

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); 
 public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); 
 public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
@Test
public void acceptEither() {
	CompletableFuture.supplyAsync(() -> {		
               try {	// 如果不加sleep,可能打印hello
			Thread.currentThread().sleep(10L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}		return "hello";
	}).acceptEither(CompletableFuture.supplyAsync(() -> "world"), result -> {
		System.out.println(result);
	});
}

10: 计算最快的cf的结果转换

针对两个CompletionStage,将计算的快的那个CompletionStage的结果用来作为下一步的转换操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

fn是对 调用applyToEither的调用者和 other 2个计算最快的那个结果进行处理,传入t类型数据,返回一个CompletionStage

@Test
public void applyToEither() {
    double result = CompletableFuture.supplyAsync(() -> {        
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        return "0.001";
    }).applyToEither(CompletableFuture.supplyAsync(() -> {       
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        return "0.002";
    }), s -> Double.valueOf(s)).join();   
     System.out.println(result);
}
    //由于返回0.002的cf睡眠时间比较短,先执行完毕,优先返回结果,所以2个cf最先返回0.002.最终result就是0.002

11:2个cf都执行完后执行操作

2个cf都执行完后,执行操作Runnable,Runnable不关心2个cf的执行结果

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
    @Test
    public void runAfterBoth() {
	 CompletableFuture.supplyAsync(() -> "m").runAfterBothAsync(CompletableFuture.supplyAsync(() ->		"n"), () -> System.out.println("hello world"));
    }

12:处理cf数组

以上介绍的都是2个future的组合使用。cf还提供allOf,参数是cf数组,当数组中所有的cf都执行完成时,返回一个CompletableFuture<Void>。调用返回的cf的join方法阻塞等待cf数组中所有cf执行完成。 anyOf是当cf数组中任意一个cf执行完成后,就返回一个cf。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

读者可自行编写示例代码

13: 异常处理

在上面手工创建cf对象中,介绍过异常的处理,同样使用工厂创建的cf也具有异常管理机制,读者可自行举一反三。

小结

本文简单介绍了cf的使用方法,读者可参阅java8实战这本书,更深入学习CompletableFuture的应用场景。

参考: java8实战

手把手教你使用 CompletableFuture

原文  https://mp.weixin.qq.com/s/gRHaRvOoa1PJ2U0MECsoVw
正文到此结束
Loading...