
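Chapter 8: Python asyncio, from Generators to Coroutines

8.1 The Evolution of Async in Python

Asynchronous programming in Python evolved through three stages: plain generators, generator-based coroutines, and native async/await coroutines.

Timeline: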

2001  ──  Python 2.2 introduces generators (yield)
2005  ──  PEP 342: enhanced generators (send/throw)
2006  ──  The Twisted framework matures (callback style)
2010  ──  The Tornado framework (a mix of coroutines and callbacks)
2012  ──  PEP 3156: the asyncio proposal (the Tulip project)
2014  ──  Python 3.4: asyncio joins the standard library (@asyncio.coroutine + yield from)
2015  ──  Python 3.5: the async/await keywords (PEP 492)
2018  ──  Python 3.7: asyncio.run() simplifies the entry point
2020  ──  Python 3.9: asyncio.to_thread()
2022  ──  Python 3.11: task groups (TaskGroup)
2023  ──  Python 3.12: improved Task lifecycle management

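Comparing the three eras

Era                         Version                 Style                             Example
Generators                  2.2+                    yield                             def gen(): yield 1
Generator-based coroutines  3.4 (removed in 3.11)   @asyncio.coroutine + yield from   @asyncio.coroutine / def f(): yield from ...
Native coroutines           3.5+                    async/await                       async def f(): await ...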
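
To make the progression concrete, the following minimal sketch puts a PEP 342-style generator next to a native coroutine, and then drives a coroutine by hand to show that it still speaks the generator protocol underneath. All function names here (averager, run_averager, add_async, drive_by_hand, answer) are illustrative and not part of any library.

```python
import asyncio

def averager():
    """PEP 342-style generator: values arrive through send()."""
    total, count, average = 0.0, 0, None
    while True:
        value = yield average   # hand back the running average, wait for the next value
        total += value
        count += 1
        average = total / count

def run_averager(values):
    gen = averager()
    next(gen)                   # prime the generator up to its first yield
    return [gen.send(v) for v in values]

async def add_async(a, b):
    """Native coroutine (Python 3.5+): await takes the place of yield from."""
    await asyncio.sleep(0)      # yield control to the event loop once
    return a + b

async def answer():
    return 42

def drive_by_hand(coro):
    """send() resumes a coroutine; its return value travels inside StopIteration."""
    try:
        coro.send(None)
    except StopIteration as stop:
        return stop.value
    coro.close()
    raise RuntimeError("coroutine suspended; it needs a real event loop")

if __name__ == "__main__":
    print(run_averager([10, 20, 30]))    # [10.0, 15.0, 20.0]
    print(asyncio.run(add_async(1, 2)))  # 3
    print(drive_by_hand(answer()))       # 42
```

drive_by_hand only works for coroutines that never suspend; anything that really waits needs an event loop, which is the subject of the next section.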

8.2 Event Loop Basics

Starting the event loop

import asyncio

async def main():
    print('Hello')
    await asyncio.sleep(1)
    print('World')

# Python 3.7+: the recommended way
asyncio.run(main())

# Manual management (more flexible, but you must close the loop yourself)
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

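The event loop lifecycle

asyncio.run(main())
    │
    ├── Creates a new event loop
    ├── Wraps main() in a Task
    ├── Runs the event loop until main() completes
    ├── Cancels any tasks that are still pending
    ├── Shuts down all async generators
    ├── Closes the event loop
    └── Returns the result of main()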
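
The lifecycle can be approximated with the lower-level loop API. The sketch below is a simplified stand-in, not the real implementation of asyncio.run(): it deliberately omits work the real function also does, such as cancelling leftover tasks and shutting down the default executor. The names run_simplified and greet are made up for this example.

```python
import asyncio

def run_simplified(coro):
    """Rough approximation of asyncio.run(); for illustration only."""
    loop = asyncio.new_event_loop()              # create a fresh event loop
    try:
        # run_until_complete wraps the coroutine in a Task, runs the loop
        # until that Task finishes, and returns its result
        return loop.run_until_complete(coro)
    finally:
        try:
            # finalize any async generators that are still open
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()                         # always close the loop

async def greet():
    await asyncio.sleep(0.01)
    return "done"

if __name__ == "__main__":
    print(run_simplified(greet()))  # done
```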

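asyncio.run() vs loop.run_until_complete()

Feature                    asyncio.run()                          loop.run_until_complete()
Loop creation              Automatic                              You create the loop yourself
Cleanup                    Shuts down async generators for you    No automatic cleanup
Nesting in a running loop  Not allowed (raises RuntimeError)      Also not allowed; the same loop can only be reused sequentially
Recommended for            Program entry point                    Legacy code, or when you must manage the loop yourself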
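
The nesting restriction is easy to observe. In this small sketch (inner and outer are hypothetical names), calling asyncio.run() from code that is already running inside an event loop raises RuntimeError; the fix is simply to await the coroutine instead.

```python
import asyncio

async def inner():
    return 1

async def outer():
    coro = inner()
    try:
        asyncio.run(coro)        # a loop is already running in this thread
    except RuntimeError as exc:
        coro.close()             # avoid a "coroutine was never awaited" warning
        return type(exc).__name__
    return "no error"

if __name__ == "__main__":
    print(asyncio.run(outer()))  # RuntimeError
```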

8.3 Task and Future

Future: a placeholder for a result

async def demo_future():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

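    # Simulate an asynchronous operation that sets the result later
    async def set_result():
        await asyncio.sleep(1)
        future.set_result(42)

    # Keep a reference so the task cannot be garbage-collected mid-flight
    setter = asyncio.create_task(set_result())

    # Wait for the result
    result = await future
    print(f"Result: {result}")  # 42
    await setter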
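
A common reason to touch Future directly is to bridge a callback-style API into await. The sketch below assumes a hypothetical callback API, legacy_fetch, which is invented for this example; the Future's set_result method is passed in as the callback.

```python
import asyncio

def legacy_fetch(loop, on_done):
    """Hypothetical callback-style API: calls on_done(value) a little later."""
    loop.call_later(0.01, on_done, "payload")

async def fetch_with_await():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # The callback resolves the future; awaiting it suspends until then
    legacy_fetch(loop, future.set_result)
    return await future

if __name__ == "__main__":
    print(asyncio.run(fetch_with_await()))  # payload
```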

Task: wrapping a coroutine

async def fetch_data(url: str) -> dict:
    await asyncio.sleep(1)  # simulate I/O
    return {"url": url, "status": "ok"}

async def main():
    # Create a task (it is scheduled now and starts once main() yields control)
    task = asyncio.create_task(fetch_data("https://api.example.com"))

    # Task state
    print(task.done())      # False

    # Wait for the result
    result = await task
    print(task.done())      # True
    print(result)           # {"url": "...", "status": "ok"}
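
One detail worth seeing in action: with the default task factory, create_task() schedules the coroutine but does not run any of it until the current coroutine yields to the event loop. The following sketch (worker and the log list are illustrative) records the order of events.

```python
import asyncio

async def worker(log):
    log.append("worker started")
    return "done"

async def main():
    log = []
    task = asyncio.create_task(worker(log))
    log.append("task created")     # runs first: main() has not yielded yet
    await asyncio.sleep(0)         # yield once so the loop can start the task
    log.append("main resumed")
    result = await task
    return log, result

if __name__ == "__main__":
    print(asyncio.run(main()))
    # (['task created', 'worker started', 'main resumed'], 'done')
```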

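Task vs Future

Feature         Future                     Task
Role            Placeholder for a result   Wrapper around a coroutine
Created with    loop.create_future()       asyncio.create_task(coro)
Setting result  Manual set_result()        Set automatically from the coroutine's return value
Awaitable       Yes                        Yes
Typical use     Low-level APIs             Most application code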

8.4 Running Work Concurrently

asyncio.gather: collecting results

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.5)
    return {"id": user_id, "name": f"User_{user_id}"}

async def main():
    # Run the calls concurrently and wait for all of them to finish
    users = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    print(users)
    # [{"id": 1, "name": "User_1"}, {"id": 2, "name": "User_2"}, ...]

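The return_exceptions parameter of asyncio.gather

async def main():
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),       # suppose this one fails
        fetch_user(3),
        return_exceptions=True,  # exceptions are returned as results instead of being raised
    )
    for r in results:
        # BaseException also covers asyncio.CancelledError (Python 3.8+)
        if isinstance(r, BaseException):
            print(f"Error: {r}")
        else:
            print(f"Success: {r}")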
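
Since fetch_user never actually fails, here is a self-contained variant in which one call does. fetch_user_or_fail is a made-up helper for this sketch; note that the failure appears in the result list at the position of the call that raised it.

```python
import asyncio

async def fetch_user_or_fail(user_id: int) -> dict:
    await asyncio.sleep(0.01)
    if user_id == 2:
        raise LookupError(f"user {user_id} not found")
    return {"id": user_id}

async def main():
    results = await asyncio.gather(
        *(fetch_user_or_fail(i) for i in (1, 2, 3)),
        return_exceptions=True,
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return ok, failed

if __name__ == "__main__":
    ok, failed = asyncio.run(main())
    print(ok)      # [{'id': 1}, {'id': 3}]
    print(failed)  # [LookupError('user 2 not found')]
```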

asyncio.wait: more flexible waiting

async def main():
    tasks = [
        asyncio.create_task(fetch_user(1)),
        asyncio.create_task(fetch_user(2)),
        asyncio.create_task(fetch_user(3)),
    ]

    # Return as soon as at least one task has completed
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(f"Finished first: {task.result()}")

    # Wait until all tasks have completed
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

    # Return as soon as any task raises; if none does, this behaves like ALL_COMPLETED
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
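
asyncio.wait() never cancels anything on its own, so whatever ends up in pending is still running. A typical follow-up, sketched here with an illustrative job helper and a timeout, is to cancel the stragglers and wait for them to unwind.

```python
import asyncio

async def job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = {
        asyncio.create_task(job("fast", 0.01)),
        asyncio.create_task(job("slow", 5)),
    }
    # Give up waiting after 0.2 seconds; unfinished tasks land in `pending`
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    for task in pending:
        task.cancel()
    # Let the cancelled tasks finish unwinding before returning
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(t.result() for t in done), len(pending)

if __name__ == "__main__":
    print(asyncio.run(main()))  # (['fast'], 1)
```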

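Comparing the waiting strategies

Function        Returns                  Characteristics
gather()        List of results          Simple; results keep the order of the inputs
wait()          (done, pending) sets     Flexible; supports several waiting strategies
as_completed()  Iterator of awaitables   Yields results in completion order

asyncio.as_completed: handling results in completion order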

async def main():
    tasks = [fetch_user(i) for i in range(10)]

    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Done: {result}")  # whichever finishes first is handled first
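
Because every fetch_user call takes the same time, the ordering effect is hard to see. This sketch uses an illustrative job helper with different delays so that completion order differs from submission order.

```python
import asyncio

async def job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def main():
    # Submitted as a, b, c; they finish as b, c, a
    coros = [job("a", 0.06), job("b", 0.02), job("c", 0.04)]
    order = []
    for next_done in asyncio.as_completed(coros):
        order.append(await next_done)
    return order

if __name__ == "__main__":
    print(asyncio.run(main()))  # ['b', 'c', 'a']
```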

8.5 Synchronization Primitives

Lock: mutual exclusion

async def safe_increment(counter: dict, key: str, lock: asyncio.Lock):
    async with lock:
        current = counter.get(key, 0)
        await asyncio.sleep(0.01)  # simulate an async operation while holding the lock
        counter[key] = current + 1
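
Why a lock is needed in single-threaded code becomes clear when the same read-await-write sequence runs with and without one. In this sketch, unsafe_increment is a deliberately broken variant added for comparison; every unlocked task reads the stale value before any of them writes.

```python
import asyncio

async def unsafe_increment(counter: dict, key: str):
    current = counter.get(key, 0)
    await asyncio.sleep(0.01)      # other tasks run here and read the same stale value
    counter[key] = current + 1

async def safe_increment(counter: dict, key: str, lock: asyncio.Lock):
    async with lock:               # only one task at a time runs this block
        current = counter.get(key, 0)
        await asyncio.sleep(0.01)
        counter[key] = current + 1

async def main():
    unsafe, safe = {}, {}
    lock = asyncio.Lock()
    await asyncio.gather(*(unsafe_increment(unsafe, "hits") for _ in range(10)))
    await asyncio.gather(*(safe_increment(safe, "hits", lock) for _ in range(10)))
    return unsafe["hits"], safe["hits"]

if __name__ == "__main__":
    print(asyncio.run(main()))  # (1, 10)
```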

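Semaphore: limiting concurrency

import aiohttp  # third-party package

async def fetch_with_limit(sem: asyncio.Semaphore, url: str):
    async with sem:  # only a limited number of coroutines get past this point at once
        # One session per request keeps the example short; real code should share a session
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()

async def main():
    sem = asyncio.Semaphore(5)  # at most 5 requests in flight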
    urls = [f"https://api.example.com/{i}" for i in range(100)]

    tasks = [fetch_with_limit(sem, url) for url in urls]
    results = await asyncio.gather(*tasks)
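
The limiting effect can be checked without any network access. The sketch below replaces the HTTP request with a short sleep and records the highest number of jobs that were inside the semaphore at once; limited_job and the stats dictionary are illustrative.

```python
import asyncio

async def limited_job(sem: asyncio.Semaphore, stats: dict):
    async with sem:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)   # stand-in for the HTTP request
        stats["running"] -= 1

async def main(limit: int = 3, jobs: int = 10) -> int:
    sem = asyncio.Semaphore(limit)
    stats = {"running": 0, "peak": 0}
    await asyncio.gather(*(limited_job(sem, stats) for _ in range(jobs)))
    return stats["peak"]

if __name__ == "__main__":
    print(asyncio.run(main()))  # 3
```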

Event: notification

async def waiter(event: asyncio.Event):
    print("Waiting for the event...")
    await event.wait()
    print("Event fired!")

async def setter(event: asyncio.Event):
    await asyncio.sleep(2)
    event.set()  # fire the event, waking every waiter

async def main():
    event = asyncio.Event()
    await asyncio.gather(waiter(event), setter(event))

Queue: asynchronous queue

async def producer(queue: asyncio.Queue, n: int):
    for i in range(n):
        await asyncio.sleep(0.5)
        await queue.put(i)
        print(f"Produced: {i}")
    await queue.put(None)  # poison pill: tells the consumers to stop

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # pass the pill on to the other consumers
            break
        print(f"[{name}] Consumed: {item}")
        await asyncio.sleep(1)  # simulate processing
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)

    producers = [asyncio.create_task(producer(queue, 20))]
    consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(3)]

    await asyncio.gather(*producers)
    await asyncio.gather(*consumers)
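
An alternative to the poison pill, shown here as a sketch rather than a replacement for the code above, is to let queue.join() signal that all work is finished and then cancel the idle workers. The worker function and the doubling "work" are placeholders.

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list):
    while True:
        item = await queue.get()
        try:
            results.append(item * 2)    # placeholder for real processing
        finally:
            queue.task_done()           # exactly one task_done() per get()

async def main():
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]
    for i in range(5):
        queue.put_nowait(i)
    await queue.join()                  # returns once every item is marked done
    for w in workers:
        w.cancel()                      # workers are idle in get(); stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 2, 4, 6, 8]
```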

8.6 aiohttp: an Asynchronous HTTP Client

import aiohttp
import asyncio

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(fetch_one(session, url))
        return await asyncio.gather(*tasks)

async def fetch_one(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
        return {
            "url": url,
            "status": response.status,
            "data": await response.json(),
        }

# Usage
urls = [
    "https://api.github.com/users/python",
    "https://api.github.com/users/golang",
    "https://api.github.com/users/rust-lang",
]
results = asyncio.run(fetch_all(urls))

8.7 TaskGroup (Python 3.11+)

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_user(1))
        task2 = tg.create_task(fetch_user(2))
        task3 = tg.create_task(fetch_user(3))

    # When the block exits, every task has finished.
    # If any task raised, the remaining tasks were cancelled and the errors
    # are re-raised together as an ExceptionGroup.

    print(task1.result())
    print(task2.result())
    print(task3.result())
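
The failure path is the interesting part of TaskGroup. In the sketch below (fail_fast and slow are invented helpers, and Python 3.11 or later is assumed), one task raises, the sibling task is cancelled, and the error arrives wrapped in an ExceptionGroup.

```python
import asyncio
import sys

async def fail_fast():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def slow():
    await asyncio.sleep(1)
    return "finished"

async def main():
    slow_task = None
    errors = []
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fail_fast())
            slow_task = tg.create_task(slow())
    except ExceptionGroup as group:
        # group.exceptions holds every exception raised inside the group
        errors = [repr(exc) for exc in group.exceptions]
    return errors, slow_task.cancelled()

if __name__ == "__main__" and sys.version_info >= (3, 11):
    print(asyncio.run(main()))  # (["ValueError('boom')"], True)
```

On Python 3.11+ the except* syntax can be used instead of a plain except to handle each exception type in the group separately.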

TaskGroup vs gather

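Feature          gather()                                          TaskGroup
Cancellation     Does not cancel the other tasks on failure        One failure cancels all remaining tasks
Error reporting  First exception, or all via return_exceptions     ExceptionGroup
Adding tasks     Awkward once gather() has been called             tg.create_task() at any point inside the block
Recommended for  Simple concurrency                                Structured concurrency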

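8.8 Running Blocking Code

Async code must not call blocking functions such as time.sleep() or requests.get() directly, because they stall the event loop and every task running on it. Run them in a worker thread instead.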

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main():
    loop = asyncio.get_running_loop()

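    # blocking_function, blocking_io, arg1 and arg2 are placeholders for your own code

    # Option 1: run in the default thread pool
    result = await loop.run_in_executor(
        None,  # None selects the default thread pool
        blocking_function,
        arg1, arg2
    )

    # Option 2: use a custom thread pool
    with ThreadPoolExecutor(max_workers=10) as pool:
        result = await loop.run_in_executor(pool, blocking_io)

    # Option 3: the convenience function added in Python 3.9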
    result = await asyncio.to_thread(blocking_function, arg1, arg2)
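
The placeholders above cannot run as written, so here is a small runnable sketch of the third option. blocking_square is a made-up stand-in for blocking I/O; the three calls run in worker threads at the same time, so the total time should be close to one call rather than three. Python 3.9 or later is assumed.

```python
import asyncio
import time

def blocking_square(x: int) -> int:
    time.sleep(0.05)                # stands in for blocking I/O
    return x * x

async def main():
    start = time.perf_counter()
    # Each call runs in a worker thread, so the event loop stays free
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_square, n) for n in (1, 2, 3))
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)                  # [1, 4, 9]
    print(f"{elapsed:.2f}s")
```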

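8.9 Common Pitfalls

Pitfall 1: forgetting to await a Task

async def buggy():
    # ❌ The task is scheduled, but nothing keeps a reference to it or waits for it:
    #    it may be garbage-collected mid-run or cancelled when the loop shuts down
    asyncio.create_task(some_coroutine())

async def correct():
    # ✅ Keep a reference and await it
    task = asyncio.create_task(some_coroutine())
    await task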
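
When a task really is fire-and-forget and awaiting it is not an option, a common pattern is to hold a strong reference in a set until the task finishes. The sketch below assumes helper names of its own (spawn, record, background_tasks).

```python
import asyncio

background_tasks = set()

def spawn(coro):
    """Start a fire-and-forget task while keeping a strong reference to it."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)  # drop the reference when finished
    return task

async def record(log: list, value: str):
    await asyncio.sleep(0.01)
    log.append(value)

async def main():
    log = []
    spawn(record(log, "saved"))
    tracked = len(background_tasks)     # 1 while the task is in flight
    await asyncio.sleep(0.05)           # keep the loop alive long enough for it to finish
    return log, tracked, len(background_tasks)

if __name__ == "__main__":
    print(asyncio.run(main()))  # (['saved'], 1, 0)
```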

Pitfall 2: calling blocking functions inside an async function

async def buggy():
    # ❌ Blocks the entire event loop!
    time.sleep(5)
    requests.get("https://api.example.com")

async def correct():
    # ✅ Use the asynchronous equivalents
    await asyncio.sleep(5)
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as resp:
            return await resp.json()

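Pitfall 3: unhandled exceptions in fire-and-forget Tasks

async def buggy():
    # ❌ Nobody awaits the task, so a failure surfaces only as a
    #    "Task exception was never retrieved" log when the task is garbage-collected
    asyncio.create_task(may_fail())

def report_failure(task: asyncio.Task) -> None:
    # Retrieve the exception so it is handled explicitly instead of lost
    if not task.cancelled() and task.exception() is not None:
        print(f"Task failed: {task.exception()!r}")

async def correct():
    # ✅ Attach a done callback that reports failures
    task = asyncio.create_task(may_fail())
    task.add_done_callback(report_failure)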

8.10 Practical Scenario: Asynchronous Data Collection

import asyncio
import aiohttp
from dataclasses import dataclass

@dataclass
class FetchResult:
    url: str
    status: int
    data: dict | None
    error: str | None

async def fetch_one(session: aiohttp.ClientSession, url: str, sem: asyncio.Semaphore) -> FetchResult:
    async with sem:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                data = await resp.json()
                return FetchResult(url=url, status=resp.status, data=data, error=None)
        except Exception as e:
            return FetchResult(url=url, status=0, data=None, error=str(e))

async def bulk_fetch(urls: list[str], concurrency: int = 10) -> list[FetchResult]:
    sem = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url, sem) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [f"https://api.example.com/items/{i}" for i in range(1000)]
    results = await bulk_fetch(urls, concurrency=20)

    success = [r for r in results if r.error is None]
    failed = [r for r in results if r.error is not None]
    print(f"Succeeded: {len(success)}, failed: {len(failed)}")

asyncio.run(main())

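8.11 Chapter Summary

Topic                         Summary
Event loop                    The core scheduler of asyncio
Task                          Wraps a coroutine and schedules it to run on the event loop
Future                        A placeholder for a result; a low-level abstraction
gather / wait / as_completed  Three ways to run work concurrently
Synchronization primitives    Lock, Semaphore, Event, Queue
TaskGroup                     Structured concurrency in Python 3.11+
Blocking code                 Use to_thread() or run_in_executor()

Coming up in the next chapter: how does Rust's async achieve high-performance asynchronous programming through zero-cost abstractions?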

