| 知乎专栏 |
public Future<Double> getPriceAsync(String product) {
//创建 CompletableFuture 对象,对象中包含异步计算结果
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
//新建线程计算商品价格
new Thread(() -> {
try {
double price = calculatePrice(product);
//将异步计算得到的结果设置到 CompletableFuture 中,
futurePrice.complete(price);
} catch (Exception e) {
futurePrice.completeExceptionally(e);
}
}).start();
//无需等待计算结果,直接返回 CompletableFuture 对象
return futurePrice;
}
runAsync 创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是自定义线程池的重载方法
package cn.netkiller;
import cn.netkiller.thread.ThreadManager;
import lombok.SneakyThrows;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
@SneakyThrows
public static void main(String[] args) {
CompletableFuture.runAsync(() -> {
System.out.println("do something...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture.runAsync(() -> {
System.out.println("do something...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, executorService);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println("do something....");
// Thread.currentThread().setName("测试有返回值的异步执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
ThreadManager tm = new ThreadManager();
System.out.println(tm.show());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Result ->" + completableFuture.isDone());
}
}
do something... do something... do something.... ======================================================================================= | ID | Name | Group | Daemon | State | Priority | --------------------------------------------------------------------------------------- | 1 | main | main | false | RUNNABLE | 5 | | 21 | ForkJoinPool.commonPool-worker-1 | main | true | TIMED_WAITING | 5 | | 22 | pool-1-thread-1 | main | false | TIMED_WAITING | 5 | | 23 | ForkJoinPool.commonPool-worker-2 | main | true | TIMED_WAITING | 5 | ======================================================================================= Result ->true
package cn.netkiller.test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread());
AtomicInteger variable = new AtomicInteger(0);
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));
runAsync.join();
System.out.println(variable.get());
}
public static void process(AtomicInteger variable) {
System.out.println(Thread.currentThread().getName() + " Process...");
variable.set(1024);
}
}
thenRun/thenRunAsync 功能是什么?完成前置任务之后,自己在执行。
thenRun/thenRunAsync 区别是什么?thenRun 使用同一个线程执行任务,thenRunAsync 会再开一个新线程执行任务。
@GetMapping("/completableFutureRun")
public String completableFutureRun() {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " - CompletableFuture 前置任务");
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture thenRun = completableFuture.thenRun(() -> {
System.out.println(Thread.currentThread().getName() + " - 接着执行第二个 thenRun 任务");
});
CompletableFuture thenRunAsync = completableFuture.thenRunAsync(() -> {
System.out.println(Thread.currentThread().getName() + " - 接着执行第二个 thenRunAsync 任务");
});
return "Done";
}
运行结果
ForkJoinPool.commonPool-worker-1 - CompletableFuture 前置任务 ForkJoinPool.commonPool-worker-1 - 接着执行第二个 thenRun 任务 ForkJoinPool.commonPool-worker-2 - 接着执行第二个 thenRunAsync 任务
这里可以看到 thenRunAsync 的线程变化,开启新线程 ForkJoinPool.commonPool-worker-2 处理任务
supplyAsync 创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法
// 带返回值异步请求,默认线程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) // 带返回值的异步请求,可以自定义线程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
package cn.netkiller;
import cn.netkiller.thread.ThreadManager;
import lombok.SneakyThrows;
import java.util.concurrent.CompletableFuture;
public class Main {
@SneakyThrows
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("do something....");
return "done";
});
System.out.println("Result ->" + completableFuture.get());
ThreadManager tm = new ThreadManager();
System.out.println(tm.show());
}
}
运行结果
do something.... Result ->done ======================================================================================= | ID | Name | Group | Daemon | State | Priority | --------------------------------------------------------------------------------------- | 1 | main | main | false | RUNNABLE | 5 | | 21 | ForkJoinPool.commonPool-worker-1 | main | true | TIMED_WAITING | 5 | =======================================================================================
package cn.netkiller;
import cn.netkiller.thread.ThreadManager;
import lombok.SneakyThrows;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
@SneakyThrows
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("do something....");
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "done";
}, executorService);
System.out.println("Result ->" + completableFuture.get());
ThreadManager tm = new ThreadManager();
System.out.println(tm.show());
}
}
运行结果
do something.... Result ->done ====================================================================== | ID | Name | Group | Daemon | State | Priority | ---------------------------------------------------------------------- | 1 | main | main | false | RUNNABLE | 5 | | 21 | pool-1-thread-1 | main | false | WAITING | 5 | ======================================================================
设置线程名称
package cn.netkiller;
import cn.netkiller.thread.ThreadManager;
import lombok.SneakyThrows;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
@SneakyThrows
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("do something....");
Thread.currentThread().setName("测试有返回值的异步执行");
return "done";
});
System.out.println("Result ->" + completableFuture.get());
ThreadManager tm = new ThreadManager();
System.out.println(tm.show());
}
}
运行结果
do something.... Result ->done ================================================================== | ID | Name | Group | Daemon | State | Priority | ------------------------------------------------------------------ | 1 | main | main | false | RUNNABLE | 5 | | 21 | 测试有返回值的异步执行 | main | true | TIMED_WAITING | 5 | ==================================================================
通过 Supplier 对象创建异步执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "https://www.netkiller.cn";
}
}).exceptionally((throwable) -> {
return throwable.toString();
});
thenAccept/thenAcceptAsync 的功能是,前置任务执行完毕之后,将返回值给到 thenAccept/thenAcceptAsync,再执行接下来的任务。
@GetMapping("/completableFutureAccept")
public String completableFutureAccept() {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
log.info(Thread.currentThread().getName() + " - CompletableFuture 前置任务");
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "前置任务执行完成";
});
CompletableFuture<Void> thenAccept = supplyAsync.thenAccept((rev) -> {
log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenAccept 任务");
log.info("前置任务返回值:" + rev);
});
CompletableFuture<Void> thenAcceptAsync = supplyAsync.thenAcceptAsync((rev) -> {
log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenAcceptAsync 任务");
log.info("前置任务返回值:" + rev);
});
return "Done";
}
输出结果
2023-05-10T10:38:48.008+08:00 INFO 96282 --- [onPool-worker-1] c.n.c.test.TestThreadController : ForkJoinPool.commonPool-worker-1 - CompletableFuture 前置任务 2023-05-10T10:38:53.015+08:00 INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController : ForkJoinPool.commonPool-worker-2 - 接着执行第二个 thenAcceptAsync 任务 2023-05-10T10:38:53.015+08:00 INFO 96282 --- [onPool-worker-1] c.n.c.test.TestThreadController : ForkJoinPool.commonPool-worker-1 - 接着执行第二个 thenAccept 任务 2023-05-10T10:38:53.016+08:00 INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController : 前置任务返回值:前置任务执行完成 2023-05-10T10:38:53.016+08:00 INFO 96282 --- [onPool-worker-1] c.n.c.test.TestThreadController : 前置任务返回值:前置任务执行完成
CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello netkiller");
if (completableFuture.isDone()) {
System.out.println(completableFuture.get());
}
Spring Service 用法
@Service
public class MyService {
@Async
public CompletableFuture<String> asyncMethod() {
// 异步方法逻辑...
return CompletableFuture.completedFuture("Result");
}
}
// 调用异步方法并获取结果
CompletableFuture<String> future = myService.asyncMethod();
String result = future.get(); // 阻塞等待结果
thenApply/thenApplyAsync 前置任务执行完毕之后,结果作为入参,thenApply/thenApplyAsync 执行完毕之后再返回执行结果
@GetMapping("/completableFutureApply")
public String completableFutureApply() throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
log.info(Thread.currentThread().getName() + " - CompletableFuture 前置任务");
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "第一步";
});
CompletableFuture<String> thenApply = supplyAsync.thenApply((rev) -> {
log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenApply 任务");
log.info("前置任务返回值:" + rev);
return "第二步";
});
CompletableFuture<String> thenApplyAsync = supplyAsync.thenApplyAsync((rev) -> {
log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenApplyAsync 任务");
log.info("前置任务返回值:" + rev);
return "第二步";
});
log.info("supplyAsync:{}", supplyAsync.get());
log.info("thenApply:{}", thenApply.get());
log.info("thenApplyAsync:{}", thenApplyAsync.get());
return "Done";
}
2023-05-10T10:39:57.913+08:00 INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController : ForkJoinPool.commonPool-worker-2 - CompletableFuture 前置任务 2023-05-10T10:40:02.917+08:00 INFO 96282 --- [ XNIO-1 task-2] c.n.c.test.TestThreadController : XNIO-1 task-2 - 接着执行第二个 thenApply 任务 2023-05-10T10:40:02.917+08:00 INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController : ForkJoinPool.commonPool-worker-2 - 接着执行第二个 thenApplyAsync 任务 2023-05-10T10:40:02.918+08:00 INFO 96282 --- [ XNIO-1 task-2] c.n.c.test.TestThreadController : 前置任务返回值:第一步 2023-05-10T10:40:02.918+08:00 INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController : 前置任务返回值:第一步 2023-05-10T10:40:02.918+08:00 INFO 96282 --- [ XNIO-1 task-2] c.n.c.test.TestThreadController : supplyAsync:第一步 2023-05-10T10:40:02.918+08:00 INFO 96282 --- [ XNIO-1 task-2] c.n.c.test.TestThreadController : thenApply:第二步 2023-05-10T10:40:02.919+08:00 INFO 96282 --- [ XNIO-1 task-2] c.n.c.test.TestThreadController : thenApplyAsync:第二步
runAsync 配合 thenRun/thenRunAsync 使用
runAsync --> thenRun/thenRunAsync 无返回值
supplyAsync 配合 thenAccept/thenAcceptAsync 使用
supplyAsync -- 返回值 --> thenAccept/thenAcceptAsync --> 无返回值
supplyAsync 配合 thenApply/thenApplyAsync 使用
supplyAsync -- 返回值 --> thenApply/thenApplyAsync -- 返回值 -->
whenComplete 与 runAsync / thenAccept / thenApply 区别是能处理 Throwable
@GetMapping("/completableFutureWhenComplete")
public String completableFutureWhenComplete() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
return "前置任务完成";
}).whenComplete((result, throwable) -> {
System.out.println("前置任务返回值:" + result);
});
System.out.println(completableFuture.get());
return "Done";
}
运行结果
当前线程名称:ForkJoinPool.commonPool-worker-1 前置任务返回值:前置任务完成 前置任务完成
future.whenCompleteAsync(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + s);
System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + throwable.toString());
}
});
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
future.completeOnTimeout("default timeout result", 3 * 1000, TimeUnit.MILLISECONDS);
@Service
public class MyService {
@Async("threadPoolTaskExecutor")
public CompletableFuture<String> asyncMethod1() {
// 异步方法1逻辑...
return CompletableFuture.completedFuture("Result1");
}
@Async("threadPoolTaskExecutor")
public CompletableFuture<String> asyncMethod2() {
// 异步方法2逻辑...
return CompletableFuture.completedFuture("Result2");
}
}
// 调用异步方法并处理结果顺序
CompletableFuture<String> future1 = myService.asyncMethod1();
CompletableFuture<String> future2 = future1.thenCompose(result1 -> myService.asyncMethod2());
String finalResult = future2.get(); // 阻塞等待最终结果
thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值。
thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值。
runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " completableFuture1 do something....");
return 1;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " completableFuture2 do something....");
return 2;
});
CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2, (a, b) -> {
System.out.println(Thread.currentThread() + " completableFuture3 do something....");
return a + b;
});
System.out.println("completableFuture3结果->" + completableFuture3.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " completableFuture1 do something....");
return 1;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " completableFuture2 do something....");
return 2;
});
CompletableFuture<Void> completableFuture3 = completableFuture1.thenAcceptBoth(completableFuture2, (a, b) -> {
System.out.println(Thread.currentThread() + " completableFuture3 do something....");
System.out.println(a + b);
});
System.out.println("completableFuture3结果->" + completableFuture3.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " completableFuture1 do something....");
return 1;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " completableFuture2 do something....");
return 2;
});
CompletableFuture<Void> completableFuture3 = completableFuture1.runAfterBoth(completableFuture2, () -> {
System.out.println(Thread.currentThread() + " completableFuture3 do something....");
});
System.out.println("completableFuture3结果->" + completableFuture3.get());
}
这三个方法和上面一样也是将两个CompletableFuture组合起来处理,当有一个任务正常完成时,就会进行下阶段任务。
applyToEither会将已经完成任务的执行结果作为所提供函数的参数,且该方法有返回值;
acceptEither同样将已经完成任务的执行结果作为方法入参,但是无返回值;
runAfterEither没有入参,也没有返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread() + " completableFuture1 do something....");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "completableFuture1 任务完成";
});
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread() + " completableFuture2 do something....");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "completableFuture2 任务完成";
});
CompletableFuture<String> completableFuture3 = completableFuture1.applyToEither(completableFuture2, (result) -> {
System.out.println("接收到" + result);
System.out.println(Thread.currentThread() + " completableFuture3 do something....");
return "completableFuture3 任务完成";
});
System.out.println("completableFuture3结果->" + completableFuture3.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread() + " completableFuture1 do something....");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "completableFuture1 任务完成";
});
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread() + " completableFuture2 do something....");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "completableFuture2 任务完成";
});
CompletableFuture<Void> completableFuture3 = completableFuture1.acceptEither(completableFuture2, (result) -> {
System.out.println("接收到" + result);
System.out.println(Thread.currentThread() + " completableFuture3 do something....");
});
System.out.println("completableFuture3结果->" + completableFuture3.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread() + " completableFuture1 do something....");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completableFuture1 任务完成");
return "completableFuture1 任务完成";
});
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread() + " completableFuture2 do something....");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completableFuture2 任务完成");
return "completableFuture2 任务完成";
});
CompletableFuture<Void> completableFuture3 = completableFuture1.runAfterEither(completableFuture2, () -> {
System.out.println(Thread.currentThread() + " completableFuture3 do something....");
System.out.println("completableFuture3 任务完成");
});
System.out.println("completableFuture3结果->" + completableFuture3.get());
}
allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture.allOf(new CompletableFuture[]{future1, future2, future3}).join();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture.allOf(future1, future2, future3).join();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> {
System.out.println(value);
});
List<CompletableFuture<String>> completableFutures = Stream.of(future1, future2, future3).toList();
var completableFutureArray = completableFutures.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutureArray).join();
anyOf:CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> {
System.out.println(value);
});
CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(future1, future2, future3);
System.out.println(completableFuture.get());
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").thenApply((value) -> {
System.out.println(value);
return value;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").thenApply((value) -> {
System.out.println(value);
return value;
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").thenApply((value) -> {
System.out.println(value);
return value;
});
List<CompletableFuture<String>> completableFutures = Stream.of(future1, future2, future3).toList();
var completableFutureArray = completableFutures.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutureArray).join();
completableFutures.forEach(task -> {
try {
System.out.println(task.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
});
package cn.netkiller.test;
import lombok.Data;
import java.util.concurrent.CompletableFuture;
@Data
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("Task 1 started");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task 1 completed");
});
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("Task 2 started");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task 2 completed");
});
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
System.out.println("Task 3 started");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task 3 completed");
});
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
allFutures.thenRun(() -> {
System.out.println("All tasks completed");
// 在这里执行下一步操作
});
// 防止 JVM 在 CompletableFuture 执行完之前退出
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package cn.netkiller.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread());
Parallel parallel = new Parallel();
parallel.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
return "task1";
})
.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
return "task2";
})
.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
return "task3";
})
.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "finally";
}).join();
List<CompletableFuture<String>> futures = parallel.get();
futures.stream().forEach(item -> {
System.out.println(item.getNow("no result"));
});
}
public static class Parallel<T> {
private final List<CompletableFuture<T>> futures;
Parallel() {
this(10);
}
Parallel(int size) {
futures = new ArrayList<>(size);
}
public Parallel addAsyncTask(Supplier<T> supplier) {
futures.add(CompletableFuture.supplyAsync(supplier));
return this;
}
public List<CompletableFuture<T>> get() {
return futures;
}
public void join() {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
}
public void clear() {
futures.clear();
}
}
}
package cn.netkiller.test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
while (true) {
TimeUnit.SECONDS.sleep(1);
System.out.println("supplyAsync: " + System.currentTimeMillis());
}
} catch (Exception e) {
e.printStackTrace();
}
return "https://www.netkiller.cn";
}
});
future.whenCompleteAsync(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + s);
System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + throwable.toString());
}
});
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
//异常退出。
future.completeExceptionally(e);
}
// CompletableFuture被通知线程任务完成。
System.out.println("完成任务: " + System.currentTimeMillis());
future.complete("任务完成");
}
}).start();
System.out.println("任务返回:" + future.get());
}
}
completeExceptionally 抛出异常,终止执行
CompletableFuture completableFuture = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
completableFuture.completeExceptionally(new Exception("异常终止"));
try {
completableFuture.join();
System.out.println(completableFuture.get());
} catch (CompletionException ex) { // just for testing
System.err.println("completed exceptionally: " + ex.getCause().getMessage());
}
package cn.netkiller.test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Neo!");
// return "Neo";
}).thenApply(i -> "Success: " + i);
CompletableFuture<String> result = future.exceptionally(e -> {
return e.toString();
});
System.out.println(result.get());
}
}
public static CompletableFuture whenComplete(int a, int b){
return CompletableFuture.supplyAsync(() -> a/b)
.whenComplete((result, ex) -> {
if (null != ex) {
System.out.println("whenComplete error:\t"+ex.getMessage());
}
});
}
try {
System.out.println("success:\t"+whenComplete(6,3).get());
System.out.println("exception:\t"+whenComplete(6,0).get());
} catch (Exception exception){
System.out.println("catch===="+exception.getMessage());
}
输出结果:
success: 2
whenComplete error: java.lang.ArithmeticException: / by zero
catch====java.lang.ArithmeticException: / by zero
public static CompletableFuture divide(int a, int b){
return CompletableFuture.supplyAsync(() -> a/b)
.handle((result, ex) -> {
if (null != ex) {
System.out.println(ex.getMessage());
return 0;
} else {
return result;
}
});
}
try {
System.out.println("success:\t"+divide(6,3).get());
System.out.println("exception:\t"+divide(6,0).get());
} catch (Exception exception){
System.out.println("catch="+exception.getMessage());
}
输出结果:
success: 2
java.lang.ArithmeticException: / by zero
exception: 0
package cn.netkiller.test;
import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class CompletableFuturePipeline {
private final Parallel parallel = new Parallel();
public CompletableFuturePipeline() {
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName());
CompletableFuturePipeline test = new CompletableFuturePipeline();
test.begin().batch().run().end();
test.parallel().supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ": parallel1");
return "parallel1";
}).supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ": parallel2");
return "parallel2";
}).join();
}
@SneakyThrows
public CompletableFuturePipeline begin() {
String method = Thread.currentThread().getStackTrace()[1].getMethodName();
parallel.runAsync(() -> {
String thread = Thread.currentThread().getName();
System.out.printf("%s - %s\n", thread, method);
});
return this;
}
public CompletableFuturePipeline run() throws ExecutionException, InterruptedException {
String method = Thread.currentThread().getStackTrace()[1].getMethodName();
parallel.supplyAsync(() -> {
System.out.printf("%s - %s\n", Thread.currentThread().getName(), method);
return "OK";
});
System.out.println(this.asyncMethod1().get());
return this;
}
public CompletableFuturePipeline batch() {
String method = Thread.currentThread().getStackTrace()[1].getMethodName();
Parallel batchs = this.parallel();
batchs.runAsync(() -> {
String thread = Thread.currentThread().getName();
System.out.printf("%s - %s - task1\n", thread, method);
})
.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String thread = Thread.currentThread().getName();
System.out.printf("%s - %s - task2\n", thread, method);
})
.runAsync(() -> {
String thread = Thread.currentThread().getName();
System.out.printf("%s - %s - task3\n", thread, method);
})
.supplyAsync(() -> {
String thread = Thread.currentThread().getName();
System.out.printf("%s - %s - finally\n", thread, method);
return "finally";
}).join();
System.out.println(batchs.futures.size());
return this;
}
@SneakyThrows
public CompletableFuturePipeline end() {
String method = Thread.currentThread().getStackTrace()[1].getMethodName();
parallel.supplyAsync(() -> {
System.out.printf("%s - %s\n", Thread.currentThread().getName(), method);
return "End";
});
parallel.join();
List<CompletableFuture<String>> futures = parallel.get();
futures.stream().forEach(item -> {
System.out.println(item.getNow("no result"));
});
System.out.println(parallel.futures.size());
return this;
}
public Parallel parallel() {
return new Parallel();
}
public Parallel parallel(int size) {
return new Parallel(size);
}
public CompletableFuture<String> asyncMethod1() {
// Thread.currentThread().setName(Thread.currentThread().getName() + "-" + this.getClass().getSimpleName());
System.out.println(Thread.currentThread().getName() + ": asyncMethod1");
return CompletableFuture.completedFuture("Result1");
}
public static class Parallel<T> {
private final List<CompletableFuture> futures;
// private final List<CompletableFuture<Void>> voids = new ArrayList<>();
Parallel() {
this(10);
}
Parallel(int size) {
futures = new ArrayList<>(size);
}
public Parallel runAsync(Runnable runnable) {
futures.add(CompletableFuture.runAsync(runnable));
return this;
}
public Parallel runAsync(Runnable runnable, Executor executor) {
futures.add(CompletableFuture.runAsync(runnable, executor));
return this;
}
public Parallel supplyAsync(Supplier<T> supplier) {
futures.add(CompletableFuture.supplyAsync(supplier));
return this;
}
public Parallel supplyAsync(Supplier<T> supplier, Executor executor) {
futures.add(CompletableFuture.supplyAsync(supplier, executor));
return this;
}
public List<CompletableFuture> get() {
return futures;
}
public void join() {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
}
public void clear() {
futures.clear();
}
}
}