
Chapter 13: Concurrency

Master Python's threading, multiprocessing, and concurrent programming models.


13.1 Concurrency vs. Parallelism

| Concept | Description | Best suited for |
|---|---|---|
| Concurrency | Multiple tasks take turns executing | I/O-bound work |
| Parallelism | Multiple tasks execute at the same time | CPU-bound work |
Concurrency (single core):
Thread A: ████░░░░████░░░░
Thread B: ░░░░████░░░░████

Parallelism (multiple cores):
Thread A: ████████████
Thread B: ████████████

13.2 The GIL (Global Interpreter Lock)

CPython's GIL allows only one thread to execute Python bytecode at any given moment.

| Impact | Notes |
|---|---|
| I/O-bound | Largely unaffected; threads release the GIL while waiting on I/O |
| CPU-bound | Severely limited; multiple threads cannot use multiple cores |
| Workarounds | Use multiprocessing or C extensions |

import sys
print(sys._is_gil_enabled())  # Python 3.13+: False means the free-threaded (GIL-disabled) build
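To see the GIL's effect on CPU-bound code, here is a minimal sketch (function name and workload size are illustrative) that runs the same computation twice sequentially and then in two threads. On a standard GIL build the two wall-clock times come out roughly equal; on a free-threaded build the threaded run can approach half the sequential time.

```python
import threading
import time

def count_up(n: int) -> int:
    """A purely CPU-bound loop."""
    total = 0
    for i in range(n):
        total += i
    return total

N = 1_000_000

# Sequential: two runs back to back
start = time.perf_counter()
expected = [count_up(N), count_up(N)]
seq_time = time.perf_counter() - start

# Threaded: the same two runs in parallel threads
results: list[int] = []

def run() -> None:
    results.append(count_up(N))

threads = [threading.Thread(target=run) for _ in range(2)]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
thr_time = time.perf_counter() - start

print(f"sequential: {seq_time:.3f}s, threaded: {thr_time:.3f}s")
```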

13.3 threading (Multithreading)

13.3.1 Basic Usage

import threading
import time

def worker(name: str, seconds: float):
    print(f"Thread {name} started")
    time.sleep(seconds)
    print(f"Thread {name} finished")

# Create the threads
t1 = threading.Thread(target=worker, args=("A", 2))
t2 = threading.Thread(target=worker, args=("B", 1))

# Start them
t1.start()
t2.start()

# Wait for both to finish
t1.join()
t2.join()
print("All threads done")

13.3.2 Thread Safety

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100_000):
        with lock:  # acquire the lock
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter: {counter}")  # 1000000 (correct)

13.3.3 Thread Synchronization Primitives

| Primitive | Purpose |
|---|---|
| Lock | Mutual exclusion |
| RLock | Reentrant lock |
| Semaphore | Semaphore (caps the number of concurrent holders) |
| Event | Event notification |
| Condition | Condition variable |
| Barrier | Barrier (wait until all threads are ready) |

import threading

# Producer-consumer pattern
buffer = []
not_empty = threading.Condition()

def producer():
    for i in range(5):
        with not_empty:
            buffer.append(i)
            not_empty.notify()

def consumer():
    for _ in range(5):
        with not_empty:
            while not buffer:
                not_empty.wait()
            item = buffer.pop(0)
            print(f"Consumed: {item}")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
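Of the primitives listed above, Semaphore is handy for capping how many threads run a section at once. A minimal sketch (the worker and its sleep duration are illustrative):

```python
import threading
import time

max_concurrent = 2
sem = threading.Semaphore(max_concurrent)

active = 0   # threads currently inside the semaphore-guarded section
peak = 0     # highest value `active` ever reached
state_lock = threading.Lock()

def worker() -> None:
    global active, peak
    with sem:  # at most max_concurrent threads pass this point at once
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate work while holding the semaphore
        with state_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"peak concurrency: {peak}")  # never exceeds 2
```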

13.3.4 ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_url(url: str) -> str:
    time.sleep(1)  # simulate a network request
    return f"Response from {url}"

urls = [f"https://api.example.com/page/{i}" for i in range(5)]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Option 1: map (results in input order)
    results = list(executor.map(fetch_url, urls))
    
    # Option 2: submit + as_completed (results as they finish)
    futures = {executor.submit(fetch_url, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        result = future.result()
        print(f"{url}: {result}")
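One detail the examples above gloss over: if a task raises, the exception is stored on the future and re-raised when future.result() is called. A sketch of handling that (may_fail is a made-up example function):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def may_fail(n: int) -> int:
    if n == 3:
        raise ValueError(f"bad input: {n}")
    return n * 2

succeeded: list[int] = []
failed: list[int] = []

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(may_fail, n): n for n in range(5)}
    for future in as_completed(futures):
        n = futures[future]
        try:
            succeeded.append(future.result())  # re-raises the worker's exception
        except ValueError:
            failed.append(n)

print(sorted(succeeded), failed)  # [0, 2, 4, 8] [3]
```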

13.4 multiprocessing (Multiprocessing)

13.4.1 Basic Usage

import multiprocessing
import os

def worker(name: str):
    print(f"Process {name} (PID: {os.getpid()})")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker, args=("A",))
    p2 = multiprocessing.Process(target=worker, args=("B",))
    p1.start(); p2.start()
    p1.join(); p2.join()

13.4.2 Inter-Process Communication

import multiprocessing

def sender(conn):
    conn.send({"msg": "hello"})
    conn.close()

def receiver(conn):
    data = conn.recv()
    print(f"Received: {data}")

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(child_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))
    p1.start(); p2.start()
    p1.join(); p2.join()

13.4.3 Shared Memory

import multiprocessing

def increment(counter):
    for _ in range(1000):
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value("i", 0)  # "i" = int
    processes = [multiprocessing.Process(target=increment, args=(counter,)) for _ in range(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Counter: {counter.value}")

13.4.4 ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import math

def compute_heavy(n: int) -> float:
    return sum(math.sin(i) * math.cos(i) for i in range(n))

if __name__ == "__main__":
    numbers = [10**6] * 8

    with ProcessPoolExecutor() as executor:
        results = list(executor.map(compute_heavy, numbers))

    print(f"Sum of results: {sum(results):.2f}")

13.5 Threads vs. Processes: How to Choose

| Aspect | Threads | Processes |
|---|---|---|
| Memory | Shared | Separate |
| Creation overhead | Low | High |
| GIL impact | Yes (CPU-bound work limited) | No |
| Communication | Shared variables | Pipe / Queue / shared memory |
| Best suited for | I/O-bound work | CPU-bound work |

13.6 The queue Module

import queue
import threading

q = queue.Queue(maxsize=10)

def producer():
    for i in range(20):
        q.put(i)
        print(f"Produced: {i}")

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        q.task_done()

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
q.join()      # wait until every produced item has been processed
q.put(None)   # sentinel to stop the consumer
t1.join(); t2.join()
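Besides the FIFO Queue, the module also provides LifoQueue and PriorityQueue. A quick sketch of the latter, where entries are (priority, item) tuples and lower numbers come out first:

```python
import queue

pq = queue.PriorityQueue()
for priority, task in [(3, "low"), (1, "high"), (2, "mid")]:
    pq.put((priority, task))  # lower number = higher priority

order = [pq.get()[1] for _ in range(3)]
print(order)  # ['high', 'mid', 'low']
```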

13.7 Caveats

🔴 Caution

  • The GIL prevents multiple threads from executing CPU-bound tasks in parallel
  • On Windows, multiprocessing startup requires the if __name__ == "__main__" guard
  • Threads that share state must use locks, or race conditions will corrupt data
  • Inter-process communication has higher overhead than inter-thread communication

💡 Tips

  • Use ThreadPoolExecutor for I/O-bound tasks
  • Use ProcessPoolExecutor for CPU-bound tasks
  • Use as_completed() to handle results as soon as they finish
  • The optional free-threaded mode in Python 3.13+ removes the GIL limitation

📌 Real-World Scenario

from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import time

@dataclass
class HealthCheck:
    service: str
    status: bool
    latency: float

def check_service(name: str) -> HealthCheck:
    start = time.monotonic()
    time.sleep(0.5)  # simulate a request
    return HealthCheck(name, True, time.monotonic() - start)

def check_all_services(services: list[str]) -> list[HealthCheck]:
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(check_service, s): s for s in services}
        for future in as_completed(futures):
            results.append(future.result())
    return results

services = ["user-api", "order-api", "payment-api", "notification-api"]
for check in check_all_services(services):
    print(f"{check.service}: {'✅' if check.status else '❌'} ({check.latency:.3f}s)")

13.8 Further Reading