Redis 传输协议精讲 / 10 - 集群协议
集群协议
10.1 集群概述
Redis Cluster 是 Redis 的分布式方案,通过数据分片(Sharding)实现水平扩展。
核心概念
| 概念 | 说明 |
|---|---|
| 哈希槽(Hash Slot) | 16384 个槽位,每个 key 属于一个槽 |
| 节点(Node) | 每个节点负责一部分槽位 |
| 槽位分配 | 如 0-5460 → Node A, 5461-10922 → Node B, 10923-16383 → Node C |
| 重定向 | 客户端访问错误节点时,收到 MOVED/ASK 重定向 |
槽位计算
HASH_SLOT = CRC16(key) mod 16384
def crc16(s: bytes) -> int:
"""Redis 使用的 CRC16 算法"""
crc = 0
for byte in s:
crc ^= byte << 8
for _ in range(8):
if crc & 0x8000:
crc = (crc << 1) ^ 0x1021
else:
crc <<= 1
crc &= 0xFFFF
return crc
def hash_slot(key: str) -> int:
"""计算 key 的哈希槽"""
# 处理 hash tag {}
start = key.find('{')
if start != -1:
end = key.find('}', start + 1)
if end != -1 and end != start + 1:
key = key[start + 1:end]
return crc16(key.encode()) % 16384
10.2 MOVED 重定向
概述
当客户端向错误的节点发送命令时,节点返回 MOVED 错误,告诉客户端正确的节点地址。
协议格式
-MOVED <slot> <host>:<port>
交互流程
Client Node A (负责 0-5460)
│ │
│──── GET key:7777 ────────────→│ key:7777 在槽 7777
│ │ 槽 7777 不属于 Node A
│←── MOVED 7777 127.0.0.1:6380 ─│
│ │
│──── GET key:7777 ────────────→│ Node B (负责 5461-10922)
│←── $5\r\nhello ──────────────│
客户端处理逻辑
import socket
import re
class ClusterClient:
def __init__(self, startup_nodes):
"""startup_nodes: 初始节点列表 [(host, port), ...]"""
self.nodes = {addr: self._connect(*addr) for addr in startup_nodes}
self.slot_map = {} # slot → (host, port)
self._refresh_slots()
def _connect(self, host, port):
return socket.create_connection((host, port))
def _refresh_slots(self):
"""刷新槽位映射"""
for addr, sock in self.nodes.items():
try:
sock.sendall(b"*2\r\n$7\r\nCLUSTER\r\n$9\r\nSLOTS\r\n\r\n")
resp = self._read_response(sock)
# 解析 CLUSTER SLOTS 响应
# [[0, 5460, ["127.0.0.1", 6379]], ...]
for slot_range in resp:
start_slot = slot_range[0]
end_slot = slot_range[1]
node_info = slot_range[2]
node_addr = (node_info[0], node_info[1])
for slot in range(start_slot, end_slot + 1):
self.slot_map[slot] = node_addr
return
except:
continue
def execute(self, *args):
"""执行命令,处理 MOVED 重定向"""
for attempt in range(3):
# 计算槽位
key = args[1] if len(args) > 1 else None
slot = hash_slot(key) if key else 0
# 获取目标节点
target = self.slot_map.get(slot)
if target is None:
self._refresh_slots()
target = self.slot_map.get(slot)
sock = self.nodes.get(target)
# 发送命令
cmd = encode_command(*args)
sock.sendall(cmd)
# 读取响应
resp = self._read_response(sock)
# 检查 MOVED
if isinstance(resp, Exception) and str(resp).startswith("MOVED"):
parts = str(resp).split()
new_host, new_port = parts[2].split(":")
new_addr = (new_host, int(new_port))
# 更新槽位映射
self.slot_map[slot] = new_addr
# 连接到新节点
if new_addr not in self.nodes:
self.nodes[new_addr] = self._connect(*new_addr)
continue # 重试
return resp
raise RuntimeError("Max retries exceeded")
CLUSTER SLOTS 命令
→ CLUSTER SLOTS
← *3
← *4
← :0 ← 起始槽
← :5460 ← 结束槽
← *2 ← 主节点
← $9
← 127.0.0.1
← :6379
← *2 ← 从节点
← $9
← 127.0.0.1
← :6380
← *4
← :5461
← :10922
← *2
← $9
← 127.0.0.1
← :6381
← *2
← $9
← 127.0.0.1
← :6382
← *4
← :10923
← :16383
← *2
← $9
← 127.0.0.1
← :6383
← *2
← $9
← 127.0.0.1
← :6384
CLUSTER NODES 命令
返回集群拓扑的文本描述:
→ CLUSTER NODES
← $<多行文本>
# 格式:<node-id> <ip:port@cport> <flags> <master-id> <ping-sent> <pong-recv> <config-epoch> <link-state> <slot> <slot> ...
示例输出:
a]1b2c3d4e5f... 127.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-5460
b2c3d4e5f6... 127.0.0.1:6381@16381 master - 0 0 2 connected 5461-10922
c3d4e5f6a7... 127.0.0.1:6383@16383 master - 0 0 3 connected 10923-16383
10.3 ASK 重定向
MOVED vs ASK
| 特性 | MOVED | ASK |
|---|---|---|
| 含义 | 槽位永久归属新节点 | 槽位正在迁移中 |
| 客户端行为 | 更新本地槽位映射 | 仅本次重定向,不更新映射 |
| 后续请求 | 直接发往新节点 | 仍发往旧节点(直到迁移完成) |
槽位迁移流程
1. Node A: CLUSTER SETSLOT 5461 MIGRATING <node-b-id>
2. Node B: CLUSTER SETSLOT 5461 IMPORTING <node-a-id>
3. 逐个迁移槽位中的 key
4. 所有节点: CLUSTER SETSLOT 5461 NODE <node-b-id>
ASK 交互流程
Client Node A (正在迁出槽 5461)
│ │
│──── GET key:5461 ────────────→│ key 已迁移或不存在
│ │
│←── ASK 5461 127.0.0.1:6380 ───│
│ │
│──── ASKING ──────────────────→│ Node B (正在迁入槽 5461)
│←── +OK ───────────────────────│ ASKING 标记允许访问导入中的槽
│ │
│──── GET key:5461 ────────────→│
│←── $5\r\nhello ──────────────│
ASKING 命令
ASKING 命令告诉节点"我知道这个槽正在导入,请让我访问":
*1\r\n$6\r\nASKING\r\n
响应:+OK\r\n
客户端 ASK 处理
def handle_ask(self, slot, new_addr, original_args):
"""处理 ASK 重定向"""
# 1. 连接到新节点
sock = self._get_or_connect(new_addr)
# 2. 发送 ASKING
sock.sendall(b"*1\r\n$6\r\nASKING\r\n")
self._read_response(sock) # +OK
# 3. 重新发送原始命令
cmd = encode_command(*original_args)
sock.sendall(cmd)
resp = self._read_response(sock)
# 注意:不更新 slot_map,因为槽位还在迁移中
return resp
10.4 Gossip 协议
概述
Redis Cluster 使用 Gossip 协议在节点间交换集群状态信息。每个节点定期与其他节点交换消息。
Gossip 消息类型
| 消息 | 说明 |
|---|---|
PING | 节点 A 向节点 B 发送心跳 |
PONG | 对 PING 的响应,也用于广播自身状态 |
MEET | 邀请新节点加入集群 |
FAIL | 广播节点故障 |
消息格式
Gossip 消息通过 Redis 集群总线(Cluster Bus)传输,使用专用端口(数据端口 + 10000):
# 集群总线端口
数据端口: 6379
总线端口: 16379
CLUSTER INFO 命令
→ CLUSTER INFO
← cluster_state:ok
← cluster_slots_assigned:16384
← cluster_slots_ok:16384
← cluster_slots_pfail:0
← cluster_slots_fail:0
← cluster_known_nodes:6
← cluster_size:3
← cluster_current_epoch:6
← cluster_my_epoch:1
← cluster_stats_messages_sent:14839
← cluster_stats_messages_received:14839
| 字段 | 说明 |
|---|---|
cluster_state | ok 或 fail |
cluster_slots_assigned | 已分配的槽位数(应为 16384) |
cluster_slots_pfail | 处于 PFAIL 状态的槽位数 |
cluster_slots_fail | 处于 FAIL 状态的槽位数 |
cluster_known_nodes | 已知节点数 |
cluster_size | 主节点数 |
cluster_current_epoch | 当前纪元号 |
10.5 集群命令
CLUSTER MYID
获取当前节点 ID:
→ CLUSTER MYID
← $40
← abc123...
CLUSTER MEET
邀请节点加入集群:
→ CLUSTER MEET 127.0.0.1 6385
← +OK
CLUSTER ADDSLOTS / DELSLOTS
分配/移除槽位:
→ CLUSTER ADDSLOTS 0 1 2 3 4 5
← +OK
→ CLUSTER DELSLOTS 0 1 2
← +OK
CLUSTER KEYSLOT
查看 key 属于哪个槽:
→ CLUSTER KEYSLOT mykey
← :14687
10.6 集群中的多 key 操作
限制
所有多 key 操作(如 MGET、MSET、事务、Lua 脚本)要求所有 key 在同一个槽:
# ❌ 错误:key 在不同槽
→ MGET user:1 user:2
← -CROSSSLOT Keys in request don't hash to the same slot
Hash Tag 解决方案
使用 {} 指定 hash tag,确保相关 key 映射到同一个槽:
# 使用 hash tag
{user:1000}.name → 槽 X
{user:1000}.email → 槽 X(相同)
{user:1000}.age → 槽 X(相同)
# MGET 可以执行
→ MGET {user:1000}.name {user:1000}.email {user:1000}.age
Hash Tag 策略
def make_keys(user_id, *fields):
"""生成使用 hash tag 的 key 列表"""
return [f"{{{user_id}}}:{field}" for field in fields]
keys = make_keys("user:1000", "name", "email", "age")
# ["{user:1000}:name", "{user:1000}:email", "{user:1000}:age"]
# 所有 key 都在同一个槽
assert all(hash_slot(keys[0]) == hash_slot(k) for k in keys)
10.7 集群中的 Pipeline
Pipeline 在集群模式下需要按槽位分组:
def cluster_pipeline(self, commands):
"""集群模式的 Pipeline"""
# 按槽位分组
slot_groups = {}
for cmd_args in commands:
key = cmd_args[1]
slot = hash_slot(key)
if slot not in slot_groups:
slot_groups[slot] = []
slot_groups[slot].append(cmd_args)
# 每个槽位一组 Pipeline
results = []
for slot, cmds in slot_groups.items():
node = self.slot_map[slot]
sock = self.nodes[node]
# 发送 Pipeline
buf = b""
for cmd_args in cmds:
buf += encode_command(*cmd_args)
sock.sendall(buf)
# 读取响应
for _ in cmds:
resp = self._read_response(sock)
if isinstance(resp, Exception) and str(resp).startswith("MOVED"):
# 需要重新路由
pass
results.append(resp)
return results
10.8 客户端重试策略
指数退避重试
import time
import random
def execute_with_retry(self, *args, max_retries=5):
"""带指数退避的重试策略"""
for attempt in range(max_retries):
try:
return self._execute(*args)
except (MOVED, ASK) as e:
# 重定向,立即重试
continue
except ConnectionError:
# 连接失败,退避重试
if attempt < max_retries - 1:
delay = min(0.1 * (2 ** attempt), 2.0)
delay += random.uniform(0, delay * 0.1) # 抖动
time.sleep(delay)
raise RuntimeError("Max retries exceeded")
10.9 注意事项
⚠️ 最少 3 个主节点 集群至少需要 3 个主节点才能正常工作。奇数个主节点有利于故障时的选举。
⚠️ 槽位迁移期间的可用性 迁移中的 key 可能需要两次重定向(ASK + 操作),这会增加延迟。
⚠️ 集群不支持跨槽事务 事务和 Lua 脚本中的所有 key 必须在同一个槽中。使用 hash tag 确保这一点。
⚠️ 客户端缓存槽位映射 客户端应缓存槽位到节点的映射,避免每次都通过 MOVED 发现。使用
CLUSTER SLOTS定期刷新。
10.10 扩展阅读
| 资源 | 说明 |
|---|---|
| Redis Cluster 教程 | 官方教程 |
| Redis Cluster 规范 | 集群协议规范 |
| CLUSTER SLOTS | 槽位查询命令 |
| Hash Tag | Hash Tag 规范 |