CompletableFuture

上一章的线程池让我们”提交任务”。但拿到结果后呢?Future.get() 是阻塞的——你必须等结果出来才能继续,无法”完成后自动回调”。这是 Java 异步编程一直以来的痛点。

JDK 8 带来了 CompletableFuture——一个可组合、可回调、可异常处理的异步编程 API。它把 Java 异步编程拉到与 JavaScript Promise、C# Task、Scala Future 同一水平。在现代微服务架构里,“并行调多个接口聚合结果”几乎是标配——CompletableFuture 是这套打法的核心工具。

一、Future 的局限

先看 Future 的问题:

ExecutorService pool = Executors.newFixedThreadPool(4);
Future<String> future = pool.submit(() -> queryDb());

// 只能阻塞等
String result = future.get();   // 阻塞!

// 不能回调
// 不能组合:"等 A 完成后再用 A 的结果调 B"
// 不能合并:"同时跑 A 和 B,等都完成"
// 不能异常处理:"失败时返回默认值"

Future 的痛点:

  • 只能阻塞 get()——不能”完成时回调”。
  • 不能链式组合——A 完成后用结果跑 B,得手动 get() 后再 submit
  • 不能合并多个——AB 并行,要等都完成,得自己写 CountDownLatch
  • 异常处理弱——get()ExecutionException,但没法”失败回退到默认值”。

CompletableFuture 解决了所有这些痛点。

二、CompletableFuture 的创建

2.1 异步执行任务

// 无返回值
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
    System.out.println("跑在 " + Thread.currentThread().getName());
});

// 有返回值
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    return "结果 from " + Thread.currentThread().getName();
});

System.out.println(f2.join());   // join 类似 get,但不抛受检异常

runAsyncRunnable 返回 CompletableFuture<Void>supplyAsyncSupplier<T> 返回 CompletableFuture<T>

2.2 指定线程池

默认用 ForkJoinPool.commonPool()——前面说过,不要在 commonPool 跑阻塞任务。生产代码应指定自己的线程池:

ExecutorService myPool = Executors.newFixedThreadPool(8);
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> query(), myPool);

2.3 已知结果

CompletableFuture<String> done = CompletableFuture.completedFuture("已知值");
CompletableFuture<String> failed = CompletableFuture.failedFuture(new RuntimeException("失败"));

failedFuture 是 Java 9+ 的方法。

2.4 join vs get

方法阻塞异常
get()InterruptedException + ExecutionException
join()CompletionException(非受检)
getNow(T)未完成返回默认值
get(long, unit)超时抛 TimeoutException

join 适合在流式 API 末尾取值——不抛受检异常,省去 try-catch

三、异步回调

3.1 thenApply:转换结果

CompletableFuture<Integer> f = CompletableFuture
    .supplyAsync(() -> "42")
    .thenApply(Integer::parseInt)        // String → Integer
    .thenApply(x -> x * 2);              // 42 → 84

System.out.println(f.join());   // 84

thenApplyFunction<T, R>——把上一步结果转换为新值。

3.2 thenAccept:消费结果

CompletableFuture<Void> f = CompletableFuture
    .supplyAsync(() -> "hello")
    .thenAccept(s -> System.out.println("收到: " + s));   // 消费,无返回

thenAcceptConsumer<T>——消费结果,返回 CompletableFuture<Void>

3.3 thenRun:不关心结果,只触发动作

CompletableFuture<Void> f = CompletableFuture
    .supplyAsync(() -> "hello")
    .thenRun(() -> System.out.println("完成了,不在乎结果"));

thenRunRunnable——上一步完成后执行,不接收上一步结果。

3.4 回调的”Async”变体

每个回调方法都有 Async 变体:

.thenApply(fn)            // 在上一步完成的线程跑
.thenApplyAsync(fn)       // 提交到 ForkJoinPool.commonPool 跑
.thenApplyAsync(fn, pool) // 提交到指定 pool 跑

默认非 Async 在上一步完成的线程跑——可能是有意的(避免线程切换)也可能是坑(在主线程跑)。Async 重新提交到池里跑——明确线程切换。

实践建议:链短的话非 Async 即可,长链或怕阻塞主线程用 Async。

3.5 速查表

方法入参返回描述
thenApplyFunction<T,R>R转换
thenAcceptConsumer<T>void消费
thenRunRunnablevoid触发
thenApplyAsync同上同上异步执行

四、异常处理

CompletableFuture 的异常不在 get 时抛——可以在链里处理。

4.1 exceptionally:仅在异常时

CompletableFuture<String> f = CompletableFuture
    .supplyAsync(() -> { throw new RuntimeException("DB 挂了"); })
    .exceptionally(ex -> "默认值");
System.out.println(f.join());   // 默认值

exceptionallyFunction<Throwable, T>——只有上一步异常时才执行,返回默认值恢复正常。

4.2 handle:异常和正常都处理

CompletableFuture<String> f = CompletableFuture
    .supplyAsync(() -> { if (Math.random() < 0.5) throw new RuntimeException(); return "ok"; })
    .handle((result, ex) -> {
        if (ex != null) return "失败: " + ex.getMessage();
        return "成功: " + result;
    });

handleBiFunction<T, Throwable, R>——异常和正常都调用,根据情况返回。

4.3 whenComplete:观察但不修改

CompletableFuture<String> f = CompletableFuture
    .supplyAsync(() -> "ok")
    .whenComplete((result, ex) -> {
        if (ex != null) log.error("失败", ex);
        else log.info("成功: {}", result);
    });

whenCompleteBiConsumer<T, Throwable>——观察结果或异常,但不修改——返回类型和原 future 一致。常用于日志、监控、清理。

4.4 三者对比

方法触发时机能否修改结果
exceptionally仅异常✓(恢复成默认值)
handle异常和正常
whenComplete异常和正常✗(仅观察)

五、多任务组合

5.1 thenCombine:合并两个 Future

CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Double> discount = CompletableFuture.supplyAsync(() -> 0.8);

CompletableFuture<Integer> finalPrice = price.thenCombine(discount, (p, d) -> (int)(p * d));
System.out.println(finalPrice.join());   // 80

thenCombine 把两个 Future 的结果合并——两者并行跑,都完成后调用合并函数。

5.2 thenCompose:串联(flatmap)

// 串行:先查用户,再查订单
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> findUser(1));
CompletableFuture<Order> orderFuture = userFuture.thenCompose(user ->
    CompletableFuture.supplyAsync(() -> findOrder(user.orderId))
);

thenComposeFunction<T, CompletableFuture<R>>——上一步完成后,用结果启动另一个异步任务,并把那个 Future 的结果作为最终结果。这是 flatMap 的语义——避免 CompletableFuture<CompletableFuture<R>> 嵌套。

5.3 allOf:等所有完成

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "C");

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();   // 等三个都完成

// 取每个结果
String a = f1.join();
String b = f2.join();
String c = f3.join();

allOf 返回 CompletableFuture<Void>——所有都完成时完成。注意它不返回结果集合——要单独 join() 每个 future 取结果。

5.4 anyOf:任一完成

CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(100); return "fast";
});
CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(1000); return "slow";
});

Object first = CompletableFuture.anyOf(fast, slow).join();
System.out.println(first);   // fast

anyOf 任意一个完成即完成——返回最快的结果。常用于”多副本读取,最快返回的赢”。

5.5 速查表

方法描述
thenCombine(other, fn)合并两个 future 的结果
thenCompose(fn)串联——上一步结果启动新 future
allOf(f1, f2, ...)等所有完成
anyOf(f1, f2, ...)任一完成

六、其他常用方法

6.1 orTimeout / completeOnTimeout(Java 9+)

CompletableFuture<String> f = CompletableFuture
    .supplyAsync(() -> { Thread.sleep(5000); return "late"; })
    .orTimeout(1, TimeUnit.SECONDS);   // 1 秒超时抛 TimeoutException
// f.join() 抛 TimeoutException

CompletableFuture<String> f2 = CompletableFuture
    .supplyAsync(() -> { Thread.sleep(5000); return "late"; })
    .completeOnTimeout("default", 1, TimeUnit.SECONDS);   // 1 秒超时返回默认值
// f2.join() 返回 "default"

6.2 complete / completeExceptionally

手动完成 future:

CompletableFuture<String> f = new CompletableFuture<>();
f.complete("手动完成");   // 或 f.completeExceptionally(new RuntimeException())

常用于”把回调式 API 转成 CompletableFuture 式”。

6.3 delayedExecutor(Java 9+)

CompletableFuture<String> f = CompletableFuture
    .supplyAsync(() -> "delayed",
        CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
// 1 秒后才执行

七、实战:异步聚合多个接口

下面这个例子模拟”微服务网关”——并行调用户、订单、商品三个接口,聚合返回。同时演示 thenCompose 串联、thenCombine 合并、allOf 等所有完成、anyOf 任一完成、异常处理。

Java · 在线运行

观察重点

  • thenApply 链"42"4284,每步转换。
  • exceptionally:异常时返回默认值,恢复正常。
  • thenCompose:用户查询和订单查询串行——总耗时 ≈ 200+300=500ms(不是并行)。
  • allOf:三个接口并行——总耗时 ≈ max(300, 250, 200) = 300ms,不是串行的 750ms。 > - anyOf:副本更快(100ms < 200ms),返回副本结果。
  • thenCombine:price 和 discount 并行,合并成最终价。
  • orTimeout:500ms 超时——任务 2000ms 没完成,500ms 时抛 TimeoutException
  • 失败重试:用 handle 递归——前两次失败,第三次成功。

八、本章小结

方法用途
runAsync/supplyAsync创建异步任务
thenApply转换结果
thenAccept/thenRun消费/触发
thenCombine合并两个 future
thenCompose串联(flatmap)
allOf等所有完成
anyOf任一完成
exceptionally仅异常时恢复
handle异常/正常都处理
whenComplete观察,不修改
orTimeout超时抛异常(Java 9+)
completeOnTimeout超时返回默认值(Java 9+)
概念核心要点
Future 局限只能阻塞 get,不能回调/组合/异常处理
Async 变体提交到池执行,非 Async 在上一步线程跑
thenApply vs thenCompose前者转换值,后者串联另一个 future
thenCombine vs allOf前者合并两个结果,后者等所有但取值要单独 join
exceptionally vs handle前者仅异常,后者异常和正常都处理
默认线程池ForkJoinPool.commonPool(),生产要指定自定义池
join vs getjoin 不抛受检异常,方便链式

记忆口诀

  • thenApply 转换,thenCompose 串联,thenCombine 合并——三个核心组合。
  • allOf 等所有,anyOf 任一完成——多任务协调。
  • exceptionally 异常恢复,handle 异常和正常都处理——异常工具。
  • 默认 commonPool 别用——指定自己的线程池跑阻塞任务。
  • orTimeout 防止永远等——Java 9+ 超时利器。

结语:从同步到异步

CompletableFuture 是 Java 异步编程的分水岭——它让”并行调多个接口、串行编排、异常回退、超时控制”成为可能。在微服务架构里,网关聚合多个下游接口、批量请求并发处理,都靠它。

CompletableFuture 的链式 API 也有局限——长链可读性差、调试栈跟踪丢失、阻塞调用还是阻塞(只是换了个线程跑)。Java 21 引入的虚拟线程给出了另一种思路——用同步代码风格获得异步性能,根本不需要 CompletableFuture 编排。

下一章我们讲 虚拟线程与结构化并发——Java 并发的未来。这是 20 年来 Java 并发最大的变革,也是这一阶段的收官。