RabbitMQ 消息队列完全教程 / 第 13 章:消息模式
第 13 章:消息模式
本章汇总 RabbitMQ 中最经典和实用的消息设计模式,帮助你针对不同业务场景选择最合适的模式。
13.1 模式总览
| 模式 | 核心思想 | 交换机类型 | 典型场景 |
|---|---|---|---|
| Work Queue | 竞争消费,任务分发 | Default/Direct | 异步任务处理 |
| Publish/Subscribe | 广播给所有消费者 | Fanout | 事件通知 |
| Routing | 按规则路由到特定队列 | Direct | 日志分级 |
| Topics | 模式匹配路由 | Topic | 多维度分发 |
| RPC | 请求-响应模式 | Default | 远程调用 |
| Priority Queue | 高优先级消息优先消费 | Direct | VIP 处理 |
| Delayed Message | 延迟投递消息 | Delayed | 超时处理 |
| Competing Consumers | 多消费者竞争消费 | Direct | 水平扩展 |
| Claim Check | 大消息外置存储 | Direct | 大文件处理 |
13.2 Work Queue(工作队列)
多个消费者竞争消费同一个队列的消息,实现负载均衡。
架构
Producer ──> [Queue] ──> Consumer 1
├──> Consumer 2
└──> Consumer 3
Python 实现
# Producer
import pika, json, time
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='work_queue', durable=True)
for i in range(20):
task = {'id': i, 'data': f'task_{i}', 'complexity': i % 3}
ch.basic_publish(
exchange='', routing_key='work_queue',
body=json.dumps(task),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"[x] 发送任务 {i}")
conn.close()
# Consumer
import pika, json, time
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='work_queue', durable=True)
ch.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
task = json.loads(body)
print(f"[x] 处理任务 {task['id']}")
time.sleep(task['complexity'] + 1) # 模拟耗时
print(f"[✓] 任务 {task['id']} 完成")
ch.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_consume(queue='work_queue', on_message_callback=callback)
ch.start_consuming()
📌 业务场景: 后台任务处理(报表生成、数据导出、图片处理)
13.3 Publish/Subscribe(发布订阅)
生产者发布的消息被所有订阅者同时接收。
架构
┌──> Queue A ──> Consumer A
Producer ──> Fanout ──┼──> Queue B ──> Consumer B
Exchange └──> Queue C ──> Consumer C
实现
# Producer - 事件发布者
import pika, json
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='events', exchange_type='fanout', durable=True)
event = {'type': 'user_registered', 'user_id': 'u001', 'email': '[email protected]'}
ch.basic_publish(
exchange='events',
routing_key='',
body=json.dumps(event),
properties=pika.BasicProperties(delivery_mode=2)
)
print("[x] 事件已发布")
conn.close()
# Consumer - 事件订阅者(每个服务一个实例)
import pika, json
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='events', exchange_type='fanout', durable=True)
# 使用排他临时队列
result = ch.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
ch.queue_bind(exchange='events', queue=queue_name)
def callback(ch, method, properties, body):
event = json.loads(body)
print(f"[x] 收到事件: {event['type']}")
# 处理事件...
ch.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_consume(queue=queue_name, on_message_callback=callback)
ch.start_consuming()
📌 业务场景: 用户注册后同时通知邮件服务、积分服务、统计服务
13.4 Routing(路由模式)
根据路由键将消息分发到不同的队列。
实现
# Producer
import pika, json
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='logs_direct', exchange_type='direct', durable=True)
# 发送不同级别的日志
for level, message in [
('error', '数据库连接失败'),
('info', '用户登录成功'),
('warning', '磁盘空间不足'),
('error', '支付网关超时')
]:
ch.basic_publish(
exchange='logs_direct',
routing_key=level,
body=json.dumps({'level': level, 'message': message}),
properties=pika.BasicProperties(delivery_mode=2)
)
conn.close()
# Consumer - 仅接收 error 日志
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='logs_direct', exchange_type='direct', durable=True)
ch.queue_declare(queue='error_logs', durable=True)
ch.queue_bind(exchange='logs_direct', queue='error_logs', routing_key='error')
def callback(ch, method, properties, body):
print(f"[ERROR] {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_consume(queue='error_logs', on_message_callback=callback)
ch.start_consuming()
📌 业务场景: 日志分级处理、告警分发
13.5 Topics(主题模式)
使用通配符进行灵活的消息路由。
实现
# Producer
import pika, json
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='sensor_data', exchange_type='topic', durable=True)
# 发送传感器数据
sensors = [
('sensor.temperature.room1', {'temp': 25.5, 'room': 'room1'}),
('sensor.humidity.room1', {'humidity': 60, 'room': 'room1'}),
('sensor.temperature.room2', {'temp': 22.3, 'room': 'room2'}),
('sensor.alert.temperature', {'alert': 'high', 'value': 38.2}),
]
for routing_key, data in sensors:
ch.basic_publish(
exchange='sensor_data',
routing_key=routing_key,
body=json.dumps(data),
properties=pika.BasicProperties(delivery_mode=2)
)
conn.close()
# Consumer - 接收所有温度数据
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='sensor_data', exchange_type='topic', durable=True)
ch.queue_declare(queue='temperature_data', durable=True)
ch.queue_bind(exchange='sensor_data', queue='temperature_data', routing_key='sensor.temperature.*')
ch.queue_declare(queue='alerts', durable=True)
ch.queue_bind(exchange='sensor_data', queue='alerts', routing_key='sensor.alert.#')
📌 业务场景: IoT 传感器数据分发、多维度事件路由
13.6 RPC(远程过程调用)
通过消息队列实现同步请求-响应模式。
架构
Client Server
│ │
│──1. 发送请求 (reply_to, corr_id)──>│
│ │──3. 处理请求
│<──4. 发送响应 (corr_id)──────────│
│ │
│──2. 监听回复队列 │
Server 实现
# RPC Server
import pika, json
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='rpc_queue', durable=True)
ch.basic_qos(prefetch_count=1)
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
def on_request(ch, method, props, body):
request = json.loads(body)
n = request['n']
print(f"[x] 计算 fibonacci({n})")
result = fibonacci(n)
response = {'n': n, 'result': result}
ch.basic_publish(
exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(
correlation_id=props.correlation_id,
content_type='application/json'
),
body=json.dumps(response)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"[✓] fibonacci({n}) = {result}")
ch.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print("[*] RPC Server 等待请求...")
ch.start_consuming()
Client 实现
# RPC Client
import pika, json, uuid
class RpcClient:
def __init__(self):
self.conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.ch = self.conn.channel()
# 声明回调队列
result = self.ch.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.ch.basic_consume(
queue=self.callback_queue,
on_message_callback=self._on_response,
auto_ack=True
)
self.response = None
self.corr_id = None
def _on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = json.loads(body)
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.ch.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
content_type='application/json'
),
body=json.dumps({'n': n})
)
while self.response is None:
self.conn.process_data_events()
return self.response
# 使用
client = RpcClient()
for n in [10, 20, 30]:
response = client.call(n)
print(f"[✓] fibonacci({n}) = {response['result']}")
📌 业务场景: 需要同步结果的远程计算、查询
13.7 Delayed Message(延迟消息模式)
延迟消息 + DLX 实现
import pika, json
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
# 业务交换机和队列
ch.exchange_declare(exchange='order_events', exchange_type='direct', durable=True)
ch.queue_declare(queue='order_timeout', durable=True,
arguments={
'x-dead-letter-exchange': 'order_events',
'x-dead-letter-routing-key': 'timeout_process'
})
ch.queue_bind(exchange='order_events', queue='order_timeout', routing_key='timeout_process')
# 延迟队列(消息在 TTL 过期后进入 DLX)
ch.queue_declare(queue='delay_30min', durable=True,
arguments={
'x-message-ttl': 1800000, # 30 分钟
'x-dead-letter-exchange': 'order_events',
'x-dead-letter-routing-key': 'timeout_process',
'x-max-length': 100000
})
# 创建订单时发送延迟消息
order = {'order_id': 'ORD001', 'user_id': 'U1001', 'status': 'pending'}
ch.basic_publish(
exchange='', routing_key='delay_30min',
body=json.dumps(order),
properties=pika.BasicProperties(delivery_mode=2)
)
print("[x] 订单超时消息已入队,30 分钟后处理")
conn.close()
📌 业务场景: 订单超时取消、定时提醒、延迟重试
13.8 Claim Check(消息存证模式)
大消息不直接发送,而是将内容存到外部存储,消息中只传递引用。
实现
import pika, json, hashlib, redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def send_claim_check(channel, exchange, routing_key, large_data):
"""发送 Claim Check 消息"""
# 1. 将大数据存入外部存储
check_id = hashlib.sha256(large_data.encode()).hexdigest()[:16]
redis_client.setex(f'claim:{check_id}', 3600, large_data) # 1 小时过期
# 2. 发送引用消息
claim_message = {
'check_id': check_id,
'content_type': 'application/json',
'size': len(large_data)
}
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=json.dumps(claim_message),
properties=pika.BasicProperties(delivery_mode=2)
)
def receive_claim_check(body):
"""接收并还原 Claim Check 消息"""
claim = json.loads(body)
check_id = claim['check_id']
# 从外部存储获取原始数据
original_data = redis_client.get(f'claim:{check_id}')
if original_data:
return json.loads(original_data)
else:
raise Exception(f"Claim data expired or not found: {check_id}")
📌 业务场景: 大文件传输、日志批量处理
13.9 模式选择决策
需要同步响应?
├── 是 ──> RPC 模式
└── 否 ──> 消息需要广播?
├── 是 ──> Publish/Subscribe (Fanout)
└── 否 ──> 需要延迟?
├── 是 ──> Delayed Message
└── 否 ──> 需要优先级?
├── 是 ──> Priority Queue
└── 否 ──> 需要复杂路由?
├── 是 ──> Topic / Headers
└── 否 ──> Work Queue (Direct)
13.10 注意事项
⚠️ RPC 模式的超时处理
客户端需要设置超时机制,避免永久等待响应。
⚠️ Fanout 不过滤消息
所有绑定队列都会收到完整消息,注意消息大小对网络的影响。
⚠️ 延迟消息不保证精确
TTL 到期后的投递可能有延迟,不适合精确定时场景。
⚠️ Claim Check 的存储管理
外部存储中的引用数据需要定期清理,防止数据泄漏。
🔥 最佳实践: 优先选择异步模式(Work Queue / Pub-Sub),仅在必要时使用 RPC。
13.11 扩展阅读
下一章: 第 14 章:Docker 与 Kubernetes — 生产级容器化部署方案。