Redis 完全指南 / 08 - 发布订阅
发布订阅
8.1 Pub/Sub 概述
Redis 的 Pub/Sub(发布/订阅)是一种消息通信模式:发送者(Publisher)发送消息,订阅者(Subscriber)接收消息。Redis 充当消息代理(Broker),将消息路由到所有订阅者。
Pub/Sub 架构
Publisher 1 ──PUBLISH──→ ┌──────────┐ ──推送──→ Subscriber A
Publisher 2 ──PUBLISH──→ │ Redis │ ──推送──→ Subscriber B
Publisher 3 ──PUBLISH──→ │ (Broker)│ ──推送──→ Subscriber C
└──────────┘
Pub/Sub 特性
| 特性 | 说明 |
|---|---|
| 实时推送 | 消息发布后立即推送给所有订阅者 |
| 不持久化 | 消息不会存储,订阅者断开连接后丢失 |
| 不确认机制 | 没有 ACK,不保证送达 |
| 无消费者组 | 每个订阅者都收到全量消息 |
| Fire-and-Forget | 发后即忘,适合实时通知 |
8.2 基本 Pub/Sub 命令
发布消息
# 发布消息到频道
PUBLISH channel:news "Breaking: Redis 8.0 Released!"
# (integer) 3 ← 返回收到消息的订阅者数量
PUBLISH channel:sports "Team A won the match"
# (integer) 2
订阅频道
# 订阅频道(进入订阅模式,阻塞等待消息)
SUBSCRIBE channel:news channel:sports
# 1) "subscribe"
# 2) "channel:news"
# 3) (integer) 1 ← 当前订阅的频道数量
# 进入订阅模式后,只能使用以下命令:
# SUBSCRIBE, UNSUBSCRIBE, PING, QUIT
# 收到的消息格式:
# 1) "message" ← 消息类型
# 2) "channel:news" ← 频道名
# 3) "Breaking news!" ← 消息内容
# 退订频道
UNSUBSCRIBE channel:news
UNSUBSCRIBE # 退订所有频道
模式订阅(Pattern Subscribe)
# 使用通配符订阅匹配的频道
PSUBSCRIBE channel:* # 订阅所有 channel: 开头的频道
PSUBSCRIBE news:* sports:* # 订阅多个模式
PSUBSCRIBE * # 订阅所有频道
# 消息格式(比 SUBSCRIBE 多一个字段):
# 1) "pmessage" ← 消息类型
# 2) "channel:*" ← 匹配的模式
# 3) "channel:news" ← 实际频道名
# 4) "Hello World" ← 消息内容
# 退订模式
PUNSUBSCRIBE channel:*
PUNSUBSCRIBE # 退订所有模式
查看订阅信息
# 查看频道的订阅者数量
PUBSUB CHANNELS # 列出所有有订阅者的频道
PUBSUB CHANNELS channel:* # 匹配模式
# 查看频道的订阅者数量
PUBSUB NUMSUB channel:news channel:sports
# 1) "channel:news"
# 2) (integer) 3
# 3) "channel:sports"
# 4) (integer) 2
# 查看模式订阅的数量
PUBSUB NUMPAT # 当前客户端的模式订阅数量
8.3 多客户端演示
使用 redis-cli 演示
终端 1(订阅者 A):
redis-cli
> SUBSCRIBE channel:news
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:news"
3) (integer) 1
终端 2(订阅者 B):
redis-cli
> SUBSCRIBE channel:news channel:sports
终端 3(发布者):
redis-cli
> PUBLISH channel:news "Hello subscribers!"
(integer) 2
> PUBLISH channel:sports "Goal!"
(integer) 1
终端 1 和 2 都会收到 “Hello subscribers!",终端 2 还会收到 “Goal!"。
Python 客户端实现
import redis
import threading
import time
def subscriber(name, channels):
"""订阅者线程"""
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = r.pubsub()
# 订阅频道
pubsub.subscribe(*channels)
print(f"[{name}] Subscribed to {channels}")
# 监听消息
for message in pubsub.listen():
if message['type'] == 'message':
print(f"[{name}] Received: {message['data']} (from {message['channel']})")
elif message['type'] == 'subscribe':
print(f"[{name}] Subscribed to {message['channel']}")
def publisher():
"""发布者"""
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
for i in range(5):
msg = f"Message #{i+1}"
count = r.publish('channel:news', msg)
print(f"[Publisher] Sent: '{msg}' to {count} subscribers")
time.sleep(1)
# 启动订阅者(后台线程)
t1 = threading.Thread(target=subscriber, args=("Sub-A", ["channel:news"]), daemon=True)
t2 = threading.Thread(target=subscriber, args=("Sub-B", ["channel:news", "channel:sports"]), daemon=True)
t1.start()
t2.start()
time.sleep(1) # 等待订阅者就绪
# 启动发布者
publisher()
模式订阅(Pattern)
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = r.pubsub()
# 模式订阅
pubsub.psubscribe('user:*:events')
# 发布消息
r.publish('user:1001:events', '{"action":"login","ip":"192.168.1.1"}')
r.publish('user:1002:events', '{"action":"purchase","amount":99.9}')
for message in pubsub.listen():
if message['type'] == 'pmessage':
print(f"Pattern: {message['pattern']}")
print(f"Channel: {message['channel']}")
print(f"Data: {message['data']}")
8.4 Pub/Sub 局限性
消息丢失场景
| 场景 | 消息是否丢失 |
|---|---|
| 订阅者正常在线 | ✅ 收到 |
| 订阅者断开连接后重连 | ❌ 丢失 |
| 发布时无订阅者 | ❌ 丢失 |
| 订阅者处理速度慢 | ❌ 可能丢失(输出缓冲区满) |
Pub/Sub vs Stream 对比
| 特性 | Pub/Sub | Stream |
|---|---|---|
| 消息持久化 | ❌ | ✅ |
| 消息确认 | ❌ | ✅ XACK |
| 消费者组 | ❌ | ✅ |
| 历史消息回溯 | ❌ | ✅ XRANGE |
| 多消费者 | ✅ 每个都收到 | ✅ 可配置 |
| 适用场景 | 实时通知、配置广播 | 事件溯源、异步任务 |
8.5 使用 Stream 替代 Pub/Sub
Stream 结合了 Pub/Sub 的推送特性和消息队列的可靠性:
# 生产者
XADD events * type "user_login" user_id "1001" ip "192.168.1.1"
# 消费者组(每条消息只被组内一个消费者处理)
XGROUP CREATE events analytics $ MKSTREAM
# 消费者 A 读取
XREADGROUP GROUP analytics consumer-a COUNT 1 BLOCK 5000 STREAMS events >
# 确认消息
XACK events analytics 1715318400000-0
广播模式(Stream + 每个服务一个消费者组)
# 创建多个消费者组,每组相当于 Pub/Sub 的一个订阅者
XGROUP CREATE events notification-svc $ MKSTREAM
XGROUP CREATE events analytics-svc $ MKSTREAM
XGROUP CREATE events audit-svc $ MKSTREAM
# 三个服务各自消费,每条消息被三个服务都处理
XREADGROUP GROUP notification-svc svc-1 COUNT 10 STREAMS events >
XREADGROUP GROUP analytics-svc svc-1 COUNT 10 STREAMS events >
XREADGROUP GROUP audit-svc svc-1 COUNT 10 STREAMS events >
8.6 Pub/Sub 高级用法
通知系统
# 用户关注通知
PUBLISH user:1001:notifications '{"type":"follow","from":2001,"time":"2026-05-10T10:00:00"}'
# 系统公告
PUBLISH system:announcements '{"title":"系统维护","content":"今晚22:00-23:00维护"}'
配置广播
# 配置变更广播
PUBLISH config:update '{"key":"max_upload_size","value":"20971520"}'
# 所有应用实例订阅 config:* 频道
PSUBSCRIBE config:*
实时日志聚合
# 各服务将日志推送到 Redis
PUBLISH logs:app '{"level":"ERROR","service":"order-svc","message":"DB connection timeout"}'
PUBLISH logs:app '{"level":"INFO","service":"user-svc","message":"User 1001 logged in"}'
# 日志聚合器订阅所有日志
SUBSCRIBE logs:app
⚠️ 注意:Pub/Sub 不保证消息送达。如果订阅者不在线,消息会丢失。对于重要消息,使用 Stream 或专业的消息队列(如 RabbitMQ、Kafka)。
📌 业务场景
场景一:实时通知推送
# 用户收到消息时通知
PUBLISH push:user:1001 '{"type":"message","from":"客服","content":"您的订单已发货"}'
# WebSocket 服务订阅
SUBSCRIBE push:user:1001
场景二:分布式事件广播
# 缓存失效广播
PUBLISH cache:invalidate '{"key":"user:1001","reason":"profile_update"}'
# 各节点订阅并清除本地缓存
PSUBSCRIBE cache:*
场景三:实时数据同步
# 数据库变更事件
PUBLISH db:changes '{"table":"orders","action":"update","id":1001,"status":"shipped"}'
# 搜索索引服务、缓存服务各自订阅处理