异步与协程精讲 / 第10章:Java 虚拟线程 —— Loom 项目的新篇章
第10章:Java 虚拟线程 —— Loom 项目的新篇章
10.1 为什么 Java 需要虚拟线程?
Java 生态中的主流异步方案(CompletableFuture、Reactor、RxJava)虽然解决了性能问题,但带来了巨大的认知负担和调试困难。
传统线程模型的困境
// 传统方式:一个请求一个线程
// 问题:10,000 个并发请求 = 10,000 个线程 = ~10GB 内存
ExecutorService pool = Executors.newFixedThreadPool(200);
pool.submit(() -> {
var user = db.findUser(userId); // 阻塞 50ms
var orders = db.findOrders(userId); // 阻塞 50ms
var recs = callRecommendation(userId); // 阻塞 100ms
return buildResponse(user, orders, recs);
});
异步方式的困境
// 异步方式:代码复杂度爆炸
CompletableFuture.supplyAsync(() -> db.findUser(userId))
.thenCompose(user ->
CompletableFuture.allOf(
db.findOrders(userId),
callRecommendation(userId)
).thenApply(results -> buildResponse(user, results))
)
.exceptionally(err -> handleError(err));
Project Loom 的答案
Project Loom 的目标:让简单的阻塞代码也能高效运行。
// 虚拟线程:阻塞但高效
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
var user = db.findUser(userId); // 阻塞,但不占用平台线程
var orders = db.findOrders(userId); // 阻塞,但不占用平台线程
var recs = callRecommendation(userId); // 阻塞,但不占用平台线程
return buildResponse(user, orders, recs);
});
}
10.2 虚拟线程基础
创建虚拟线程
// 方式一:直接创建
Thread vt = Thread.ofVirtual().name("my-vt").start(() -> {
System.out.println("运行在虚拟线程: " + Thread.currentThread());
});
// 方式二:使用 ExecutorService(推荐)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 100_000; i++) {
final int taskId = i;
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
return taskId;
});
}
} // 自动等待所有任务完成
// 方式三:虚拟线程工厂
ThreadFactory factory = Thread.ofVirtual().factory();
Thread vt = factory.newThread(() -> doWork());
vt.start();
虚拟线程 vs 平台线程
| 特性 | 平台线程(Platform Thread) | 虚拟线程(Virtual Thread) |
|---|---|---|
| 实现 | 1:1 映射 OS 线程 | M:N 映射到载体线程 |
| 栈大小 | ~1MB(固定) | ~几百字节初始(按需增长) |
| 创建开销 | ~1ms | ~几μs |
| 最大数量 | 数千 | 数百万 |
| 阻塞代价 | 高(OS 线程阻塞) | 低(自动卸载载体线程) |
| 适用场景 | CPU 密集型 | I/O 密集型 |
载体线程(Carrier Thread)
虚拟线程的工作原理:
虚拟线程 1 虚拟线程 2 虚拟线程 3
(运行中) (阻塞中) (运行中)
│ │ │
│ │ (挂载到栈) │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│载体线程 0│ │载体线程 1│ │载体线程 2│
│(OS线程) │ │(OS线程) │ │(OS线程) │
└─────────┘ └─────────┘ └─────────┘
当虚拟线程阻塞时(如 I/O、sleep、锁):
→ 虚拟线程的栈帧被复制到堆内存
→ 载体线程被释放去执行其他虚拟线程
→ 阻塞结束后,虚拟线程被重新调度到载体线程
10.3 结构化并发(Structured Concurrency)
概念
结构化并发确保:
- 子任务的生命周期不超过父任务
- 父任务可以看到所有子任务的结果
- 取消父任务时,所有子任务也被取消
// Java 21 预览(StructuredTaskScope)
import jdk.incubator.concurrent.StructuredTaskScope;
Response handle(Request request) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 启动子任务
Subtask<User> userTask = scope.fork(() -> findUser(request.userId()));
Subtask<List<Order>> orderTask = scope.fork(() -> fetchOrders(request.userId()));
Subtask<Recs> recsTask = scope.fork(() -> getRecommendations(request.userId()));
// 等待所有子任务完成或失败
scope.join().throwIfFailed();
// 获取结果(此时所有子任务都已完成)
return new Response(userTask.get(), orderTask.get(), recsTask.get());
}
}
ShutdownOnFailure vs ShutdownOnSuccess
| 策略 | 行为 | 适用场景 |
|---|---|---|
ShutdownOnFailure | 任一失败则取消所有 | 需要所有结果 |
ShutdownOnSuccess | 任一成功则取消其余 | 竞速(race) |
// 竞速模式:从多个源获取数据,使用第一个成功的
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> fetchFromPrimary());
scope.fork(() -> fetchFromBackup1());
scope.fork(() -> fetchFromBackup2());
String result = scope.join().result();
// 第一个成功的结果,其余被取消
}
10.4 虚拟线程的陷阱
陷阱一:Pinning(钉住载体线程)
当虚拟线程在 synchronized 块中执行阻塞操作时,载体线程会被"钉住"(pinned),无法释放。
// ❌ 问题代码:synchronized 块中的阻塞
synchronized (lock) {
// 阻塞操作会导致载体线程被 pin
db.query(sql); // 这会 pin 住载体线程!
}
// ✅ 解决方案:使用 ReentrantLock
lock.lock();
try {
db.query(sql); // 不会 pin 住载体线程
} finally {
lock.unlock();
}
Pinning 检测
# 启动时添加诊断参数
java -Djdk.tracePinnedThreads=short MyApp
陷阱二:ThreadLocal 的内存问题
// ❌ 每个虚拟线程创建 ThreadLocal 副本 — 可能耗尽内存
static ThreadLocal<Connection> connLocal = ThreadLocal.withInitial(() -> createConnection());
// 在 100 万个虚拟线程中,创建 100 万个 Connection!
for (int i = 0; i < 1_000_000; i++) {
vtExecutor.submit(() -> {
Connection conn = connLocal.get(); // 💥
conn.query(sql);
});
}
// ✅ 使用 ScopedValue(Java 21 预览)
private static final ScopedValue<Connection> SCOPED_CONN = ScopedValue.newInstance();
ScopedValue.runWhere(SCOPED_CONN, createConnection(), () -> {
Connection conn = SCOPED_CONN.get();
conn.query(sql);
});
陷阱三:不要池化虚拟线程
// ❌ 错误:池化虚拟线程没有意义
ExecutorService pool = Executors.newFixedThreadPool(100); // 100 个虚拟线程的池
// ✅ 正确:每个任务一个虚拟线程
ExecutorService vt = Executors.newVirtualThreadPerTaskExecutor();
10.5 与传统异步方案对比
代码对比:数据库查询
// 方式一:传统线程池
ExecutorService pool = Executors.newFixedThreadPool(200);
Future<User> future = pool.submit(() -> db.findUser(id));
User user = future.get(); // 阻塞
// 方式二:CompletableFuture(响应式)
CompletableFuture<User> cf = CompletableFuture.supplyAsync(
() -> db.findUser(id), ioPool
);
cf.thenApply(user -> process(user))
.exceptionally(err -> handleError(err));
// 方式三:虚拟线程(推荐)
try (var vt = Executors.newVirtualThreadPerTaskExecutor()) {
Future<User> future = vt.submit(() -> db.findUser(id));
User user = future.get(); // 阻塞但高效
}
性能对比
| 方案 | 10K 并发请求 | 内存 | 代码复杂度 | 调试难度 |
|---|---|---|---|---|
| 固定线程池 (200) | 需排队,延迟高 | ~200MB | 低 | 低 |
| CompletableFuture | 高并发,高吞吐 | ~100MB | 高 | 高 |
| 虚拟线程 | 高并发,高吞吐 | ~50MB | 低 | 低 |
10.6 Spring Boot + 虚拟线程
@Configuration
public class VirtualThreadConfig {
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandler() {
return handler -> {
handler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
}
10.7 业务场景:微服务网关
public class GatewayService {
private final HttpClient client = HttpClient.newBuilder()
.executor(Executors.newVirtualThreadPerTaskExecutor())
.build();
public GatewayResponse handleRequest(GatewayRequest request) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并发调用多个微服务
var userSvc = scope.fork(() ->
callService(client, "user-service", request.userId()));
var orderSvc = scope.fork(() ->
callService(client, "order-service", request.userId()));
var recSvc = scope.fork(() ->
callService(client, "rec-service", request.userId()));
scope.join().throwIfFailed();
return new GatewayResponse(
userSvc.get(), orderSvc.get(), recSvc.get()
);
} catch (Exception e) {
throw new GatewayException("聚合失败", e);
}
}
}
10.8 本章小结
| 要点 | 说明 |
|---|---|
| 虚拟线程 | M:N 映射,轻量级,数百万级并发 |
| 结构化并发 | 子任务生命周期受父任务管理 |
| Pinning | synchronized 中的阻塞会钉住载体线程 |
| ThreadLocal | 虚拟线程中慎用,考虑 ScopedValue |
| 适用场景 | I/O 密集型,如数据库查询、HTTP 调用 |
下一章预告:Erlang 的轻量级进程模型是 Actor 模型的先驱,它如何实现"九个九"的可用性?