RabbitMQ 消息队列完全教程 / 第 4 章:交换机详解
第 4 章:交换机详解
交换机(Exchange)是 RabbitMQ 消息路由的核心组件。本章将逐一讲解每种交换机类型的工作原理、使用场景和最佳实践。
4.1 交换机类型全景
| 类型 | 路由策略 | 典型场景 | 复杂度 |
|---|---|---|---|
| Default | 路由键匹配队列名 | 简单点对点 | ★ |
| Direct | 路由键精确匹配 | 任务分发、精确路由 | ★★ |
| Fanout | 广播所有绑定队列 | 通知广播、事件分发 | ★ |
| Topic | 路由键模式匹配 | 日志分类、多维路由 | ★★★ |
| Headers | 消息头属性匹配 | 复杂条件路由 | ★★★★ |
| Dead Letter | 无法消费的消息转发 | 消息重试、异常处理 | ★★★ |
| Delayed | 延迟投递消息 | 定时任务、超时处理 | ★★★ |
4.2 Default Exchange
默认交换机是 RabbitMQ 预先声明的、名称为空字符串 "" 的 Direct 类型交换机。
工作原理
Producer ──publish──> Exchange "" (direct)
│
routing_key = "my_queue"
│
└──> Queue "my_queue"(自动绑定同名队列)
使用示例
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列(自动绑定到默认交换机,路由键 = 队列名)
channel.queue_declare(queue='task_queue', durable=True)
# 使用默认交换机(exchange=''),路由键为队列名
channel.basic_publish(
exchange='', # 默认交换机
routing_key='task_queue', # 路由键 = 队列名
body='Hello World!',
properties=pika.BasicProperties(delivery_mode=2)
)
connection.close()
💡 提示: Default Exchange 适合简单的点对点场景,生产环境建议使用具名交换机以便于管理和监控。
4.3 Direct Exchange(直连交换机)
Direct Exchange 将消息路由到 Binding Key 与消息 Routing Key 完全匹配的队列。
路由规则
消息 Routing Key == 队列 Binding Key → 路由成功
架构图
┌── (binding_key: "payment") ──> Queue: payment
Producer ──> Direct ┼── (binding_key: "shipping") ──> Queue: shipping
Exchange └── (binding_key: "email") ──> Queue: email
完整代码示例
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 Direct 交换机
channel.exchange_declare(exchange='order_direct', exchange_type='direct', durable=True)
# 声明多个队列
for queue_name in ['payment_queue', 'shipping_queue', 'email_queue']:
channel.queue_declare(queue=queue_name, durable=True)
# 绑定队列到交换机
channel.queue_bind(exchange='order_direct', queue='payment_queue', routing_key='payment')
channel.queue_bind(exchange='order_direct', queue='shipping_queue', routing_key='shipping')
channel.queue_bind(exchange='order_direct', queue='email_queue', routing_key='email')
# 发布消息 - 路由到 payment_queue
channel.basic_publish(
exchange='order_direct',
routing_key='payment',
body=json.dumps({'order_id': '001', 'amount': 299.9}),
properties=pika.BasicProperties(delivery_mode=2)
)
print("[x] 支付消息已发送")
connection.close()
多个队列绑定同一路由键
Producer ──> Direct Exchange
│
routing_key = "notification"
│
├──> Queue: sms_queue
└──> Queue: email_queue
当多个队列使用相同的 Binding Key 绑定时,消息会同时路由到所有匹配的队列,实现类似广播的效果。
4.4 Fanout Exchange(扇出交换机)
Fanout Exchange 将消息广播到所有绑定的队列,忽略 Routing Key。
路由规则
所有绑定队列均收到消息(无论 Routing Key 是什么)
架构图
┌──> Queue: sms_queue
Producer ──> Fanout ──┼──> Queue: email_queue
Exchange ├──> Queue: push_queue
└──> Queue: wechat_queue
代码示例:事件广播系统
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 Fanout 交换机
channel.exchange_declare(exchange='user_events', exchange_type='fanout', durable=True)
# 声明各服务的队列
channels = ['sms_queue', 'email_queue', 'push_queue']
for q in channels:
channel.queue_declare(queue=q, durable=True)
channel.queue_bind(exchange='user_events', queue=q)
# 发布用户注册事件
event = {
'event': 'user_registered',
'user_id': 'u_1001',
'username': 'zhangsan',
'email': '[email protected]',
'phone': '13800138000'
}
channel.basic_publish(
exchange='user_events',
routing_key='', # Fanout 忽略 routing_key
body=json.dumps(event),
properties=pika.BasicProperties(delivery_mode=2)
)
print("[x] 用户注册事件已广播")
connection.close()
Fanout + 临时队列(排他队列)
# 消费者使用临时队列接收广播
result = channel.queue_declare(queue='', exclusive=True) # 匿名排他队列
queue_name = result.method.queue
channel.queue_bind(exchange='user_events', queue=queue_name)
def callback(ch, method, properties, body):
print(f"[x] 收到事件: {body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
4.5 Topic Exchange(主题交换机)
Topic Exchange 使用通配符模式匹配路由消息,是最灵活的交换机类型。
匹配规则
| 通配符 | 说明 | 示例 |
|---|---|---|
* | 匹配恰好一个单词 | order.* 匹配 order.payment,不匹配 order.payment.success |
# | 匹配零个或多个单词 | order.# 匹配 order、order.payment、order.payment.success |
词边界规则
路由键以 . 分隔为单词:order.payment.success → 三个单词:order、payment、success
匹配示例
| 绑定键 (Binding Key) | 匹配的路由键 | 不匹配的路由键 |
|---|---|---|
order.* | order.payment, order.shipping | order.payment.success |
*.payment | order.payment, subscription.payment | order.payment.success |
order.# | order, order.payment, order.payment.success | user.order |
# | 任意路由键 | — |
order.*.success | order.payment.success | order.success |
#.error.# | error, order.error, order.error.fatal, payment.error.timeout | order.errors |
架构图
Producer ──> Topic Exchange
│
├── (binding: "order.payment.#") ──> Queue: payment_logs
├── (binding: "order.shipping.#") ──> Queue: shipping_logs
├── (binding: "#.error.#") ──> Queue: error_logs
└── (binding: "#") ──> Queue: all_logs
消息路由键: "order.payment.error"
→ 匹配 payment_logs (order.payment.#) ✓
→ 匹配 error_logs (#.error.#) ✓
→ 匹配 all_logs (#) ✓
→ 不匹配 shipping_logs ✗
完整代码示例:日志分级系统
import pika
import json
from datetime import datetime
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 Topic 交换机
channel.exchange_declare(exchange='app_logs', exchange_type='topic', durable=True)
# 声明队列
queues_config = {
'error_queue': '#.ERROR.#', # 所有错误日志
'warning_queue': '#.WARNING.#', # 所有警告日志
'payment_queue': 'payment.#', # 支付相关日志
'critical_queue': '*.CRITICAL.#', # 临界错误
'all_queue': '#', # 所有日志
}
for queue_name, binding_key in queues_config.items():
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange='app_logs', queue=queue_name, routing_key=binding_key)
# 发送日志
logs = [
('payment.INFO', '用户支付成功'),
('payment.ERROR', '支付网关超时'),
('order.WARNING', '库存不足'),
('system.CRITICAL', '数据库连接失败'),
('user.INFO', '用户登录'),
]
for routing_key, message in logs:
log_entry = {
'level': routing_key.split('.')[1],
'service': routing_key.split('.')[0],
'message': message,
'timestamp': datetime.now().isoformat()
}
channel.basic_publish(
exchange='app_logs',
routing_key=routing_key,
body=json.dumps(log_entry),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"[x] {routing_key}: {message}")
connection.close()
4.6 Headers Exchange(头部交换机)
Headers Exchange 根据消息的 Headers 属性(而非 Routing Key)进行路由匹配。
匹配模式
| 模式 | 说明 |
|---|---|
x-match=all | 所有 header 键值对都匹配(AND) |
x-match=any | 任意一个 header 键值对匹配(OR) |
代码示例
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 Headers 交换机
channel.exchange_declare(exchange='doc_headers', exchange_type='headers', durable=True)
# 声明队列
channel.queue_declare(queue='pdf_queue', durable=True)
channel.queue_declare(queue='urgent_queue', durable=True)
# 绑定 - 匹配 format=pdf 的文档
channel.queue_bind(
exchange='doc_headers', queue='pdf_queue',
arguments={'x-match': 'all', 'format': 'pdf'}
)
# 绑定 - 匹配 priority=high 或 format=pdf
channel.queue_bind(
exchange='doc_headers', queue='urgent_queue',
arguments={'x-match': 'any', 'priority': 'high', 'format': 'pdf'}
)
# 发布消息(通过 headers 路由)
channel.basic_publish(
exchange='doc_headers',
routing_key='', # Headers 交换机忽略 routing_key
body=b'PDF document content',
properties=pika.BasicProperties(
delivery_mode=2,
headers={'format': 'pdf', 'priority': 'high', 'language': 'zh'}
)
)
print("[x] 文档消息已发送")
connection.close()
Headers 匹配行为
| 消息 Headers | x-match=all, format=pdf | x-match=any, priority=high, format=pdf |
|---|---|---|
{format: pdf} | ✅ 匹配 | ✅ 匹配 |
{format: pdf, lang: zh} | ✅ 匹配 | ✅ 匹配 |
{format: docx} | ❌ 不匹配 | ❌ 不匹配 |
{priority: high} | ❌ 不匹配 | ✅ 匹配 |
{priority: high, format: docx} | ❌ 不匹配 | ✅ 匹配 |
4.7 Dead Letter Exchange(死信交换机)
死信(Dead Letter)是无法被正常消费的消息。死信交换机负责接收这些消息并路由到死信队列。
消息成为死信的条件
| 条件 | 说明 |
|---|---|
| 消费者拒绝消息 | basic.reject 或 basic.nack 且 requeue=false |
| 消息 TTL 过期 | 消息或队列设置的 TTL 到期 |
| 队列已满 | 超出 x-max-length 或 x-max-length-bytes |
| 消息被拒绝且无重新入队 | ACK 模式下的 NACK |
配置死信交换机
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 1. 声明死信交换机和死信队列
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='dead_letter_queue', durable=True)
channel.queue_bind(exchange='dlx_exchange', queue='dead_letter_queue', routing_key='dead')
# 2. 声明业务队列(配置死信交换机)
channel.queue_declare(
queue='business_queue',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx_exchange',
'x-dead-letter-routing-key': 'dead',
'x-message-ttl': 30000, # 消息 30 秒未消费成为死信
'x-max-length': 1000 # 超出 1000 条成为死信
}
)
channel.queue_bind(exchange='business_exchange', queue='business_queue', routing_key='task')
死信处理模式
# 死信队列消费者 - 记录/告警/重试
def dead_letter_handler(ch, method, properties, body):
print(f"[DLX] 收到死信: {body.decode()}")
print(f" 原交换机: {properties.headers.get('x-first-death-exchange', 'N/A')}")
print(f" 原队列: {properties.headers.get('x-first-death-queue', 'N/A')}")
print(f" 死因: {properties.headers.get('x-first-death-reason', 'N/A')}")
# 记录到数据库 / 发送告警 / 尝试重试
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='dead_letter_queue', on_message_callback=dead_letter_handler)
死信消息的 Headers
| Header | 说明 |
|---|---|
x-first-death-reason | 第一次成为死信的原因 |
x-first-death-queue | 第一次成为死信的队列名 |
x-first-death-exchange | 第一次成为死信的交换机名 |
x-death | 死信历史记录(数组) |
4.8 Delayed Message Exchange(延迟交换机)
延迟交换机通过 rabbitmq_delayed_message_exchange 插件实现消息的定时投递。
安装插件
# 下载插件(版本需匹配 RabbitMQ 版本)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
cp rabbitmq_delayed_message_exchange-3.13.0.ez /opt/rabbitmq/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用延迟交换机
import pika
import json
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明延迟交换机(type=x-delayed-message)
args = {'x-delayed-type': 'direct'} # 内部路由类型
channel.exchange_declare(
exchange='delayed_exchange',
exchange_type='x-delayed-message',
durable=True,
arguments=args
)
channel.queue_declare(queue='delayed_queue', durable=True)
channel.queue_bind(exchange='delayed_exchange', queue='delayed_queue', routing_key='delay')
# 发送延迟消息(延迟 5 秒)
channel.basic_publish(
exchange='delayed_exchange',
routing_key='delay',
body=json.dumps({
'task': 'send_reminder',
'user_id': 'u_1001',
'message': '您的订单即将超时,请尽快支付'
}),
properties=pika.BasicProperties(
delivery_mode=2,
headers={'x-delay': 5000} # 延迟 5000 毫秒
)
)
print(f"[x] 延迟消息已发送,将在 5 秒后投递")
connection.close()
延迟交换机适用场景
📌 业务场景:
| 场景 | 延迟时间 | 说明 |
|---|---|---|
| 订单超时取消 | 30 分钟 | 下单后 30 分钟未支付自动取消 |
| 消息重试 | 指数退避 | 失败后延迟重试 |
| 定时提醒 | 自定义 | 预约提醒、到期提醒 |
| 延迟关闭连接 | 5 分钟 | 用户离开后延迟处理 |
4.9 交换机高级特性
Alternate Exchange(备用交换机)
当消息无法路由到任何队列时,转发到备用交换机。
# 声明备用交换机
channel.exchange_declare(exchange='fallback_exchange', exchange_type='fanout', durable=True)
channel.queue_declare(queue='unroutable_queue', durable=True)
channel.queue_bind(exchange='fallback_exchange', queue='unroutable_queue')
# 声明主交换机,指定备用交换机
channel.exchange_declare(
exchange='main_exchange',
exchange_type='direct',
durable=True,
arguments={'alternate-exchange': 'fallback_exchange'}
)
Internal Exchange(内部交换机)
内部交换机不能被生产者直接发布消息,只能通过其他交换机绑定转发。
channel.exchange_declare(
exchange='internal_router',
exchange_type='topic',
durable=True,
internal=True # 标记为内部交换机
)
4.10 交换机选择决策表
| 需求 | 推荐交换机类型 | 理由 |
|---|---|---|
| 简单点对点 | Default / Direct | 最简单直接 |
| 任务分发到不同工作队列 | Direct | 精确路由 |
| 事件广播给所有订阅者 | Fanout | 零配置广播 |
| 按规则分发日志/事件 | Topic | 灵活的通配符匹配 |
| 基于消息属性的复杂路由 | Headers | 支持多条件匹配 |
| 消息延迟投递 | Delayed (插件) | 原生延迟支持 |
| 处理无法路由的消息 | Alternate Exchange | 兜底路由 |
| 处理无法消费的消息 | Dead Letter | 异常消息处理 |
4.11 注意事项
⚠️ 不要使用匿名交换机(空字符串)进行生产发布
匿名交换机仅适合简单场景,生产环境应使用具名交换机以便于监控和管理。
⚠️ Fanout 交换机会忽略 Routing Key
即使设置了 Routing Key,Fanout 也会忽略它。
⚠️ Topic 匹配性能
# 通配符在路由键很长时可能导致性能下降,避免过度使用。
⚠️ 延迟交换机不保证精确延迟
延迟消息的实际投递时间可能略有偏差,不适合对精度要求极高的场景。
⚠️ 死信循环
如果死信交换机指向的队列又配置了同一个死信交换机,会形成死信循环。需确保死信队列不配置死信转发。
🔥 最佳实践: 为每个业务域使用独立的交换机,配合有意义的命名约定(如 order.events、payment.commands)。
4.12 扩展阅读
下一章: 第 5 章:队列详解 — 深入理解队列类型、持久化策略、仲裁队列等核心概念。