HTTP/2 与 RPC 精讲教程 / 03 - 多路复用与流控制
第 03 章:多路复用与流控制
一根连接,万流并发——HTTP/2 最核心的性能特性
3.1 什么是多路复用
多路复用(Multiplexing)是 HTTP/2 最具革命性的特性。它允许在单一 TCP 连接上同时发送和接收多个请求/响应,且互不干扰。
3.1.1 HTTP/1.1 的串行困境
HTTP/1.1 连接模型(并发受限):
连接 1: [请求A]────[响应A]────[请求C]────[响应C]
连接 2: [请求B]────[响应B]────[请求D]────[响应D]
连接 3: [请求E]────[响应E]
问题:
- 浏览器限制 6 个并发连接
- 每个连接同一时刻只能处理一个请求
- 管线化(Pipelining)要求响应严格按序返回
3.1.2 HTTP/2 的多路复用
HTTP/2 单连接模型(完全并行):
连接: [流1 HEADERS][流3 HEADERS][流1 DATA][流5 HEADERS][流3 DATA][流1 DATA]...
┌──────────────────────────────────┐
│ TCP 连接 (1个) │
│ │
│ 流1: ──H──D──D──D── │
│ 流3: ──H──D──D──────── │
│ 流5: ──H──────────────D── │
│ 流7: ──H──D──D──────── │
│ │
└──────────────────────────────────┘
优势:
- 无需建立多个 TCP 连接
- 帧可以交错发送
- 流之间完全独立
3.2 并发流管理
3.2.1 最大并发流限制
| 参数 | 默认值 | 建议范围 | 说明 |
|---|---|---|---|
| SETTINGS_MAX_CONCURRENT_STREAMS | 无限 (建议 100) | 100-256 | 客户端/服务器同时活跃的流数量上限 |
# 模拟并发流管理器
import threading
from collections import deque
class ConcurrencyManager:
def __init__(self, max_concurrent: int = 100):
self.max_concurrent = max_concurrent
self.active_streams: set = set()
self.waiting_queue: deque = deque()
self.lock = threading.Lock()
self.condition = threading.Condition(self.lock)
def acquire(self, stream_id: int) -> bool:
"""获取流的并发许可"""
with self.condition:
if len(self.active_streams) < self.max_concurrent:
self.active_streams.add(stream_id)
return True
else:
self.waiting_queue.append(stream_id)
return False
def release(self, stream_id: int):
"""释放流的并发占用"""
with self.condition:
self.active_streams.discard(stream_id)
self.condition.notify()
def wait_for_slot(self, stream_id: int):
"""等待可用的并发槽位"""
with self.condition:
while len(self.active_streams) >= self.max_concurrent:
self.condition.wait()
self.active_streams.add(stream_id)
# 使用示例
manager = ConcurrencyManager(max_concurrent=100)
manager.acquire(1) # stream 1 获得许可
manager.acquire(3) # stream 3 获得许可
manager.release(1) # stream 1 释放
3.2.2 流状态与并发
package main
import (
"fmt"
"sync"
)
// Stream 表示一个 HTTP/2 流
type Stream struct {
ID uint32
State string
HeadersSent bool
DataComplete bool
mu sync.Mutex
}
// Connection 管理多个流
type Connection struct {
streams map[uint32]*Stream
maxStreams int
mu sync.RWMutex
}
func NewConnection(maxStreams int) *Connection {
return &Connection{
streams: make(map[uint32]*Stream),
maxStreams: maxStreams,
}
}
func (c *Connection) CreateStream(id uint32) (*Stream, error) {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.streams) >= c.maxStreams {
return nil, fmt.Errorf("超出最大并发流限制 (%d)", c.maxStreams)
}
if _, exists := c.streams[id]; exists {
return nil, fmt.Errorf("流 %d 已存在", id)
}
stream := &Stream{ID: id, State: "open"}
c.streams[id] = stream
return stream, nil
}
func (c *Connection) CloseStream(id uint32) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.streams, id)
}
func (c *Connection) ActiveStreams() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.streams)
}
func main() {
conn := NewConnection(100)
// 模拟并发请求
for i := uint32(1); i <= 10; i += 2 {
stream, err := conn.CreateStream(i)
if err != nil {
fmt.Printf("创建流失败: %v\n", err)
continue
}
fmt.Printf("创建流 %d,当前活跃: %d\n", stream.ID, conn.ActiveStreams())
}
}
3.3 队头阻塞(Head-of-Line Blocking)
3.3.1 HTTP/1.1 的队头阻塞
队头阻塞是 HTTP/1.1 最严重的性能瓶颈之一:
问题场景:请求 A 的响应耗时较长
HTTP/1.1 管线化:
客户端发送: [A][B][C][D]
服务器响应: [响应A]----→[响应B][响应C][响应D]
↑
B、C、D 必须等待 A 完成
结果:即使 B、C、D 已处理完毕,也必须排队等待
3.3.2 HTTP/2 如何解决应用层队头阻塞
HTTP/2 多路复用:
客户端发送: 流1-H 流3-H 流5-H 流7-H
服务器响应: 流5-D 流3-D 流1-D 流7-D
↑
谁先处理完谁先返回
关键:帧级别的交错发送,流之间互不影响
# 模拟对比实验
import time
import concurrent.futures
from dataclasses import dataclass
from typing import List
@dataclass
class Request:
id: int
path: str
delay_ms: int # 服务器处理耗时
def simulate_http11(requests: List[Request], max_conn: int = 6) -> dict:
"""模拟 HTTP/1.1 请求(受限于连接数和队头阻塞)"""
results = {}
start = time.time()
# 分批处理(每批最多 max_conn 个)
for i in range(0, len(requests), max_conn):
batch = requests[i:i+max_conn]
# 每个连接串行处理
for req in batch:
time.sleep(req.delay_ms / 1000)
results[req.id] = time.time() - start
return results
def simulate_http2(requests: List[Request]) -> dict:
"""模拟 HTTP/2 请求(真正的多路复用)"""
results = {}
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = {}
for req in requests:
future = executor.submit(lambda r: time.sleep(r.delay_ms/1000), req)
futures[future] = req
for future in concurrent.futures.as_completed(futures):
req = futures[future]
results[req.id] = time.time() - start
return results
# 测试场景:10 个请求,处理时间各异
requests = [
Request(1, "/css/style.css", 50),
Request(2, "/js/app.js", 100),
Request(3, "/api/users", 200),
Request(4, "/api/orders", 300), # 最慢
Request(5, "/image/logo.png", 30),
Request(6, "/js/utils.js", 60),
Request(7, "/api/products", 150),
Request(8, "/css/theme.css", 40),
Request(9, "/image/bg.jpg", 80),
Request(10, "/api/config", 20),
]
print("=== HTTP/1.1 (6 并发连接) ===")
h1_results = simulate_http11(requests, max_conn=6)
for rid, t in sorted(h1_results.items()):
print(f" 请求 {rid}: {t:.3f}s")
print("\n=== HTTP/2 (多路复用) ===")
h2_results = simulate_http2(requests)
for rid, t in sorted(h2_results.items()):
print(f" 请求 {rid}: {t:.3f}s")
3.3.3 TCP 层队头阻塞
⚠️ HTTP/2 并未完全消除队头阻塞:
TCP 层的队头阻塞仍然存在:
应用层(HTTP/2):
流1: 帧A1, 帧A2, 帧A3
流2: 帧B1, 帧B2, 帧B3
TCP 层(单一字节流):
发送: [A1][B1][A2][B2][A3][B3]
如果 B1 的 TCP 段丢失:
TCP 必须等待 B1 重传后才能交付后续数据
即使 A2、A3 已经到达,也必须等待
解决方案 → HTTP/3 (QUIC) 基于 UDP,流级别独立重传
3.4 流优先级(Stream Priority)
3.4.1 优先级模型
HTTP/2 使用**依赖树(Dependency Tree)和权重(Weight)**来表达流之间的优先级关系。
依赖树示例(Web 页面资源加载):
根节点 (流 0)
│
┌──────┼──────┐
│ │ │
流 1 流 3 流 5
HTML CSS JS
w=256 w=256 w=256
│ │
┌─┘ └─┐
│ │
流 7 流 9
字体 图片
w=128 w=64
说明:
- w = weight(权重,1-256)
- 子节点共享父节点的带宽
- 同级节点按权重比例分配带宽
- 依赖关系:被依赖的流优先获得资源
3.4.2 优先级类型
| 策略 | 说明 | 适用资源 |
|---|---|---|
| 最高优先级 + 独占 | 关键资源独占连接 | 首屏 CSS、关键 JS |
| 高优先级 + 依赖 | 依赖于关键资源 | 字体、图片 |
| 低优先级 + 依赖 | 后台加载 | 预取资源、懒加载图片 |
| 最低优先级 | 可延迟加载 | 分析脚本、广告 |
3.4.3 优先级代码示例
package main
import (
"fmt"
"net/http"
)
func main() {
// Go 的 HTTP/2 支持通过 ResponseWriter 设置优先级
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// 获取流的优先级信息(来自客户端的 PRIORITY 帧)
priority := r.Header.Get("Priority")
fmt.Printf("流优先级: %s\n", priority)
// 不同资源类型返回不同的缓存策略
switch r.URL.Path {
case "/critical.css":
// CSS 关键资源,高优先级
w.Header().Set("Content-Type", "text/css")
w.Header().Set("Cache-Control", "public, max-age=31536000")
w.Write([]byte("body { margin: 0; }"))
case "/analytics.js":
// 分析脚本,低优先级
w.Header().Set("Content-Type", "application/javascript")
w.Header().Set("Cache-Control", "public, max-age=3600")
w.Write([]byte("// analytics code"))
default:
w.Write([]byte("Hello"))
}
})
}
3.4.4 优先级的局限性
⚠️ 优先级并非强制:
- 服务器可以忽略客户端的优先级建议
- Chrome 和 Firefox 的优先级实现不完全相同
- 某些 CDN 对优先级的支持有限
💡 实用建议:
- 关键资源使用
<link rel="preload">提示 - 不要过度依赖优先级,使用资源加载策略(如懒加载)更可靠
- HTTP/3 使用了改进的优先级方案(Extensible Priorities)
3.5 多路复用的性能优化
3.5.1 并发流数量调优
| 场景 | 建议并发流数 | 理由 |
|---|---|---|
| 静态资源服务 | 100-256 | 资源多但单个请求小 |
| API 网关 | 50-100 | 请求多样,处理时间差异大 |
| 微服务通信 | 100-200 | 高频调用,需要高并发 |
| 实时推送 | 50-100 | 长时间占用流资源 |
// 服务端并发流配置
package main
import (
"log"
"net/http"
)
func main() {
server := &http.Server{
Addr: ":8443",
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello"))
}),
// 最大并发流数(通过 SETTINGS 帧告知客户端)
MaxConcurrentStreams: 100,
}
log.Printf("启动 HTTP/2 服务器,最大并发流: %d", 100)
log.Fatal(server.ListenAndServeTLS("cert.pem", "key.pem"))
}
3.5.2 流级别的资源分配
# 根据流优先级分配服务器资源
import asyncio
from enum import IntEnum
from dataclasses import dataclass
class Priority(IntEnum):
CRITICAL = 0 # 关键渲染路径
HIGH = 1 # 重要资源
MEDIUM = 2 # 普通资源
LOW = 3 # 非关键资源
BACKGROUND = 4 # 后台任务
@dataclass
class StreamRequest:
stream_id: int
path: str
priority: Priority
class StreamScheduler:
def __init__(self, max_workers: int = 10):
self.max_workers = max_workers
self.queues = {p: asyncio.Queue() for p in Priority}
self.workers = []
async def submit(self, request: StreamRequest):
await self.queues[request.priority].put(request)
async def process(self):
"""按优先级顺序处理请求"""
while True:
# 优先处理高优先级队列
for priority in Priority:
if not self.queues[priority].empty():
request = await self.queues[priority].get()
await self.handle_request(request)
return
await asyncio.sleep(0.001)
async def handle_request(self, request: StreamRequest):
# 模拟处理
await asyncio.sleep(0.01)
print(f"处理流 {request.stream_id}: {request.path} (优先级: {request.priority.name})")
# 使用示例
async def main():
scheduler = StreamScheduler()
requests = [
StreamRequest(1, "/css/critical.css", Priority.CRITICAL),
StreamRequest(3, "/api/users", Priority.HIGH),
StreamRequest(5, "/image/logo.png", Priority.MEDIUM),
StreamRequest(7, "/analytics.js", Priority.LOW),
]
for req in requests:
await scheduler.submit(req)
for _ in requests:
await scheduler.process()
asyncio.run(main())
3.6 业务场景:微服务间的多路复用
场景:电商系统中,订单服务需要调用多个下游服务
传统 HTTP/1.1:
订单服务 → 用户服务 (10ms)
订单服务 → 库存服务 (15ms)
订单服务 → 价格服务 (20ms)
订单服务 → 支付服务 (25ms)
总耗时: 串行 70ms / 并行 25ms(需要 4 个连接)
HTTP/2 多路复用:
订单服务 → 用户服务 ┐
库存服务 ├─ 同一 TCP 连接
价格服务 │
支付服务 ┘
总耗时: 并行 25ms(仅 1 个连接)
优势:
- 减少 TCP/TLS 握手开销
- 降低文件描述符消耗
- 更好的拥塞控制
- 统一的连接管理
// 微服务间 HTTP/2 通信示例
package main
import (
"context"
"fmt"
"io"
"net/http"
"sync"
"time"
)
type ServiceClient struct {
client *http.Client
baseURL string
}
func NewServiceClient(baseURL string) *ServiceClient {
transport := &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
}
return &ServiceClient{
client: &http.Client{Transport: transport, Timeout: 10 * time.Second},
baseURL: baseURL,
}
}
func (c *ServiceClient) Call(ctx context.Context, path string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+path, nil)
if err != nil {
return nil, err
}
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
// 并行调用多个微服务
func CallServices(ctx context.Context, client *ServiceClient, paths []string) (map[string][]byte, error) {
results := make(map[string][]byte)
var mu sync.Mutex
var wg sync.WaitGroup
errs := make(chan error, len(paths))
for _, path := range paths {
wg.Add(1)
go func(p string) {
defer wg.Done()
data, err := client.Call(ctx, p)
if err != nil {
errs <- err
return
}
mu.Lock()
results[p] = data
mu.Unlock()
}(path)
}
wg.Wait()
close(errs)
for err := range errs {
return nil, err
}
return results, nil
}
func main() {
client := NewServiceClient("https://api.example.com")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
services := []string{
"/users/123",
"/orders/recent",
"/products/search?q=shoes",
}
start := time.Now()
results, err := CallServices(ctx, client, services)
if err != nil {
fmt.Printf("调用失败: %v\n", err)
return
}
fmt.Printf("调用完成,耗时: %v\n", time.Since(start))
for path, data := range results {
fmt.Printf(" %s: %d bytes\n", path, len(data))
}
}
3.7 注意事项
⚠️ 连接管理:
- 单连接故障会影响所有流,需实现连接健康检查
- 连接空闲时应发送 PING 帧保持活性
- 服务器应实现 GOAWAY 处理,优雅迁移流
⚠️ 并发流上限:
- 设置过低会限制吞吐量,过高会增加内存消耗
- 建议根据服务器资源动态调整
- 监控活跃流数量,避免资源耗尽
⚠️ 流优先级:
- 不同浏览器/客户端的优先级策略不同
- 服务器可能忽略优先级建议
- 关键资源建议使用 preload 显式提示
💡 性能调优:
- 合并小请求减少流数量
- 使用流优先级确保关键资源先加载
- 监控流创建/关闭频率,避免流风暴
3.8 扩展阅读
- 📖 RFC 7540 Section 5 - Streams and Multiplexing
- 📖 RFC 9218 - Extensible Prioritization Scheme for HTTP
- 📖 The HTTP/2 Priority Tree
- 📖 TCP Head-of-Line Blocking Explained