为什么需要CompletableFuture
Jdk8之前,通过调用线程池的submit方法可以让任务以异步的方式运行,该方法会返回一个Future对象,通过调用get方法获取异步执行的结果。
private static List<String> findPriceFutureAsync(String product) {
ExecutorService es = Executors.newCachedThreadPool();
List<Future<String>> futureList = shopList.stream().map(shop -> es.submit(() -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)))).collect(Collectors.toList());
return futureList.stream()
.map(f -> {
String result = null;
try {
result = f.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return result;
}).collect(Collectors.toList());
}
如果使用CompletionService,任务先完成可优先获取到,即结果按照完成先后顺序排序。
CompletionService<String> completion = new ExecutorCompletionService<>(taskExecutor);
completion.submit(()->{//...});
使用Future最大的问题就是不能将控制流分离到不同的事件处理器中。线程等待各个异步执行的线程返回的结果来做下一步操作,则必须阻塞在future.get()的地方等待结果返回。这时候又变成同步了。
CompletableFuture实现了Future和CompletionStage接口,保留了Future的优点,并且弥补了其不足。即异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。
CompletableFuture使用
CompletableFuture的创建
- 使用new方法
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
- 使用CompletableFuture#completedFuture静态方法创建
public static <U> CompletableFuture<U> completedFuture(U value) { return new CompletableFuture<U>((value == null) ? NIL : value); }
参数的值为任务执行完的结果,一般该方法在实际应用中较少应用
- 使用 CompletableFuture#supplyAsync静态方法创建 supplyAsync有两个重载方法
//方法一 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } //方法二 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
- 使用CompletableFuture#runAsync静态方法创建 runAsync有两个重载方法
//方法一 public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } //方法二 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
说明:
- 两个重载方法之间的区别 => 后者可以传入自定义Executor,前者是默认的,使用的ForkJoinPool
- supplyAsync和runAsync方法之间的区别 => 前者有返回值,后者无返回值
- Supplier是函数式接口,因此该方法需要传入该接口的实现类,追踪源码会发现在run方法中会调用该接口的方法。因此使用该方法创建CompletableFuture对象只需重写Supplier中的get方法,在get方法中定义任务即可。又因为函数式接口可以使用Lambda表达式,和new创建CompletableFuture对象相比代码会简洁不少
结果的获取
对于结果的获取CompltableFuture类提供了四种方式
//方式一
public T get()
//方式二
public T get(long timeout, TimeUnit unit)
//方式三
public T getNow(T valueIfAbsent)
//方式四
public T join()
说明:
- get()和get(long timeout, TimeUnit unit) => 在Future中就已经提供了,后者提供超时处理,如果在指定时间内未获取结果将抛出超时异常
- getNow => 立即获取结果不阻塞,结果计算已完成将返回结果或计算过程中的异常,如果未计算完成将返回设定的valueIfAbsent值
- join => 方法里不会抛出异常
public class AcquireResultTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//getNow方法测试
CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(60 * 1000 * 60 );
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
});
System.out.println(cp1.getNow("hello h2t"));
//join方法测试
CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((()-> 1 / 0));
System.out.println(cp2.join());
//get方法测试
CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((()-> 1 / 0));
System.out.println(cp3.get());
}
}
说明
- 第一个执行结果为hello h2t,因为要先睡上1分钟结果不能立即获取
- join方法获取结果方法里不会抛异常,但是执行结果会抛异常,抛出的异常为CompletionException
- get方法获取结果方法里将抛出异常,执行结果抛出的异常为ExecutionException
异常处理
使用静态方法创建的CompletableFuture对象无需显示处理异常,使用new创建的对象需要调用completeExceptionally方法设置捕获到的异常
CompletableFuture completableFuture = new CompletableFuture();
new Thread(() -> {
try {
//doSomething,调用complete方法将其他方法的执行结果记录在completableFuture对象中
completableFuture.complete(null);
} catch (Exception e) {
//异常处理
completableFuture.completeExceptionally(e);
}
}).start();
其他API介绍
whenComplete 计算结果的处理
对前面计算结果进行处理,不返回新值
//方法一
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
//方法二
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
//方法三
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
- BiFunction<? super T,? super U,? extends V> fn参数 => 定义对结果的处理
- Executor executor参数 => 自定义线程池
- 以async结尾的方法将会在一个新的线程中执行组合操作
示例:
public class WhenCompleteTest {
public static void main(String[] args) {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> cf2 = cf1.whenComplete((v, e) ->
System.out.println(String.format("value:%s, exception:%s", v, e)));
System.out.println(cf2.join());
}
}
thenApply 转换
将前面计算结果的的CompletableFuture传递给thenApply,返回thenApply处理后的结果。可以认为通过thenApply方法实现CompletableFuture
//方法一
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
//方法二
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
//方法三
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
- Function<? super T,? extends U> fn参数 => 对前一个CompletableFuture 计算结果的转化操作
- Executor executor参数 => 自定义线程池\
- 以async结尾的方法将会在一个新的线程中执行组合操作
示例:
//将前一个CompletableFuture计算出来的结果扩大八倍
public class ThenApplyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenApplyTest::randomInteger).thenApply((i) -> i * 8);
System.out.println(result.get());
}
public static Integer randomInteger() {
return 10;
}
}
thenAccept 结果处理
thenApply也可以归类为对结果的处理,thenAccept和thenApply的区别就是没有返回值
//方法一
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
//方法二
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
//方法三
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
- Consumer<? super T> action参数 => 对前一个CompletableFuture计算结果的操作
- Executor executor参数 => 自定义线程池
- 同理以async结尾的方法将会在一个新的线程中执行组合操作
示例:
//将前一个CompletableFuture计算出来的结果打印出来
public class ThenAcceptTest {
public static void main(String[] args) {
CompletableFuture.supplyAsync(ThenAcceptTest::getList).thenAccept(strList -> strList.stream()
.forEach(m -> System.out.println(m)));
}
public static List<String> getList() {
return Arrays.asList("a", "b", "c");
}
}
thenCompose 异步结果流水化
将两个异步操作进行流水操作
//方法一
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
//方法二
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(asyncPool, fn);
}
//方法三
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
}
- Function<? super T, ? extends CompletionStage<U>> fn参数 => 当前CompletableFuture计算结果的执行
- Executor executor参数 => 自定义线程池
- 同理以async结尾的方法将会在一个新的线程中执行组合操作
示例:
public class ThenComposeTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenComposeTest::getInteger)
.thenCompose(i -> CompletableFuture.supplyAsync(() -> i * 10));
System.out.println(result.get());
}
private static int getInteger() {
return 666;
}
private static int expandValue(int num) {
return num * 10;
}
}
执行流程图
thenCombine 组合结果
thenCombine方法将两个无关的CompletableFuture组合起来,第二个Completable并不依赖第一个Completable的结果
//方法一
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
//方法二
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
//方法三
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
- CompletionStage<? extends U> other参数 => 新的CompletableFuture的计算结果
- BiFunction<? super T,? super U,? extends V> fn参数 => 定义了两个CompletableFuture对象完成计算后如何合并结果,该参数是一个函数式接口,因此可以使用Lambda表达式
- Executor executor参数 => 自定义线程池
- 同理以async结尾的方法将会在一个新的线程中执行组合操作
示例:
public class ThenCombineTest {
private static Random random = new Random();
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenCombineTest::randomInteger).thenCombine(
CompletableFuture.supplyAsync(ThenCombineTest::randomInteger), (i, j) -> i * j
);
System.out.println(result.get());
}
public static Integer randomInteger() {
return random.nextInt(100);
}
}
将两个线程计算出来的值做一个乘法在返回 执行流程图:
allOf&anyOf 组合多个CompletableFuture
//allOf
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
//anyOf
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
- allOf => 所有的CompletableFuture都执行完后执行计算。
- anyOf => 任意一个CompletableFuture执行完后就会执行计算
//allOf方法没有返回值,适合没有返回值并且需要前面所有任务执行完毕才能执行后续任务的应用场景
public class AllOfTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello");
return null;
});
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("world"); return null;
});
CompletableFuture<Void> result = CompletableFuture.allOf(future1, future2);
System.out.println(result.get());
}
}
//两个线程都会将结果打印出来,但是get方法只会返回最先完成任务的结果。该方法比较适合只要有一个返回值就可以继续执行其他任务的应用场景
public class AnyOfTest {
private static Random random = new Random();
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
randomSleep();
System.out.println("hello");
return "hello";});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
randomSleep();
System.out.println("world");
return "world";
});
CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
System.out.println(result.get());
}
private static void randomSleep() {
try {
Thread.sleep(random.nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
循环创建并发线程
基本思路是:将所有的子线程任务通过循环的方式放入到一个List<CompletableFuture>
里,根据业务的场景,选择不同的方法:
- 所有子线程都需要完成后再执行主线程
CompletableFuture.allOf().join()
- 其中任何一个子线程完成后就执行主线程
ComPletableFuture.anyOf()
业务场景:根据上传的多个行政区编码(adCode)并发查询天气信息。
因为qWeatherByCode()方法有返回值,所以需要使用CompletableFuture.supplyAsync()
方法。
该方法返回一个CompletableFuture对象,然后加入到List<CompletableFuture>
对象里。
然后使用CompletableFuture.allOf().join()
方法,当调用该方法时,主线程会一直阻塞,直到List
List<CompletableFuture> futures = new ArrayList();
for (String adCode : adCodeList) {
futures.add(CompletableFuture.supplyAsync(()->
qWeatherByCode(adCode),
asyncExecutor() //自定义线程池
));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
一些例子
public class CompletableFutureDemo {
/**
* 创建CompletableFuture
* - runAsync
* - supplyAsync
* - completedFuture
* <p>
* 异步计算启用的线程池是守护线程
*/
@Test
public void test1() {
//1、异步计算:无返回值
//默认线程池为:ForkJoinPool.commonPool()
CompletableFuture.runAsync(() -> {
// TODO: 2018/9/8 无返回异步计算
System.out.println(Thread.currentThread().isDaemon());
});
//指定线程池,(到了jdk9CompletableFuture还拓展了延迟的线程池)
CompletableFuture.runAsync(() -> {
// TODO: 2018/9/8 无返回异步计算
}, Executors.newFixedThreadPool(2));
//2、异步计算:有返回值
// 使用默认线程池
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "result1");
//getNow指定异步计算抛出异常或结果返回null时替代的的值
String result1 = future1.getNow(null);
// 指定线程池
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "result2", Executors.newFixedThreadPool(2));
//getNow指定异步计算抛出异常或结果返回null时替代的的值
String result2 = future2.getNow(null);
//3、初始化一个有结果无计算的CompletableFuture
CompletableFuture<String> future = CompletableFuture.completedFuture("result");
String now = future.getNow(null);
System.out.println("now = " + now);
}
/**
* 计算完成时需要对异常进行处理或者对结果进行处理
* - whenComplete:同步处理包括异常
* - thenApply:同步处理正常结果(前提是没有异常)
* <p>
* - whenCompleteAsync:异步处理包括异常
* - thenApplyAsync:异步处理正常结果(前提是没有异常)
* <p>
* - exceptionally : 处理异常
*/
@Test
public void test2() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result");
//whenComplete方法收future的结果和异常,可灵活进行处理
//1、同步处理
// 无返回值:可处理异常
future.whenComplete((result, throwable) -> System.out.println("result = " + result));
// 有返回值:没有异常处理(前提)
CompletableFuture<String> resultFuture1 = future.thenApply(result -> "result");
String result1 = resultFuture1.getNow(null);
//2、异步处理:
// 无返回值: 默认线程池
future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + result));
// 无返回值:指定线程池
future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + result), Executors.newFixedThreadPool(2));
// 有返回值:默认线程池
CompletableFuture<String> resultFuture2 = future.thenApplyAsync(result -> "result");
String result2 = resultFuture2.getNow(null);
// 有返回值:指定线程池
CompletableFuture<String> resultFuture3 = future.thenApplyAsync(result -> "result", Executors.newFixedThreadPool(2));
String result3 = resultFuture3.getNow(null);
//3、处理异常,处理完之后返回一个结果
CompletableFuture<String> exceptionallyFuture = future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + 1 / 0))
.exceptionally(throwable -> "发生异常了:" + throwable.getMessage());
System.out.println(exceptionallyFuture.getNow(null));
}
/**
* 异常处理还可以使用以下两个方法
* - handle
* - handleAsync
* <p>
* 备注:exceptionally同步和异步计算一起用如果出现异常会把异常抛出。用以上的方法可以拦截处理
*/
@Test
public void test3() {
CompletableFuture<String> exceptionoHandle = CompletableFuture.completedFuture("produce msg")
.thenApplyAsync(s -> "result" + 1 / 0);
String handleResult1 = exceptionoHandle.handle((s, throwable) -> {
if (throwable != null) {
return throwable.getMessage();
}
return s;
}).getNow(null);
//指定线程池
String handleResult2 = exceptionoHandle.handleAsync((s, throwable) -> {
if (throwable != null) {
return throwable.getMessage();
}
return s;
}, Executors.newFixedThreadPool(2)).getNow(null);
}
/**
* 生产--消费
* - thenAccept:同步的
* - thenAcceptAsync:异步的
* <p>
* 接受上一个处理结果,并实现一个Consumer,消费结果
*/
@Test
public void test4() {
//同步的
CompletableFuture.completedFuture("produce msg")
.thenAccept(s -> System.out.println("sync consumed msg : " + s));
//异步的
//默认线程池
CompletableFuture.completedFuture("produce msg")
.thenAcceptAsync(s -> System.out.println("async consumed msg : " + s));
//指定线程池
CompletableFuture.completedFuture("produce msg")
.thenAcceptAsync(s -> System.out.println("async consumed msg : " + s), Executors.newFixedThreadPool(2));
}
/**
* 取消任务
* - cancel
*/
@Test
public void test5() throws InterruptedException {
CompletableFuture<String> message = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
try {
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "result";
});
String now = message.getNow(null);
System.out.println("now = " + now);
//取消
boolean cancel = message.cancel(true);
System.out.println("cancel = " + cancel);
//如果这里再去获取,会抛出异常,说明已经取消了
//String now1 = message.getNow(null);
Thread.sleep(1000);
}
/**
* 两个异步计算
* - applyToEither:有返回值,同步
* - acceptEither:无返回值,同步
* - applyToEitherAsync:有返回值,异步
* -
*/
@Test
public void test6() {
CompletableFuture<String> task1 = CompletableFuture.completedFuture("task1")
.thenApply(s -> "task1的计算结果:s1 = " + s);
//同步,有返回值
//applyToEither第二个参数接收的值是task1计算的返回值
CompletableFuture<String> result1 = task1.applyToEither(CompletableFuture.completedFuture("task2")
.thenApply(s -> "task2的计算结果:s2 = " + s), s -> s);
System.out.println("task2:" + result1.getNow(null));
//同步,无返回值
task1.acceptEither(CompletableFuture.completedFuture("task3")
.thenApply(s -> "task3的计算结果:s3 = " + s), s -> System.out.println("task3:" + s));
//异步有返回值,默认线程池,也可以指定
CompletableFuture<String> result2 = task1.applyToEitherAsync(CompletableFuture.completedFuture("task4")
.thenApply(s -> "task4的计算结果:s4 = " + s), s -> s);
//由于是异步的,主线程跑的快一点,因此join()之后才能看到跑完的结果
System.out.println("task4:" + result2.join());
//异步无返回值,指定线程池,也可以使用默认线程池
CompletableFuture<Void> task5 = task1.acceptEitherAsync(CompletableFuture.completedFuture("task5")
.thenApply(s -> "task5的计算结果:s5 = " + s), s -> System.out.println("task5:" + s), Executors.newFixedThreadPool(2));
task5.join();
}
/**
* 组合计算结果
* - runAfterBoth:都计算完之后执行一段代码
* - thenAcceptBoth:都计算完之后把结果传入,并执行一段代码
* <p>
* - thenCombine:组合两个结果
* - thenCompose:组合两个结果
*/
@Test
public void test7() {
//runAfterBoth方式
StringBuilder msg = new StringBuilder("jorgeZhong");
CompletableFuture.completedFuture(msg)
.thenApply(s -> s.append(" task1,"))
.runAfterBoth(CompletableFuture.completedFuture(msg)
.thenApply(s -> s.append(" task2")), () -> System.out.println(msg));
//thenAcceptBoth方式
CompletableFuture.completedFuture("jorgeZhong")
.thenApplyAsync(String::toLowerCase)
.thenAcceptBoth(CompletableFuture.completedFuture("jorgeZhong")
.thenApplyAsync(String::toUpperCase), (s, s2) -> System.out
.println("s1:" + s + ", s2:" + s2));
//thenCombine方式
CompletableFuture<String> result1 = CompletableFuture.completedFuture("jorgeZhong")
.thenApply(String::toLowerCase)
.thenCombine(CompletableFuture.completedFuture("jorgeZhong")
.thenApply(String::toUpperCase), (s, s2) -> "s1:" + s + ", s2:" + s2);
System.out.println("result1:" + result1.getNow(null));
//异步
CompletableFuture<String> result11 = CompletableFuture.completedFuture("jorgeZhong")
.thenApply(String::toLowerCase)
.thenCombineAsync(CompletableFuture.completedFuture("jorgeZhong")
.thenApplyAsync(String::toUpperCase), (s, s2) -> "s1:" + s + ", s2:" + s2);
System.out.println("result11:" + result11.join());
//thenCompose方式
CompletableFuture<String> result2 = CompletableFuture.completedFuture("jorgeZhong")
.thenApply(String::toLowerCase)
.thenCompose(s -> CompletableFuture.completedFuture("jorgeZhong")
.thenApply(String::toUpperCase)
.thenApply(s1 -> "s:" + s + ", s1:" + s1));
System.out.println("result2:" + result2.getNow(null));
//异步
CompletableFuture<String> result22 = CompletableFuture.completedFuture("jorgeZhong")
.thenApply(String::toLowerCase)
.thenComposeAsync(s -> CompletableFuture.completedFuture("jorgeZhong")
.thenApplyAsync(String::toUpperCase)
.thenApplyAsync(s1 -> "s:" + s + ", s1:" + s1));
System.out.println("result22:" + result22.join());
}
/**
* 多个CompletableFuture策略
* - anyOf:接受一个CompletableFuture数组,任意一个任务执行完返回。都会触发该CompletableFuture
* - whenComplete:计算执行完之后执行实现的一段代码,将上一个结果和异常作为参数传入
*/
@Test
public void test8() throws InterruptedException {
List<String> messages = Arrays.asList("a", "b", "c");
CompletableFuture.anyOf(messages.stream()
.map(o -> CompletableFuture.completedFuture(o).thenApplyAsync(s -> {
try {
Thread.sleep(new Random().ints(99, 300).findFirst().getAsInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}))
.toArray(CompletableFuture[]::new))
.whenComplete((res, throwable) -> {
if (throwable == null) {
System.out.println(res.toString());
}
});
Thread.sleep(1000);
}
/**
* 多个CompletableFuture策略
* - allOf:接受一个CompletableFuture数组,所有任务返回后,创建一个CompletableFuture
*/
@Test
public void test9() {
List<String> messages = Arrays.asList("a", "b", "c");
CompletableFuture[] cfs = messages.stream()
.map(s -> CompletableFuture.completedFuture(s).thenApplyAsync(String::toUpperCase))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(cfs)
.whenCompleteAsync((aVoid, throwable) -> Arrays.stream(cfs).forEach(completableFuture -> System.out
.println(completableFuture.getNow(null))));
}
}
后记
很多方法都提供了异步实现【带async后缀】,但是需小心谨慎使用这些异步方法,因为异步意味着存在上下文切换,可能性能不一定比同步好。如果需要使用异步的方法,先做测试,用测试数据说话!!!
参考