Rust 系统编程语言完全教程 / 第17章:异步编程
第17章:异步编程
17.1 同步 vs 异步
| 特性 | 同步(线程) | 异步(async/await) |
|---|
| 切换开销 | 较大(OS 线程切换) | 极小(用户态切换) |
| 内存占用 | 每线程 ~2MB 栈 | 每任务 ~几KB |
| 并发数 | 数千(受限于 OS) | 数百万(受限于内存) |
| 适用场景 | CPU 密集型 | I/O 密集型 |
| 编程模型 | 阻塞 | 非阻塞 |
17.2 基本 async/await
选择运行时
Rust 标准库不包含异步运行时,需要使用第三方库:
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
第一个异步程序
use tokio::time::{sleep, Duration};
async fn fetch_data(id: u32) -> String {
sleep(Duration::from_millis(100)).await;
format!("数据#{}", id)
}
async fn process() {
let data = fetch_data(1).await;
println!("获取到: {}", data);
}
#[tokio::main]
async fn main() {
process().await;
}
17.3 并发执行
tokio::join!
use tokio::time::{sleep, Duration};
async fn task_a() -> &'static str {
sleep(Duration::from_millis(100)).await;
"任务A完成"
}
async fn task_b() -> &'static str {
sleep(Duration::from_millis(150)).await;
"任务B完成"
}
#[tokio::main]
async fn main() {
// 并发执行多个任务
let (a, b) = tokio::join!(task_a(), task_b());
println!("{}, {}", a, b);
}
tokio::spawn
use tokio::time::{sleep, Duration};
async fn background_task(id: u32) -> String {
sleep(Duration::from_millis(50 * id as u64)).await;
format!("任务{}完成", id)
}
#[tokio::main]
async fn main() {
let mut handles = vec![];
for i in 1..=5 {
let handle = tokio::spawn(background_task(i));
handles.push(handle);
}
for handle in handles {
let result = handle.await.unwrap();
println!("{}", result);
}
}
17.4 select! 宏
use tokio::time::{sleep, Duration};
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
let _ = tx.send("消息");
});
tokio::select! {
msg = rx => {
println!("收到: {}", msg.unwrap());
}
_ = sleep(Duration::from_millis(100)) => {
println!("超时");
}
}
}
17.5 异步通道
mpsc 通道
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
for i in 0..5 {
tx.send(format!("消息{}", i)).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
});
while let Some(msg) = rx.recv().await {
println!("收到: {}", msg);
}
}
broadcast 通道
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, _) = broadcast::channel(16);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
while let Ok(msg) = rx1.recv().await {
println!("接收者1: {}", msg);
}
});
tokio::spawn(async move {
while let Ok(msg) = rx2.recv().await {
println!("接收者2: {}", msg);
}
});
for i in 0..3 {
tx.send(format!("广播{}", i)).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}
17.6 Future Trait
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct CountDown {
count: u32,
}
impl Future for CountDown {
type Output = String;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
Poll::Ready("倒计时完成!".to_string())
} else {
println!("倒计时: {}", self.count);
self.count -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let countdown = CountDown { count: 5 };
let result = countdown.await;
println!("{}", result);
}
17.7 Pin 与 Unpin
use std::pin::Pin;
use std::future::Future;
// Pin 确保值不会在内存中移动
// 对于自引用结构体很重要
async fn example() {
let data = vec![1, 2, 3];
let reference = &data; // 自引用
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
println!("{:?}", reference);
}
fn main() {
// 大多数情况下不需要直接操作 Pin
// 编译器和 async/await 语法会自动处理
let fut = example();
// tokio::pin!(fut); // 将 future 固定在栈上
println!("Pin 大小: {} 字节", std::mem::size_of::<Pin<Box<dyn Future<Output = ()>>>>());
}
17.8 Stream
use tokio_stream::StreamExt;
use tokio::time::{sleep, Duration};
async fn number_stream() -> impl tokio_stream::Stream<Item = i32> {
tokio_stream::iter(1..=10)
}
#[tokio::main]
async fn main() {
let mut stream = number_stream().await;
while let Some(value) = stream.next().await {
print!("{} ", value);
}
println!();
// 异步迭代器适配器
let sum: i32 = tokio_stream::iter(1..=100)
.filter(|x| x % 2 == 0)
.fold(0, |acc, x| acc + x)
.await;
println!("偶数之和: {}", sum);
}
17.9 异步同步原语
use tokio::sync::{Mutex, RwLock, Semaphore};
use std::sync::Arc;
#[tokio::main]
async fn main() {
// 异步 Mutex
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(tokio::spawn(async move {
let mut num = counter.lock().await;
*num += 1;
}));
}
for handle in handles {
handle.await.unwrap();
}
println!("异步Mutex计数: {}", counter.lock().await);
// 信号量
let semaphore = Arc::new(Semaphore::new(3)); // 最多3个并发
let mut handles = vec![];
for i in 0..10 {
let sem = Arc::clone(&semaphore);
handles.push(tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
println!("任务 {} 开始", i);
sleep(Duration::from_millis(100)).await;
println!("任务 {} 结束", i);
}));
}
for handle in handles {
handle.await.unwrap();
}
}
17.10 业务场景示例
并发 HTTP 客户端
use tokio::time::{sleep, Duration};
async fn fetch_url(url: &str) -> Result<String, String> {
// 模拟 HTTP 请求
sleep(Duration::from_millis(100)).await;
Ok(format!("来自 {} 的响应", url))
}
async fn fetch_all(urls: Vec<&str>) {
let mut handles = vec![];
for url in urls {
handles.push(tokio::spawn(async move {
match fetch_url(url).await {
Ok(data) => println!("✅ {}: {} 字节", url, data.len()),
Err(e) => println!("❌ {}: {}", url, e),
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::main]
async fn main() {
let urls = vec![
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
"https://api.example.com/stats",
];
fetch_all(urls).await;
}
17.11 本章小结
| 要点 | 说明 |
|---|
| async/await | 异步编程核心语法 |
| Tokio | 最流行的异步运行时 |
| Future | 异步计算的抽象 |
| Pin | 防止值在内存中移动 |
| Stream | 异步迭代器 |
| join! | 并发等待多个 future |
| select! | 等待第一个完成的 future |
| 异步原语 | Mutex、Semaphore 等异步版本 |
扩展阅读
- Tokio 教程 — 官方异步教程
- Async Book — 异步编程详解
- Rust for Rustaceans — 深入异步机制