知乎专栏 |
public Future<Double> getPriceAsync(String product) { //创建 CompletableFuture 对象,对象中包含异步计算结果 CompletableFuture<Double> futurePrice = new CompletableFuture<>(); //新建线程计算商品价格 new Thread(() -> { try { double price = calculatePrice(product); //将异步计算得到的结果设置到 CompletableFuture 中, futurePrice.complete(price); } catch (Exception e) { futurePrice.completeExceptionally(e); } }).start(); //无需等待计算结果,直接返回 CompletableFuture 对象 return futurePrice; }
runAsync 创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是自定义线程池的重载方法
package cn.netkiller; import cn.netkiller.thread.ThreadManager; import lombok.SneakyThrows; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { @SneakyThrows public static void main(String[] args) { CompletableFuture.runAsync(() -> { System.out.println("do something..."); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } }); ExecutorService executorService = Executors.newSingleThreadExecutor(); CompletableFuture.runAsync(() -> { System.out.println("do something..."); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } }, executorService); CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println("do something...."); // Thread.currentThread().setName("测试有返回值的异步执行"); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } }); ThreadManager tm = new ThreadManager(); System.out.println(tm.show()); try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("Result ->" + completableFuture.isDone()); } }
do something... do something... do something.... ======================================================================================= | ID | Name | Group | Daemon | State | Priority | --------------------------------------------------------------------------------------- | 1 | main | main | false | RUNNABLE | 5 | | 21 | ForkJoinPool.commonPool-worker-1 | main | true | TIMED_WAITING | 5 | | 22 | pool-1-thread-1 | main | false | TIMED_WAITING | 5 | | 23 | ForkJoinPool.commonPool-worker-2 | main | true | TIMED_WAITING | 5 | ======================================================================================= Result ->true
package cn.netkiller.test; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(Thread.currentThread()); AtomicInteger variable = new AtomicInteger(0); CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable)); runAsync.join(); System.out.println(variable.get()); } public static void process(AtomicInteger variable) { System.out.println(Thread.currentThread().getName() + " Process..."); variable.set(1024); } }
thenRun/thenRunAsync 功能是什么?完成前置任务之后,自己在执行。
thenRun/thenRunAsync 区别是什么?thenRun 使用同一个线程执行任务,thenRunAsync 会再开一个新线程执行任务。
@GetMapping("/completableFutureRun") public String completableFutureRun() { CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + " - CompletableFuture 前置任务"); try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } }); CompletableFuture thenRun = completableFuture.thenRun(() -> { System.out.println(Thread.currentThread().getName() + " - 接着执行第二个 thenRun 任务"); }); CompletableFuture thenRunAsync = completableFuture.thenRunAsync(() -> { System.out.println(Thread.currentThread().getName() + " - 接着执行第二个 thenRunAsync 任务"); }); return "Done"; }
运行结果
ForkJoinPool.commonPool-worker-1 - CompletableFuture 前置任务 ForkJoinPool.commonPool-worker-1 - 接着执行第二个 thenRun 任务 ForkJoinPool.commonPool-worker-2 - 接着执行第二个 thenRunAsync 任务
这里可以看到 thenRunAsync 的线程变化,开启新线程 ForkJoinPool.commonPool-worker-2 处理任务
supplyAsync 创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法
// 带返回值异步请求,默认线程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) // 带返回值的异步请求,可以自定义线程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
package cn.netkiller; import cn.netkiller.thread.ThreadManager; import lombok.SneakyThrows; import java.util.concurrent.CompletableFuture; public class Main { @SneakyThrows public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("do something...."); return "done"; }); System.out.println("Result ->" + completableFuture.get()); ThreadManager tm = new ThreadManager(); System.out.println(tm.show()); } }
运行结果
do something.... Result ->done ======================================================================================= | ID | Name | Group | Daemon | State | Priority | --------------------------------------------------------------------------------------- | 1 | main | main | false | RUNNABLE | 5 | | 21 | ForkJoinPool.commonPool-worker-1 | main | true | TIMED_WAITING | 5 | =======================================================================================
package cn.netkiller; import cn.netkiller.thread.ThreadManager; import lombok.SneakyThrows; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { @SneakyThrows public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("do something...."); try { Thread.sleep(30000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "done"; }, executorService); System.out.println("Result ->" + completableFuture.get()); ThreadManager tm = new ThreadManager(); System.out.println(tm.show()); } }
运行结果
do something.... Result ->done ====================================================================== | ID | Name | Group | Daemon | State | Priority | ---------------------------------------------------------------------- | 1 | main | main | false | RUNNABLE | 5 | | 21 | pool-1-thread-1 | main | false | WAITING | 5 | ======================================================================
设置线程名称
package cn.netkiller; import cn.netkiller.thread.ThreadManager; import lombok.SneakyThrows; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { @SneakyThrows public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("do something...."); Thread.currentThread().setName("测试有返回值的异步执行"); return "done"; }); System.out.println("Result ->" + completableFuture.get()); ThreadManager tm = new ThreadManager(); System.out.println(tm.show()); } }
运行结果
do something.... Result ->done ================================================================== | ID | Name | Group | Daemon | State | Priority | ------------------------------------------------------------------ | 1 | main | main | false | RUNNABLE | 5 | | 21 | 测试有返回值的异步执行 | main | true | TIMED_WAITING | 5 | ==================================================================
通过 Supplier 对象创建异步执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { return "https://www.netkiller.cn"; } }).exceptionally((throwable) -> { return throwable.toString(); });
thenAccept/thenAcceptAsync 的功能是,前置任务执行完毕之后,将返回值给到 thenAccept/thenAcceptAsync,再执行接下来的任务。
@GetMapping("/completableFutureAccept") public String completableFutureAccept() { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> { log.info(Thread.currentThread().getName() + " - CompletableFuture 前置任务"); try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "前置任务执行完成"; }); CompletableFuture<Void> thenAccept = supplyAsync.thenAccept((rev) -> { log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenAccept 任务"); log.info("前置任务返回值:" + rev); }); CompletableFuture<Void> thenAcceptAsync = supplyAsync.thenAcceptAsync((rev) -> { log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenAcceptAsync 任务"); log.info("前置任务返回值:" + rev); }); return "Done"; }
输出结果
2023-05-10T10:38:48.008+08:00 INFO 96282 --- [onPool-worker-1] c.n.c.test.TestThreadController : ForkJoinPool.commonPool-worker-1 - CompletableFuture 前置任务 2023-05-10T10:38:53.015+08:00 INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController : ForkJoinPool.commonPool-worker-2 - 接着执行第二个 thenAcceptAsync 任务 2023-05-10T10:38:53.015+08:00 INFO 96282 --- [onPool-worker-1] c.n.c.test.TestThreadController : ForkJoinPool.commonPool-worker-1 - 接着执行第二个 thenAccept 任务 2023-05-10T10:38:53.016+08:00 INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController : 前置任务返回值:前置任务执行完成 2023-05-10T10:38:53.016+08:00 INFO 96282 --- [onPool-worker-1] c.n.c.test.TestThreadController : 前置任务返回值:前置任务执行完成
CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello netkiller"); if (completableFuture.isDone()) { System.out.println(completableFuture.get()); }
Spring Service 用法
@Service public class MyService { @Async public CompletableFuture<String> asyncMethod() { // 异步方法逻辑... return CompletableFuture.completedFuture("Result"); } } // 调用异步方法并获取结果 CompletableFuture<String> future = myService.asyncMethod(); String result = future.get(); // 阻塞等待结果
thenApply/thenApplyAsync 前置任务执行完毕之后,结果作为入参,thenApply/thenApplyAsync 执行完毕之后再返回执行结果
@GetMapping("/completableFutureApply") public String completableFutureApply() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> { log.info(Thread.currentThread().getName() + " - CompletableFuture 前置任务"); try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "第一步"; }); CompletableFuture<String> thenApply = supplyAsync.thenApply((rev) -> { log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenApply 任务"); log.info("前置任务返回值:" + rev); return "第二步"; }); CompletableFuture<String> thenApplyAsync = supplyAsync.thenApplyAsync((rev) -> { log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenApplyAsync 任务"); log.info("前置任务返回值:" + rev); return "第二步"; }); log.info("supplyAsync:{}", supplyAsync.get()); log.info("thenApply:{}", thenApply.get()); log.info("thenApplyAsync:{}", thenApplyAsync.get()); return "Done"; }
2023-05-10T10:39:57.913+08:00 INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController : ForkJoinPool.commonPool-worker-2 - CompletableFuture 前置任务 2023-05-10T10:40:02.917+08:00 INFO 96282 --- [ XNIO-1 task-2] c.n.c.test.TestThreadController : XNIO-1 task-2 - 接着执行第二个 thenApply 任务 2023-05-10T10:40:02.917+08:00 INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController : ForkJoinPool.commonPool-worker-2 - 接着执行第二个 thenApplyAsync 任务 2023-05-10T10:40:02.918+08:00 INFO 96282 --- [ XNIO-1 task-2] c.n.c.test.TestThreadController : 前置任务返回值:第一步 2023-05-10T10:40:02.918+08:00 INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController : 前置任务返回值:第一步 2023-05-10T10:40:02.918+08:00 INFO 96282 --- [ XNIO-1 task-2] c.n.c.test.TestThreadController : supplyAsync:第一步 2023-05-10T10:40:02.918+08:00 INFO 96282 --- [ XNIO-1 task-2] c.n.c.test.TestThreadController : thenApply:第二步 2023-05-10T10:40:02.919+08:00 INFO 96282 --- [ XNIO-1 task-2] c.n.c.test.TestThreadController : thenApplyAsync:第二步
runAsync 配合 thenRun/thenRunAsync 使用
runAsync --> thenRun/thenRunAsync 无返回值
supplyAsync 配合 thenAccept/thenAcceptAsync 使用
supplyAsync -- 返回值 --> thenAccept/thenAcceptAsync --> 无返回值
supplyAsync 配合 thenApply/thenApplyAsync 使用
supplyAsync -- 返回值 --> thenApply/thenApplyAsync -- 返回值 -->
whenComplete 与 runAsync / thenAccept / thenApply 区别是能处理 Throwable
@GetMapping("/completableFutureWhenComplete") public String completableFutureWhenComplete() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); return "前置任务完成"; }).whenComplete((result, throwable) -> { System.out.println("前置任务返回值:" + result); }); System.out.println(completableFuture.get()); return "Done"; }
运行结果
当前线程名称:ForkJoinPool.commonPool-worker-1 前置任务返回值:前置任务完成 前置任务完成
future.whenCompleteAsync(new BiConsumer<String, Throwable>() { @Override public void accept(String s, Throwable throwable) { System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + s); System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + throwable.toString()); } });
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }); future.completeOnTimeout("default timeout result", 3 * 1000, TimeUnit.MILLISECONDS);
@Service public class MyService { @Async("threadPoolTaskExecutor") public CompletableFuture<String> asyncMethod1() { // 异步方法1逻辑... return CompletableFuture.completedFuture("Result1"); } @Async("threadPoolTaskExecutor") public CompletableFuture<String> asyncMethod2() { // 异步方法2逻辑... return CompletableFuture.completedFuture("Result2"); } } // 调用异步方法并处理结果顺序 CompletableFuture<String> future1 = myService.asyncMethod1(); CompletableFuture<String> future2 = future1.thenCompose(result1 -> myService.asyncMethod2()); String finalResult = future2.get(); // 阻塞等待最终结果
thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值。
thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值。
runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread() + " completableFuture1 do something...."); return 1; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread() + " completableFuture2 do something...."); return 2; }); CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2, (a, b) -> { System.out.println(Thread.currentThread() + " completableFuture3 do something...."); return a + b; }); System.out.println("completableFuture3结果->" + completableFuture3.get()); } public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread() + " completableFuture1 do something...."); return 1; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread() + " completableFuture2 do something...."); return 2; }); CompletableFuture<Void> completableFuture3 = completableFuture1.thenAcceptBoth(completableFuture2, (a, b) -> { System.out.println(Thread.currentThread() + " completableFuture3 do something...."); System.out.println(a + b); }); System.out.println("completableFuture3结果->" + completableFuture3.get()); } public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread() + " completableFuture1 do something...."); return 1; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread() + " completableFuture2 do something...."); return 2; }); CompletableFuture<Void> completableFuture3 = completableFuture1.runAfterBoth(completableFuture2, () -> { System.out.println(Thread.currentThread() + " completableFuture3 do something...."); }); System.out.println("completableFuture3结果->" + completableFuture3.get()); }
这三个方法和上面一样也是将两个CompletableFuture组合起来处理,当有一个任务正常完成时,就会进行下阶段任务。
applyToEither会将已经完成任务的执行结果作为所提供函数的参数,且该方法有返回值;
acceptEither同样将已经完成任务的执行结果作为方法入参,但是无返回值;
runAfterEither没有入参,也没有返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { System.out.println(Thread.currentThread() + " completableFuture1 do something...."); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "completableFuture1 任务完成"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { System.out.println(Thread.currentThread() + " completableFuture2 do something...."); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "completableFuture2 任务完成"; }); CompletableFuture<String> completableFuture3 = completableFuture1.applyToEither(completableFuture2, (result) -> { System.out.println("接收到" + result); System.out.println(Thread.currentThread() + " completableFuture3 do something...."); return "completableFuture3 任务完成"; }); System.out.println("completableFuture3结果->" + completableFuture3.get()); } public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { System.out.println(Thread.currentThread() + " completableFuture1 do something...."); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "completableFuture1 任务完成"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { System.out.println(Thread.currentThread() + " completableFuture2 do something...."); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "completableFuture2 任务完成"; }); CompletableFuture<Void> completableFuture3 = completableFuture1.acceptEither(completableFuture2, (result) -> { System.out.println("接收到" + result); System.out.println(Thread.currentThread() + " completableFuture3 do something...."); }); System.out.println("completableFuture3结果->" + completableFuture3.get()); } public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { System.out.println(Thread.currentThread() + " completableFuture1 do something...."); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("completableFuture1 任务完成"); return "completableFuture1 任务完成"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { System.out.println(Thread.currentThread() + " completableFuture2 do something...."); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("completableFuture2 任务完成"); return "completableFuture2 任务完成"; }); CompletableFuture<Void> completableFuture3 = completableFuture1.runAfterEither(completableFuture2, () -> { System.out.println(Thread.currentThread() + " completableFuture3 do something...."); System.out.println("completableFuture3 任务完成"); }); System.out.println("completableFuture3结果->" + completableFuture3.get()); }
allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture.allOf(new CompletableFuture[]{future1, future2, future3}).join();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture.allOf(future1, future2, future3).join();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> { System.out.println(value); }); List<CompletableFuture<String>> completableFutures = Stream.of(future1, future2, future3).toList(); var completableFutureArray = completableFutures.toArray(CompletableFuture[]::new); CompletableFuture.allOf(completableFutureArray).join();
anyOf:CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> { System.out.println(value); }); CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(future1, future2, future3); System.out.println(completableFuture.get()); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").thenApply((value) -> { System.out.println(value); return value; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").thenApply((value) -> { System.out.println(value); return value; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").thenApply((value) -> { System.out.println(value); return value; }); List<CompletableFuture<String>> completableFutures = Stream.of(future1, future2, future3).toList(); var completableFutureArray = completableFutures.toArray(CompletableFuture[]::new); CompletableFuture.allOf(completableFutureArray).join(); completableFutures.forEach(task -> { try { System.out.println(task.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } });
package cn.netkiller.test; import lombok.Data; import java.util.concurrent.CompletableFuture; @Data public class CompletableFutureExample { public static void main(String[] args) { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { System.out.println("Task 1 started"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task 1 completed"); }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { System.out.println("Task 2 started"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task 2 completed"); }); CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> { System.out.println("Task 3 started"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task 3 completed"); }); CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3); allFutures.thenRun(() -> { System.out.println("All tasks completed"); // 在这里执行下一步操作 }); // 防止 JVM 在 CompletableFuture 执行完之前退出 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }
package cn.netkiller.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(Thread.currentThread()); Parallel parallel = new Parallel(); parallel.addAsyncTask(() -> { System.out.println(Thread.currentThread().getName()); return "task1"; }) .addAsyncTask(() -> { System.out.println(Thread.currentThread().getName()); return "task2"; }) .addAsyncTask(() -> { System.out.println(Thread.currentThread().getName()); return "task3"; }) .addAsyncTask(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "finally"; }).join(); List<CompletableFuture<String>> futures = parallel.get(); futures.stream().forEach(item -> { System.out.println(item.getNow("no result")); }); } public static class Parallel<T> { private final List<CompletableFuture<T>> futures; Parallel() { this(10); } Parallel(int size) { futures = new ArrayList<>(size); } public Parallel addAsyncTask(Supplier<T> supplier) { futures.add(CompletableFuture.supplyAsync(supplier)); return this; } public List<CompletableFuture<T>> get() { return futures; } public void join() { CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join(); } public void clear() { futures.clear(); } } }
package cn.netkiller.test; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Supplier; public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { while (true) { TimeUnit.SECONDS.sleep(1); System.out.println("supplyAsync: " + System.currentTimeMillis()); } } catch (Exception e) { e.printStackTrace(); } return "https://www.netkiller.cn"; } }); future.whenCompleteAsync(new BiConsumer<String, Throwable>() { @Override public void accept(String s, Throwable throwable) { System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + s); System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + throwable.toString()); } }); new Thread(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(5); } catch (Exception e) { //异常退出。 future.completeExceptionally(e); } // CompletableFuture被通知线程任务完成。 System.out.println("完成任务: " + System.currentTimeMillis()); future.complete("任务完成"); } }).start(); System.out.println("任务返回:" + future.get()); } }
completeExceptionally 抛出异常,终止执行
CompletableFuture completableFuture = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); completableFuture.completeExceptionally(new Exception("异常终止")); try { completableFuture.join(); System.out.println(completableFuture.get()); } catch (CompletionException ex) { // just for testing System.err.println("completed exceptionally: " + ex.getCause().getMessage()); }
package cn.netkiller.test; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Neo!"); // return "Neo"; }).thenApply(i -> "Success: " + i); CompletableFuture<String> result = future.exceptionally(e -> { return e.toString(); }); System.out.println(result.get()); } }
public static CompletableFuture whenComplete(int a, int b){ return CompletableFuture.supplyAsync(() -> a/b) .whenComplete((result, ex) -> { if (null != ex) { System.out.println("whenComplete error:\t"+ex.getMessage()); } }); } try { System.out.println("success:\t"+whenComplete(6,3).get()); System.out.println("exception:\t"+whenComplete(6,0).get()); } catch (Exception exception){ System.out.println("catch===="+exception.getMessage()); } 输出结果: success: 2 whenComplete error: java.lang.ArithmeticException: / by zero catch====java.lang.ArithmeticException: / by zero
public static CompletableFuture divide(int a, int b){ return CompletableFuture.supplyAsync(() -> a/b) .handle((result, ex) -> { if (null != ex) { System.out.println(ex.getMessage()); return 0; } else { return result; } }); } try { System.out.println("success:\t"+divide(6,3).get()); System.out.println("exception:\t"+divide(6,0).get()); } catch (Exception exception){ System.out.println("catch="+exception.getMessage()); } 输出结果: success: 2 java.lang.ArithmeticException: / by zero exception: 0
package cn.netkiller.test; import lombok.SneakyThrows; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; public class CompletableFuturePipeline { private final Parallel parallel = new Parallel(); public CompletableFuturePipeline() { } public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(Thread.currentThread().getName()); CompletableFuturePipeline test = new CompletableFuturePipeline(); test.begin().batch().run().end(); test.parallel().supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + ": parallel1"); return "parallel1"; }).supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + ": parallel2"); return "parallel2"; }).join(); } @SneakyThrows public CompletableFuturePipeline begin() { String method = Thread.currentThread().getStackTrace()[1].getMethodName(); parallel.runAsync(() -> { String thread = Thread.currentThread().getName(); System.out.printf("%s - %s\n", thread, method); }); return this; } public CompletableFuturePipeline run() throws ExecutionException, InterruptedException { String method = Thread.currentThread().getStackTrace()[1].getMethodName(); parallel.supplyAsync(() -> { System.out.printf("%s - %s\n", Thread.currentThread().getName(), method); return "OK"; }); System.out.println(this.asyncMethod1().get()); return this; } public CompletableFuturePipeline batch() { String method = Thread.currentThread().getStackTrace()[1].getMethodName(); Parallel batchs = this.parallel(); batchs.runAsync(() -> { String thread = Thread.currentThread().getName(); System.out.printf("%s - %s - task1\n", thread, method); }) .runAsync(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { throw new RuntimeException(e); } String thread = Thread.currentThread().getName(); System.out.printf("%s - %s - task2\n", thread, method); }) .runAsync(() -> { String thread = Thread.currentThread().getName(); System.out.printf("%s - %s - task3\n", thread, method); }) .supplyAsync(() -> { String thread = Thread.currentThread().getName(); System.out.printf("%s - %s - finally\n", thread, method); return "finally"; }).join(); System.out.println(batchs.futures.size()); return this; } @SneakyThrows public CompletableFuturePipeline end() { String method = Thread.currentThread().getStackTrace()[1].getMethodName(); parallel.supplyAsync(() -> { System.out.printf("%s - %s\n", Thread.currentThread().getName(), method); return "End"; }); parallel.join(); List<CompletableFuture<String>> futures = parallel.get(); futures.stream().forEach(item -> { System.out.println(item.getNow("no result")); }); System.out.println(parallel.futures.size()); return this; } public Parallel parallel() { return new Parallel(); } public Parallel parallel(int size) { return new Parallel(size); } public CompletableFuture<String> asyncMethod1() { // Thread.currentThread().setName(Thread.currentThread().getName() + "-" + this.getClass().getSimpleName()); System.out.println(Thread.currentThread().getName() + ": asyncMethod1"); return CompletableFuture.completedFuture("Result1"); } public static class Parallel<T> { private final List<CompletableFuture> futures; // private final List<CompletableFuture<Void>> voids = new ArrayList<>(); Parallel() { this(10); } Parallel(int size) { futures = new ArrayList<>(size); } public Parallel runAsync(Runnable runnable) { futures.add(CompletableFuture.runAsync(runnable)); return this; } public Parallel runAsync(Runnable runnable, Executor executor) { futures.add(CompletableFuture.runAsync(runnable, executor)); return this; } public Parallel supplyAsync(Supplier<T> supplier) { futures.add(CompletableFuture.supplyAsync(supplier)); return this; } public Parallel supplyAsync(Supplier<T> supplier, Executor executor) { futures.add(CompletableFuture.supplyAsync(supplier, executor)); return this; } public List<CompletableFuture> get() { return futures; } public void join() { CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join(); } public void clear() { futures.clear(); } } }