RabbitMQ 消息队列完全教程 / 第 8 章:消息路由
第 8 章:消息路由
路由是 RabbitMQ 消息分发的核心逻辑。本章将系统性地讲解各种路由策略、组合模式和高级用法。
8.1 路由基础
路由决策流程
Producer 发布消息
│
├── exchange = "" (默认) ──> routing_key 匹配队列名
│
└── exchange = "xxx" (具名)
│
├── Direct: routing_key == binding_key
├── Fanout: 忽略 routing_key,广播所有
├── Topic: routing_key 匹配 binding_key 通配符
└── Headers: 消息 headers 匹配 binding arguments
路由键设计规范
| 规范 | 说明 | 示例 |
|---|---|---|
使用 . 分隔层级 | 类似域名命名 | order.payment.created |
| 从左到右范围递减 | 通用 → 具体 | region.service.event |
| 使用小写字母 | 保持一致性 | user.registered |
| 避免特殊字符 | 仅用字母、数字、.、*、# | — |
| 长度适中 | 不宜超过 255 字符 | — |
常见命名模式
# 域.动作
order.create
order.payment.success
user.registered
inventory.reserved
# 域.子域.事件
payment.gateway.timeout
notification.email.sent
logistics.tracking.updated
# 环境.域.事件(多环境)
prod.order.created
staging.order.created
8.2 Direct 路由策略
精确匹配
import pika, json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 Direct 交换机
channel.exchange_declare(exchange='commands', exchange_type='direct', durable=True)
# 声明队列
services = {
'payment_service': 'payment',
'shipping_service': 'shipping',
'notification_service': 'notification',
'audit_service': 'audit'
}
for queue, routing_key in services.items():
channel.queue_declare(queue=queue, durable=True)
channel.queue_bind(exchange='commands', queue=queue, routing_key=routing_key)
# 发送支付命令
channel.basic_publish(
exchange='commands',
routing_key='payment', # 精确路由到 payment_service
body=json.dumps({'action': 'charge', 'amount': 99.9}),
properties=pika.BasicProperties(delivery_mode=2)
)
一对多路由(同一路由键绑定多个队列)
# 多个队列绑定同一个路由键
channel.queue_bind(exchange='commands', queue='audit_service', routing_key='payment')
channel.queue_bind(exchange='commands', queue='analytics_service', routing_key='payment')
# 消息同时路由到 audit_service 和 analytics_service
8.3 Topic 路由策略
通配符匹配
| 通配符 | 含义 | order.* 匹配 | order.# 匹配 |
|---|---|---|---|
* | 一个单词 | order.payment ✓ | order.payment ✓ |
# | 零或多个单词 | order.payment.done ✗ | order.payment.done ✓ |
多维度路由架构
场景: 多区域 × 多服务 × 多级别日志系统
Exchange: regional_logs (topic)
│
├── *.error.# ──> error_queue(所有区域的错误)
├── us.*.* ──> us_queue(美国区域所有日志)
├── eu.payment.* ──> eu_payment_queue(欧洲支付日志)
├── #.critical ──> critical_queue(所有临界日志)
└── # ──> archive_queue(全量归档)
消息路由键: "us.payment.error"
→ 匹配 error_queue ✓ (*.error.#)
→ 匹配 us_queue ✓ (us.*.*)
→ 匹配 archive_queue ✓ (#)
→ 不匹配 eu_payment_queue ✗
代码示例
import pika, json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='regional_logs', exchange_type='topic', durable=True)
# 配置路由规则
routing_rules = {
'error_queue': '*.error.#',
'us_queue': 'us.#',
'eu_queue': 'eu.#',
'payment_queue': '#.payment.*',
'critical_queue': '#.critical',
'archive_queue': '#',
}
for queue_name, binding_key in routing_rules.items():
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange='regional_logs', queue=queue_name, routing_key=binding_key)
# 发送测试日志
logs = [
('us.payment.error', 'US 支付系统错误'),
('eu.shipping.warning', 'EU 物流警告'),
('us.user.critical', 'US 用户系统临界错误'),
('eu.payment.info', 'EU 支付系统信息'),
('ap.auth.error', '亚太认证系统错误'),
]
for routing_key, message in logs:
channel.basic_publish(
exchange='regional_logs',
routing_key=routing_key,
body=json.dumps({'level': routing_key.split('.')[2], 'message': message}),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"[x] {routing_key}: {message}")
connection.close()
8.4 Headers 路由策略
多属性匹配
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='doc_router', exchange_type='headers', durable=True)
# 路由规则 1: PDF 且中文文档
channel.queue_declare(queue='pdf_chinese', durable=True)
channel.queue_bind(
exchange='doc_router', queue='pdf_chinese',
arguments={'x-match': 'all', 'format': 'pdf', 'language': 'zh'}
)
# 路由规则 2: 任何高优先级文档
channel.queue_declare(queue='urgent_docs', durable=True)
channel.queue_bind(
exchange='doc_router', queue='urgent_docs',
arguments={'x-match': 'any', 'priority': 'high', 'urgent': 'true'}
)
# 路由规则 3: 所有文档(归档)
channel.queue_declare(queue='all_docs', durable=True)
channel.queue_bind(
exchange='doc_router', queue='all_docs',
arguments={'x-match': 'any', 'format': 'pdf', 'format': 'docx', 'format': 'xlsx'}
)
# 发布消息
channel.basic_publish(
exchange='doc_router',
routing_key='', # Headers 交换机忽略 routing_key
body=b'PDF document content',
properties=pika.BasicProperties(
delivery_mode=2,
headers={'format': 'pdf', 'language': 'zh', 'priority': 'high'}
)
)
8.5 Exchange-to-Exchange 路由
交换机之间可以建立绑定关系,实现更复杂的路由拓扑。
┌──> Fanout: broadcast ──┬──> sms_queue
Producer ──> Topic: │ ├──> email_queue
events │ └──> push_queue
└──> Direct: priority ──> urgent_queue
# 声明交换机
channel.exchange_declare(exchange='events', exchange_type='topic', durable=True)
channel.exchange_declare(exchange='broadcast', exchange_type='fanout', durable=True)
channel.exchange_declare(exchange='priority', exchange_type='direct', durable=True)
# Exchange-to-Exchange 绑定
channel.exchange_bind(destination='broadcast', source='events', routing_key='notification.#')
channel.exchange_bind(destination='priority', source='events', routing_key='#.critical')
# 队列绑定
channel.queue_bind(exchange='broadcast', queue='sms_queue')
channel.queue_bind(exchange='broadcast', queue='email_queue')
channel.queue_bind(exchange='priority', queue='urgent_queue', routing_key='critical')
8.6 死信路由
死信路由链
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# === 死信链: 业务队列 → 重试交换机 → 重试队列 → 业务队列 ===
# 1. 业务交换机和队列
channel.exchange_declare(exchange='biz_exchange', exchange_type='direct', durable=True)
channel.queue_declare(
queue='biz_queue',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx_exchange',
'x-dead-letter-routing-key': 'dead',
'x-message-ttl': 0 # 不设置队列 TTL,由消息级别控制
}
)
channel.queue_bind(exchange='biz_exchange', queue='biz_queue', routing_key='task')
# 2. 死信交换机和队列(最终死信)
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='dead_letter_queue', durable=True)
channel.queue_bind(exchange='dlx_exchange', queue='dead_letter_queue', routing_key='dead')
# 3. 重试交换机和队列(延迟重试)
channel.exchange_declare(exchange='retry_exchange', exchange_type='x-delayed-message',
durable=True, arguments={'x-delayed-type': 'direct'})
channel.queue_declare(
queue='retry_queue',
durable=True,
arguments={
'x-dead-letter-exchange': 'biz_exchange',
'x-dead-letter-routing-key': 'task'
}
)
channel.queue_bind(exchange='retry_exchange', queue='retry_queue', routing_key='retry')
死信路由拓扑
Producer ──> biz_exchange ──> biz_queue ──> Consumer
│
├── NACK (requeue=false)
│ │
│ v
│ dlx_exchange ──> dead_letter_queue
│ (告警/人工处理)
│
└── 消息 TTL 过期
│
v
dlx_exchange ──> dead_letter_queue
8.7 延迟消息路由
延迟消息实现方案
| 方案 | 精确度 | 适用场景 |
|---|---|---|
| Delayed Message Exchange 插件 | 秒级 | 推荐方案 |
| TTL + DLX | 分钟级 | 简单延迟(精度不高) |
| Redis ZSET + 定时扫描 | 秒级 | 不想安装插件 |
| 数据库定时任务 | 分钟级 | 已有调度系统 |
方案一:Delayed Message Exchange(推荐)
# 安装插件后使用
channel.exchange_declare(
exchange='delayed_ex',
exchange_type='x-delayed-message',
durable=True,
arguments={'x-delayed-type': 'direct'}
)
# 发送延迟消息
channel.basic_publish(
exchange='delayed_ex',
routing_key='task',
body=json.dumps({'task': 'auto_cancel_order', 'order_id': '001'}),
properties=pika.BasicProperties(
delivery_mode=2,
headers={'x-delay': 1800000} # 延迟 30 分钟
)
)
方案二:TTL + DLX(不依赖插件)
# 创建延迟队列(消息只停留指定 TTL 后进入 DLX)
channel.queue_declare(
queue='delay_30min',
durable=True,
arguments={
'x-message-ttl': 1800000, # 30 分钟
'x-dead-letter-exchange': 'biz_exchange',
'x-dead-letter-routing-key': 'task',
'x-max-length': 0 # 不限制(或设置合理值)
}
)
# 发布消息到延迟队列
channel.basic_publish(
exchange='',
routing_key='delay_30min',
body=json.dumps({'task': 'auto_cancel', 'order_id': '001'}),
properties=pika.BasicProperties(delivery_mode=2)
)
# 30 分钟后消息自动进入 biz_exchange,路由到 biz_queue
多级延迟阶梯
# 5 个延迟级别: 1s, 5s, 30s, 5min, 30min
DELAY_QUEUES = {
'delay_1s': 1000,
'delay_5s': 5000,
'delay_30s': 30000,
'delay_5min': 300000,
'delay_30min': 1800000,
}
for queue_name, ttl in DELAY_QUEUES.items():
channel.queue_declare(
queue=queue_name,
durable=True,
arguments={
'x-message-ttl': ttl,
'x-dead-letter-exchange': 'biz_exchange',
'x-dead-letter-routing-key': 'task'
}
)
# 发送延迟消息(选择合适的延迟级别)
def send_delayed(message, delay_ms):
queue_name = 'delay_30min' # 默认最大延迟
for name, ttl in sorted(DELAY_QUEUES.items(), key=lambda x: x[1]):
if delay_ms <= ttl:
queue_name = name
break
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2)
)
8.8 路由策略组合模式
模式一:事件驱动架构(EDA)
Domain Events Exchange (topic)
│
├── order.# ──> Order Service Queue
├── payment.# ──> Payment Service Queue
├── inventory.# ──> Inventory Service Queue
├── notification.# ──> Notification Service Queue
└── #.error ──> Error Handler Queue
模式二:CQRS 事件分发
Command Exchange (direct)
│
├── create_order ──> Order Command Queue
├── cancel_order ──> Order Command Queue
└── refund ──> Payment Command Queue
Event Exchange (fanout) ──> 所有读模型服务
模式三:多级路由
Producer ──> L1 Exchange (topic)
│
├── routing: "us.#" ──> L2 Exchange: us_regional (direct)
│ ├── payment ──> us_payment_q
│ └── shipping ──> us_shipping_q
│
└── routing: "eu.#" ──> L2 Exchange: eu_regional (direct)
├── payment ──> eu_payment_q
└── shipping ──> eu_shipping_q
8.9 路由调试
使用管理 API 查看路由
# 查看交换机绑定
curl -u admin:admin123 http://localhost:15672/api/exchanges/%2F/my_exchange/bindings
# 查看队列绑定
curl -u admin:admin123 http://localhost:15672/api/queues/%2F/my_queue/bindings
# 测试消息路由(rabbitmqadmin)
rabbitmqadmin publish exchange=my_exchange routing_key=test.payload payload="test"
路由失败排查
# 使用 mandatory + return 回调检测路由失败
channel.add_on_return_callback(
lambda ch, method, props, body:
print(f"[!] 路由失败: exchange={method.exchange}, "
f"routing_key={method.routing_key}, "
f"reply={method.reply_text}")
)
# 使用 alternate-exchange 兜底
channel.exchange_declare(
exchange='main_exchange',
exchange_type='direct',
durable=True,
arguments={'alternate-exchange': 'unroutable_exchange'}
)
8.10 注意事项
⚠️ Topic 匹配的性能考量
# 通配符会导致 Broker 遍历所有绑定规则,绑定数量多时性能下降。尽量使用 * 或精确路由键。
⚠️ 路由键区分大小写
Order.Create 和 order.create 是不同的路由键。
⚠️ Headers 匹配的额外开销
Headers 交换机需要检查消息头的每个键值对,比 Direct/Topic 性能略低。
⚠️ Dead Letter 循环
确保死信队列本身不配置死信交换机指向同一队列,否则会形成无限循环。
🔥 最佳实践: 使用 Topic 交换机 + 有意义的路由键命名规范,实现灵活的消息路由。
8.11 扩展阅读
下一章: 第 9 章:集群与高可用 — 学习 RabbitMQ 集群架构、镜像队列和仲裁队列。