Redis 传输协议精讲 / 12 - 最佳实践
最佳实践
12.1 驱动开发指南
驱动架构
┌─────────────────────────────────────────────────────┐
│ Application │
├─────────────────────────────────────────────────────┤
│ High-Level API │
│ client.get("key") client.set("key", "value") │
├─────────────────────────────────────────────────────┤
│ Command Layer │
│ ["GET", "key"] ["SET", "key", "value"] │
├─────────────────────────────────────────────────────┤
│ Protocol Layer (RESP) │
│ *2\r\n$3\r\nGET\r\n$3\r\nkey\r\n │
├─────────────────────────────────────────────────────┤
│ Connection Layer │
│ TCP Socket / TLS / Unix Domain Socket │
├─────────────────────────────────────────────────────┤
│ Connection Pool │
│ Pool of persistent connections │
└─────────────────────────────────────────────────────┘
最小驱动实现
"""
Redis 驱动最小实现
覆盖:连接、命令发送、响应解析、Pipeline
"""
import socket
from typing import Any, List, Optional
class RedisError(Exception):
    """Error reply ("-...") received from the Redis server."""
    pass
class Connection:
    """A RESP-protocol connection to a Redis server.

    The socket is opened lazily on first use, and received bytes are kept
    in an internal buffer so replies can be parsed incrementally.
    """

    def __init__(self, host="127.0.0.1", port=6379, socket_timeout=5.0):
        self.host = host
        self.port = port
        self.socket_timeout = socket_timeout
        self._sock: Optional[socket.socket] = None
        self._buffer = b""  # bytes received but not yet consumed by the parser

    def connect(self):
        """Open the TCP connection."""
        self._sock = socket.create_connection(
            (self.host, self.port),
            timeout=self.socket_timeout
        )
        # Request/reply traffic is many small packets; disable Nagle batching.
        self._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    @property
    def sock(self) -> socket.socket:
        """The underlying socket, connecting on first access."""
        if self._sock is None:
            self.connect()
        return self._sock

    @staticmethod
    def encode_command(*args) -> bytes:
        """Encode one command as a RESP array of bulk strings.

        str args are UTF-8 encoded, bytes pass through unchanged, and any
        other type (int, float, ...) is converted with str() first, so
        e.g. EXPIRE can be sent with a plain integer TTL.
        """
        parts = [f"*{len(args)}\r\n".encode()]
        for arg in args:
            if isinstance(arg, str):
                arg = arg.encode("utf-8")
            elif not isinstance(arg, (bytes, bytearray)):
                arg = str(arg).encode("utf-8")
            parts.append(f"${len(arg)}\r\n".encode())
            parts.append(arg)
            parts.append(b"\r\n")
        return b"".join(parts)

    def send_command(self, *args) -> None:
        """Serialize and send one RESP command."""
        self.sock.sendall(self.encode_command(*args))

    def read_response(self) -> Any:
        """Read and decode one RESP reply (recursing into arrays).

        Raises:
            RedisError: when the server sent an error reply ("-...").
            ValueError: on an unknown type prefix.
        """
        line = self._read_line()
        prefix = chr(line[0])
        if prefix == "+":          # simple string
            return line[1:].decode("utf-8")
        elif prefix == "-":        # error reply
            raise RedisError(line[1:].decode("utf-8"))
        elif prefix == ":":        # integer
            return int(line[1:])
        elif prefix == "$":        # bulk string
            length = int(line[1:])
            if length == -1:       # null bulk string
                return None
            data = self._read_bytes(length + 2)  # payload + trailing CRLF
            return data[:length]
        elif prefix == "*":        # array
            count = int(line[1:])
            if count == -1:        # null array
                return None
            return [self.read_response() for _ in range(count)]
        else:
            raise ValueError(f"Unknown type: {prefix}")

    def _read_line(self) -> bytes:
        """Return the next CRLF-terminated line, without the CRLF."""
        while b"\r\n" not in self._buffer:
            chunk = self.sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed")
            self._buffer += chunk
        pos = self._buffer.index(b"\r\n")
        line = self._buffer[:pos]
        self._buffer = self._buffer[pos + 2:]
        return line

    def _read_bytes(self, n: int) -> bytes:
        """Return exactly *n* bytes from the buffer, receiving as needed."""
        while len(self._buffer) < n:
            chunk = self.sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed")
            self._buffer += chunk
        result = self._buffer[:n]
        self._buffer = self._buffer[n:]
        return result

    def close(self):
        """Close the socket; the next use reconnects lazily."""
        if self._sock:
            self._sock.close()
            self._sock = None
class Pipeline:
    """Queues commands and sends them in a single write (RESP pipelining)."""

    def __init__(self, connection: Connection):
        self.connection = connection
        self.commands: List[tuple] = []   # queued argument tuples
        self.response_count = 0           # replies expected on execute()

    def execute_command(self, *args) -> "Pipeline":
        """Queue one command; returns self to allow chained calls."""
        self.commands.append(args)
        self.response_count += 1
        return self

    def execute(self) -> list:
        """Send all queued commands in one write and collect their replies.

        Returns the replies in command order; the queue is cleared after
        all replies have been read.
        """
        # Collect chunks and join once: bytes += in a loop is quadratic.
        chunks = []
        for args in self.commands:
            chunks.append(f"*{len(args)}\r\n".encode())
            for arg in args:
                if isinstance(arg, str):
                    arg = arg.encode("utf-8")
                chunks.append(f"${len(arg)}\r\n".encode())
                chunks.append(arg)
                chunks.append(b"\r\n")
        self.connection.sock.sendall(b"".join(chunks))
        # Read one reply per queued command.
        results = [self.connection.read_response()
                   for _ in range(self.response_count)]
        self.commands.clear()
        self.response_count = 0
        return results
class Redis:
    """Minimal Redis client: one lazy connection plus a small high-level API."""

    def __init__(self, host="127.0.0.1", port=6379):
        self.connection = Connection(host, port)

    def execute_command(self, *args) -> Any:
        """Send a raw command and return the parsed reply."""
        self.connection.send_command(*args)
        return self.connection.read_response()

    # --- high-level API ---
    def get(self, key: str) -> Optional[bytes]:
        return self.execute_command("GET", key)

    def set(self, key: str, value: str, ex: Optional[int] = None) -> str:
        args = ["SET", key, value]
        # `is not None` so an explicit ex=0 is forwarded (and rejected loudly
        # by the server) instead of being silently dropped by truthiness.
        if ex is not None:
            args.extend(["EX", str(ex)])
        return self.execute_command(*args)

    def delete(self, *keys: str) -> int:
        return self.execute_command("DEL", *keys)

    def pipeline(self) -> Pipeline:
        """Return a fresh Pipeline bound to this client's connection."""
        return Pipeline(self.connection)

    def ping(self) -> bool:
        return self.execute_command("PING") == "PONG"

    def close(self):
        self.connection.close()
测试
# --- basic operations (requires a running Redis server) ---
r = Redis()
assert r.ping()  # truthiness, not `== True`
r.set("test", "hello")
assert r.get("test") == b"hello"
r.delete("test")

# --- pipeline: 1000 SETs in a single round trip ---
pipe = r.pipeline()
for i in range(1000):
    pipe.execute_command("SET", f"key:{i}", f"value:{i}")
results = pipe.execute()
assert len(results) == 1000
r.close()
12.2 连接管理
连接生命周期
创建 → 认证 → 选择数据库 → 使用 → 健康检查 → 关闭
连接池最佳实践
import redis

# Recommended production configuration
pool = redis.ConnectionPool(
    host="127.0.0.1",
    port=6379,
    # pool size
    max_connections=50,
    # timeouts
    socket_timeout=2.0,
    socket_connect_timeout=1.0,
    # retry policy
    # NOTE(review): Retry's constructor signature varies across redis-py
    # versions (positional backoff/retries) — confirm against the installed version.
    retry_on_timeout=True,
    retry=redis.retry.Retry(
        retries=3,
        backoff=redis.backoff.ExponentialBackoff()
    ),
    # periodic connection health check
    health_check_interval=30,
    # TCP keepalive options
    socket_keepalive=True,
    socket_keepalive_options={
        1: 1,  # TCP_KEEPIDLE
        2: 3,  # TCP_KEEPINTVL
        3: 5,  # TCP_KEEPCNT
    },
    # decode replies to str
    decode_responses=True,
)
r = redis.Redis(connection_pool=pool)
连接超时配置
| 参数 | 默认值 | 推荐值 | 说明 |
|---|---|---|---|
| socket_timeout | None | 2.0s | 读写超时 |
| socket_connect_timeout | None | 1.0s | 连接建立超时 |
| health_check_interval | 0 | 30s | 健康检查间隔 |
| max_connections | 2³¹ | 50-200 | 最大连接数 |
连接复用模式
# Singleton pattern (recommended for a process-wide client)
class RedisSingleton:
    _instance = None  # cached redis.Redis client
    _pool = None      # cached connection pool backing the client

    @classmethod
    def get_client(cls) -> redis.Redis:
        """Return the shared client, creating pool and client on first call.

        NOTE(review): the check-then-set here is unguarded — two threads
        calling this concurrently for the first time may each build a pool;
        confirm single-threaded initialization or add a lock.
        """
        if cls._pool is None:
            cls._pool = redis.ConnectionPool(
                host="127.0.0.1",
                port=6379,
                max_connections=50,
                decode_responses=True
            )
        if cls._instance is None:
            cls._instance = redis.Redis(connection_pool=cls._pool)
        return cls._instance

# Usage
r = RedisSingleton.get_client()
r.set("key", "value")
连接健康检查
import threading
import time

class HealthChecker:
    """Background thread that periodically pings Redis through a pool and
    resets the pool when the server is unreachable."""

    def __init__(self, pool: redis.ConnectionPool, interval=30):
        self.pool = pool
        self.interval = interval            # seconds between checks
        self._stop_event = threading.Event()
        self._thread = None

    def start(self):
        """Start the checker thread (daemon, so it never blocks process exit)."""
        self._thread = threading.Thread(target=self._check_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the loop to stop and wait for the thread to finish."""
        self._stop_event.set()
        if self._thread:
            self._thread.join()

    def _check_loop(self):
        # Event.wait() returns early once stop() sets the event, so shutdown
        # does not have to wait out the full interval.
        while not self._stop_event.is_set():
            self._check_connections()
            self._stop_event.wait(self.interval)

    def _check_connections(self):
        """Ping through the pool; on failure, drop every pooled connection."""
        # redis-py has built-in health checks; this demonstrates custom logic.
        try:
            r = redis.Redis(connection_pool=self.pool)
            if not r.ping():
                self._reset_pool()
        except redis.ConnectionError:
            self._reset_pool()

    def _reset_pool(self):
        """Reset the pool by disconnecting all pooled connections."""
        self.pool.disconnect()
12.3 序列化策略
常见序列化格式
| 格式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| JSON | 人类可读、跨语言 | 体积大、不支持二进制 | 配置、简单对象 |
| MessagePack | 紧凑、快速 | 不可读 | 性能敏感场景 |
| Protobuf | 紧凑、Schema 定义 | 需要预定义 | 微服务通信 |
| Pickle | Python 原生、支持复杂对象 | 不安全、不可跨语言 | 仅限 Python 内部 |
| BSON | JSON 的二进制版本 | 体积偏大 | MongoDB 集成 |
JSON 序列化
import json
import redis
class JSONRedis:
    """Redis client wrapper that stores values as JSON text."""

    def __init__(self, client: redis.Redis):
        self.client = client

    @staticmethod
    def _dump(obj) -> str:
        # keep non-ASCII characters readable in the stored payload
        return json.dumps(obj, ensure_ascii=False)

    def set_json(self, key: str, value, **kwargs):
        """Serialize *value* to JSON and store it under *key*."""
        return self.client.set(key, self._dump(value), **kwargs)

    def get_json(self, key: str):
        """Load the JSON value stored at *key*; None when the key is absent."""
        raw = self.client.get(key)
        return None if raw is None else json.loads(raw)

    def hset_json(self, name: str, mapping: dict):
        """Store a hash whose field values are individually JSON-encoded."""
        encoded = {field: self._dump(val) for field, val in mapping.items()}
        return self.client.hset(name, mapping=encoded)

    def hget_json(self, name: str, key: str):
        """Load one JSON-encoded hash field; None when the field is absent."""
        raw = self.client.hget(name, key)
        return None if raw is None else json.loads(raw)
# Usage
r = redis.Redis(decode_responses=True)
jr = JSONRedis(r)
jr.set_json("user:1", {"name": "张三", "age": 30, "tags": ["python", "redis"]})
user = jr.get_json("user:1")
print(user)  # {'name': '张三', 'age': 30, 'tags': ['python', 'redis']}
MessagePack 序列化
import msgpack
class MsgPackRedis:
    """Redis client wrapper that stores values as MessagePack blobs."""

    def __init__(self, client: redis.Redis):
        self.client = client

    def set_msgpack(self, key: str, value, **kwargs):
        """Pack *value* with MessagePack and store it under *key*."""
        payload = msgpack.packb(value, use_bin_type=True)
        return self.client.set(key, payload, **kwargs)

    def get_msgpack(self, key: str):
        """Unpack the value stored at *key*; None when the key is absent."""
        raw = self.client.get(key)
        if raw is None:
            return None
        return msgpack.unpackb(raw, raw=False)
序列化大小对比
import json
import msgpack

# Sample record used to compare serialized sizes.
data = {
    "user_id": 12345,
    "username": "alice_wonderland",
    "email": "[email protected]",
    "scores": [95, 87, 92, 88],
    "active": True
}
json_size = len(json.dumps(data).encode())
msgpack_size = len(msgpack.packb(data))
print(f"JSON: {json_size} bytes")
print(f"MsgPack: {msgpack_size} bytes")
print(f"压缩比: {json_size / msgpack_size:.2f}x")
12.4 安全最佳实践
认证
# 配置密码(Redis < 6.0)
requirepass your_password
# ACL 用户(Redis 6.0+)
ACL SETUSER alice on >password ~* &* +@all
ACL SETUSER bob on >password ~cache:* &* +@read +@write
# Client-side authentication
r = redis.Redis(
    host="127.0.0.1",
    port=6379,
    username="alice",        # ACL username (Redis 6.0+)
    password="your_password"
)
TLS 加密
import ssl

# TLS-encrypted connection (server listening on its TLS port)
r = redis.Redis(
    host="127.0.0.1",
    port=6380,
    ssl=True,
    ssl_certfile="/path/to/cert.pem",  # client certificate
    ssl_keyfile="/path/to/key.pem",    # client private key
    ssl_ca_certs="/path/to/ca.pem",    # CA bundle used to verify the server
    ssl_cert_reqs=ssl.CERT_REQUIRED    # require and verify the server cert
)
网络安全
# 绑定内网地址
bind 127.0.0.1 10.0.0.1
# 禁用危险命令
rename-command FLUSHDB ""
rename-command FLUSHALL ""
rename-command CONFIG ""
rename-command DEBUG ""
rename-command KEYS ""
# 禁用 Lua 脚本的危险操作
# (通过 ACL 限制)
ACL SETUSER restricted on >password ~* &* +@all -@admin -SCRIPT
ACL 详细配置
# 创建只读用户
ACL SETUSER readonly on >password ~* &* +@read -@write -@admin -@dangerous
# 创建缓存专用用户
ACL SETUSER cache_user on >password ~cache:* &* +get +set +del +expire +ttl
# 创建管理用户
ACL SETUSER admin on >strongpassword ~* &* +@all
# 查看用户权限
ACL GETUSER readonly
输入验证
def sanitize_key(key: str) -> str:
    """Validate a Redis key and return it unchanged.

    Raises:
        ValueError: if the key is empty, longer than 512 bytes once
            UTF-8 encoded, or contains CR/LF/NUL characters.
    """
    if not key:
        raise ValueError("Key cannot be empty")
    # Measure encoded bytes, not characters: multi-byte UTF-8 keys would
    # otherwise slip past the 512-byte limit the message promises.
    if len(key.encode("utf-8")) > 512:
        raise ValueError("Key too long (max 512 bytes)")
    # Optional: restrict the character set.
    if any(c in key for c in ('\r', '\n', '\x00')):
        raise ValueError("Key contains invalid characters")
    return key
12.5 性能优化清单
命令层面
| 优化 | 说明 |
|---|---|
| ✅ 使用 MSET/MGET | 批量操作减少 RTT |
| ✅ 使用 Pipeline | 批量发送命令 |
| ✅ 使用 Lua 脚本 | 服务端原子操作 |
| ✅ 使用 Hash Tag | 集群中相关 key 同槽 |
| ❌ 避免 KEYS * | 使用 SCAN 替代 |
| ❌ 避免大 Hash/Set | 考虑分片 |
| ❌ 避免长时间脚本 | 限制执行时间 |
连接层面
# ✅ 正确:使用连接池
pool = redis.ConnectionPool(max_connections=50)
r = redis.Redis(connection_pool=pool)
# ❌ 错误:每次创建新连接
r = redis.Redis(host="127.0.0.1", port=6379)
数据结构选择
| 场景 | 推荐数据结构 | 不推荐 |
|---|---|---|
| 缓存 | String / Hash | - |
| 对象存储(多字段) | Hash | String (JSON) |
| 计数器 | String (INCR) | - |
| 队列 | List (LPUSH/RPOP) | - |
| 去重 | Set / Bloom Filter | - |
| 排行榜 | Sorted Set | - |
| 限流 | Sorted Set + Lua | String (计数) |
| 分布式锁 | String (SET NX EX) | - |
| 时间序列 | Sorted Set / RedisTimeSeries | List |
12.6 监控与告警
关键指标
# 获取所有指标
redis-cli INFO all
# 关键指标分类
redis-cli INFO memory # 内存使用
redis-cli INFO stats # 命令统计
redis-cli INFO clients # 连接信息
redis-cli INFO replication # 复制状态
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| used_memory | 已用内存 | > maxmemory 的 80% |
| used_memory_rss | RSS 内存 | > maxmemory 的 1.5x |
| connected_clients | 连接数 | > max_clients 的 80% |
| instantaneous_ops_per_sec | QPS | 根据容量规划 |
| keyspace_hits | 命中数 | 计算命中率 |
| keyspace_misses | 未命中数 | 命中率 < 90% |
| latest_fork_usec | 最近 fork 耗时 | > 500ms |
| rejected_connections | 拒绝连接数 | > 0 |
| expired_keys | 过期 key 数 | 监控趋势 |
慢查询日志
# 配置慢查询阈值
CONFIG SET slowlog-log-slower-than 10000 # 10ms
CONFIG SET slowlog-max-len 128
# 查看慢查询
SLOWLOG GET 10
# 输出格式:
# 1) 1) (integer) 1 # ID
# 2) (integer) 1619787123 # 时间戳
# 3) (integer) 15000 # 耗时(微秒)
# 4) 1) "KEYS" # 命令
# 2) "*"
# 5) "127.0.0.1:52345" # 客户端地址
# 6) "" # 客户端名称
12.7 常见问题排查
问题一:连接超时
原因分析:
- 网络不通
- Redis 拒绝连接(max_clients)
- Redis 阻塞(慢命令、fork、AOF fsync)
排查步骤:
1. telnet <host> <port>
2. 检查 max_clients 配置
3. 检查 slowlog
4. 检查 info clients
问题二:内存增长
原因分析:
- Key 未设置过期时间
- 大 Key 存在
- 内存碎片
排查步骤:
1. INFO memory
2. redis-cli --bigkeys
3. MEMORY USAGE <key>
4. CONFIG SET activedefrag yes
问题三:主从延迟
原因分析:
- 网络带宽不足
- 从节点负载过高
- 大 Key 传输
排查步骤:
1. INFO replication
2. 检查 master_repl_offset vs slave_repl_offset
3. 检查 repl_backlog_size
12.8 代码示例汇总
完整的 Redis 工具类
import redis
import json
import hashlib
import time
from typing import Any, Optional, List, Dict
from functools import wraps
class RedisToolkit:
    """Production-grade Redis helper: pooled client plus common patterns
    (JSON values, pipelines, distributed locks, rate limiting, caching,
    health checks)."""

    def __init__(self, host="127.0.0.1", port=6379, db=0, password=None):
        self.pool = redis.ConnectionPool(
            host=host, port=port, db=db, password=password,
            max_connections=50, decode_responses=True,
            socket_timeout=2.0, socket_connect_timeout=1.0,
            health_check_interval=30, retry_on_timeout=True
        )
        self.client = redis.Redis(connection_pool=self.pool)
        self._scripts = {}  # reserved for cached script SHAs

    # === basic operations ===
    def get(self, key: str) -> Optional[str]:
        return self.client.get(key)

    def set(self, key: str, value: str, ex: Optional[int] = None) -> bool:
        return self.client.set(key, value, ex=ex)

    def delete(self, *keys: str) -> int:
        return self.client.delete(*keys)

    # === JSON operations ===
    def set_json(self, key: str, value: Any, ex: Optional[int] = None) -> bool:
        """Store *value* JSON-encoded (non-ASCII kept readable)."""
        return self.client.set(key, json.dumps(value, ensure_ascii=False), ex=ex)

    def get_json(self, key: str) -> Optional[Any]:
        data = self.client.get(key)
        return json.loads(data) if data else None

    # === pipeline ===
    def pipeline(self, transaction=False):
        return self.client.pipeline(transaction=transaction)

    # === distributed lock ===
    def acquire_lock(self, lock_name: str, owner: str, ttl: int = 30) -> bool:
        """SET NX EX — succeeds only when the lock is currently free."""
        return bool(self.client.set(f"lock:{lock_name}", owner, nx=True, ex=ttl))

    def release_lock(self, lock_name: str, owner: str) -> bool:
        """Atomically release the lock only if *owner* still holds it."""
        lua = """
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        end
        return 0
        """
        return bool(self.client.eval(lua, 1, f"lock:{lock_name}", owner))

    # === rate limiter ===
    def is_rate_limited(self, key: str, limit: int, window: int) -> bool:
        """Sliding-window limiter: True when the request must be rejected."""
        lua = """
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
        local count = redis.call('ZCARD', key)
        if count < limit then
            redis.call('ZADD', key, now, now .. math.random())
            redis.call('EXPIRE', key, window)
            return 0
        end
        return 1
        """
        return bool(self.client.eval(lua, 1, key, limit, window, int(time.time())))

    # === caching decorator ===
    def cached(self, key_prefix: str, ttl: int = 300):
        """Cache a function's JSON-serializable result in Redis for *ttl* seconds.

        The cache key covers positional AND keyword arguments; hashing only
        str(args) (as before) made calls differing solely in kwargs share
        one cache entry. NOTE: a result of None reads back as a miss, so
        None-returning calls are effectively never cached.
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                fingerprint = repr((args, sorted(kwargs.items())))
                digest = hashlib.md5(fingerprint.encode()).hexdigest()
                cache_key = f"{key_prefix}:{digest}"
                result = self.get_json(cache_key)
                if result is not None:
                    return result
                result = func(*args, **kwargs)
                self.set_json(cache_key, result, ex=ttl)
                return result
            return wrapper
        return decorator

    # === health check ===
    def health_check(self) -> Dict[str, Any]:
        """Ping the server and report latency plus basic memory/client stats."""
        try:
            start = time.time()
            self.client.ping()
            latency = time.time() - start
            info = self.client.info("memory")
            return {
                "status": "healthy",
                "latency_ms": round(latency * 1000, 2),
                "used_memory_mb": round(info["used_memory"] / 1024 / 1024, 2),
                "connected_clients": self.client.info("clients")["connected_clients"],
            }
        except Exception as e:
            # Boundary method: report the failure rather than propagate it.
            return {"status": "unhealthy", "error": str(e)}

    def close(self):
        """Disconnect every pooled connection."""
        self.pool.disconnect()
# Usage examples
toolkit = RedisToolkit()

# basic operations
toolkit.set("greeting", "hello", ex=60)
print(toolkit.get("greeting"))

# JSON values
toolkit.set_json("user:1", {"name": "张三", "age": 30})
user = toolkit.get_json("user:1")

# distributed lock
if toolkit.acquire_lock("mylock", "process-1", ttl=30):
    try:
        # protected critical section
        pass
    finally:
        toolkit.release_lock("mylock", "process-1")

# rate limiting
if not toolkit.is_rate_limited("api:user:123", limit=100, window=60):
    # request allowed
    pass

# caching decorator
@toolkit.cached("user_info", ttl=300)
def get_user_info(user_id):
    # expensive operation
    return {"id": user_id, "name": "Alice"}

# health check
health = toolkit.health_check()
print(health)
toolkit.close()
12.9 注意事项
⚠️ 不要在循环中创建连接
始终使用连接池复用连接。
⚠️ 注意序列化开销
JSON 序列化可能成为瓶颈。对于高频操作,考虑 MessagePack 或直接存储二进制。
⚠️ 设置合理的超时
没有超时的连接可能永远阻塞。始终设置 socket_timeout。
⚠️ 监控连接池状态
定期检查连接池使用率,避免连接耗尽。
⚠️ 处理连接断开
实现重连逻辑和重试策略,应对网络波动。
12.10 扩展阅读
上一章:代理实现 | 返回:目录