强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Redis 传输协议精讲 / 12 - 最佳实践

最佳实践

12.1 驱动开发指南

驱动架构

┌─────────────────────────────────────────────────────┐
│                    Application                       │
├─────────────────────────────────────────────────────┤
│                 High-Level API                       │
│  client.get("key")  client.set("key", "value")      │
├─────────────────────────────────────────────────────┤
│                 Command Layer                        │
│  ["GET", "key"]  ["SET", "key", "value"]             │
├─────────────────────────────────────────────────────┤
│                 Protocol Layer (RESP)                 │
│  *2\r\n$3\r\nGET\r\n$3\r\nkey\r\n                   │
├─────────────────────────────────────────────────────┤
│                 Connection Layer                      │
│  TCP Socket / TLS / Unix Domain Socket               │
├─────────────────────────────────────────────────────┤
│                 Connection Pool                       │
│  Pool of persistent connections                      │
└─────────────────────────────────────────────────────┘

最小驱动实现

"""
Redis 驱动最小实现
覆盖:连接、命令发送、响应解析、Pipeline
"""

import socket
from typing import Any, List, Optional

class RedisError(Exception):
    pass

class Connection:
    """RESP 协议连接"""

    def __init__(self, host="127.0.0.1", port=6379, socket_timeout=5.0):
        self.host = host
        self.port = port
        self.socket_timeout = socket_timeout
        self._sock: Optional[socket.socket] = None
        self._buffer = b""

    def connect(self):
        """建立连接"""
        self._sock = socket.create_connection(
            (self.host, self.port),
            timeout=self.socket_timeout
        )
        self._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    @property
    def sock(self) -> socket.socket:
        if self._sock is None:
            self.connect()
        return self._sock

    def send_command(self, *args) -> None:
        """发送 RESP 命令"""
        parts = [f"*{len(args)}\r\n".encode()]
        for arg in args:
            if isinstance(arg, str):
                arg = arg.encode("utf-8")
            parts.append(f"${len(arg)}\r\n".encode())
            parts.append(arg)
            parts.append(b"\r\n")
        self.sock.sendall(b"".join(parts))

    def read_response(self) -> Any:
        """读取 RESP 响应"""
        line = self._read_line()
        prefix = chr(line[0])

        if prefix == "+":
            return line[1:].decode("utf-8")
        elif prefix == "-":
            raise RedisError(line[1:].decode("utf-8"))
        elif prefix == ":":
            return int(line[1:])
        elif prefix == "$":
            length = int(line[1:])
            if length == -1:
                return None
            data = self._read_bytes(length + 2)
            return data[:length]
        elif prefix == "*":
            count = int(line[1:])
            if count == -1:
                return None
            return [self.read_response() for _ in range(count)]
        else:
            raise ValueError(f"Unknown type: {prefix}")

    def _read_line(self) -> bytes:
        while b"\r\n" not in self._buffer:
            chunk = self.sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed")
            self._buffer += chunk
        pos = self._buffer.index(b"\r\n")
        line = self._buffer[:pos]
        self._buffer = self._buffer[pos + 2:]
        return line

    def _read_bytes(self, n: int) -> bytes:
        while len(self._buffer) < n:
            chunk = self.sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed")
            self._buffer += chunk
        result = self._buffer[:n]
        self._buffer = self._buffer[n:]
        return result

    def close(self):
        if self._sock:
            self._sock.close()
            self._sock = None


class Pipeline:
    """Pipeline 批量执行"""

    def __init__(self, connection: Connection):
        self.connection = connection
        self.commands: List[tuple] = []
        self.response_count = 0

    def execute_command(self, *args) -> "Pipeline":
        self.commands.append(args)
        self.response_count += 1
        return self  # 支持链式调用

    def execute(self) -> list:
        """发送所有命令并收集响应"""
        # 发送所有命令
        buf = b""
        for args in self.commands:
            parts = [f"*{len(args)}\r\n".encode()]
            for arg in args:
                if isinstance(arg, str):
                    arg = arg.encode("utf-8")
                parts.append(f"${len(arg)}\r\n".encode())
                parts.append(arg)
                parts.append(b"\r\n")
            buf += b"".join(parts)
        self.connection.sock.sendall(buf)

        # 收集响应
        results = []
        for _ in range(self.response_count):
            results.append(self.connection.read_response())

        self.commands.clear()
        self.response_count = 0
        return results


class Redis:
    """Redis 客户端"""

    def __init__(self, host="127.0.0.1", port=6379):
        self.connection = Connection(host, port)

    def execute_command(self, *args) -> Any:
        self.connection.send_command(*args)
        return self.connection.read_response()

    # 高级 API
    def get(self, key: str) -> Optional[bytes]:
        return self.execute_command("GET", key)

    def set(self, key: str, value: str, ex: int = None) -> str:
        args = ["SET", key, value]
        if ex:
            args.extend(["EX", str(ex)])
        return self.execute_command(*args)

    def delete(self, *keys: str) -> int:
        return self.execute_command("DEL", *keys)

    def pipeline(self) -> Pipeline:
        return Pipeline(self.connection)

    def ping(self) -> bool:
        return self.execute_command("PING") == "PONG"

    def close(self):
        self.connection.close()

测试

# 测试基本操作
r = Redis()
assert r.ping() == True
r.set("test", "hello")
assert r.get("test") == b"hello"
r.delete("test")

# 测试 Pipeline
pipe = r.pipeline()
for i in range(1000):
    pipe.execute_command("SET", f"key:{i}", f"value:{i}")
results = pipe.execute()
assert len(results) == 1000

r.close()

12.2 连接管理

连接生命周期

创建 → 认证 → 选择数据库 → 使用 → 健康检查 → 关闭

连接池最佳实践

import redis

# 推荐配置
pool = redis.ConnectionPool(
    host="127.0.0.1",
    port=6379,
    # 连接池大小
    max_connections=50,
    # 超时设置
    socket_timeout=2.0,
    socket_connect_timeout=1.0,
    # 重试策略
    retry_on_timeout=True,
    retry=redis.retry.Retry(
        retries=3,
        backoff=redis.backoff.ExponentialBackoff()
    ),
    # 健康检查
    health_check_interval=30,
    # TCP 选项
    socket_keepalive=True,
    socket_keepalive_options={
        1: 1,   # TCP_KEEPIDLE
        2: 3,   # TCP_KEEPINTVL
        3: 5,   # TCP_KEEPCNT
    },
    # 解码
    decode_responses=True,
)

r = redis.Redis(connection_pool=pool)

连接超时配置

参数默认值推荐值说明
socket_timeoutNone2.0s读写超时
socket_connect_timeoutNone1.0s连接建立超时
health_check_interval030s健康检查间隔
max_connections2³¹50-200最大连接数

连接复用模式

# 单例模式(推荐用于全局)
class RedisSingleton:
    _instance = None
    _pool = None

    @classmethod
    def get_client(cls) -> redis.Redis:
        if cls._pool is None:
            cls._pool = redis.ConnectionPool(
                host="127.0.0.1",
                port=6379,
                max_connections=50,
                decode_responses=True
            )
        if cls._instance is None:
            cls._instance = redis.Redis(connection_pool=cls._pool)
        return cls._instance

# 使用
r = RedisSingleton.get_client()
r.set("key", "value")

连接健康检查

import threading
import time

class HealthChecker:
    """连接健康检查器"""

    def __init__(self, pool: redis.ConnectionPool, interval=30):
        self.pool = pool
        self.interval = interval
        self._stop_event = threading.Event()
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self._check_loop, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join()

    def _check_loop(self):
        while not self._stop_event.is_set():
            self._check_connections()
            self._stop_event.wait(self.interval)

    def _check_connections(self):
        """检查连接池中的连接"""
        # redis-py 内置了健康检查,这里展示自定义逻辑
        try:
            r = redis.Redis(connection_pool=self.pool)
            if not r.ping():
                self._reset_pool()
        except redis.ConnectionError:
            self._reset_pool()

    def _reset_pool(self):
        """重置连接池"""
        self.pool.disconnect()

12.3 序列化策略

常见序列化格式

格式优点缺点适用场景
JSON人类可读、跨语言体积大、不支持二进制配置、简单对象
MessagePack紧凑、快速不可读性能敏感场景
Protobuf紧凑、Schema 定义需要预定义微服务通信
PicklePython 原生、支持复杂对象不安全、不可跨语言仅限 Python 内部
BSONJSON 的二进制版本体积偏大MongoDB 集成

JSON 序列化

import json
import redis

class JSONRedis:
    """JSON 序列化的 Redis 客户端"""

    def __init__(self, client: redis.Redis):
        self.client = client

    def set_json(self, key: str, value, **kwargs):
        """存储 JSON 对象"""
        return self.client.set(key, json.dumps(value, ensure_ascii=False), **kwargs)

    def get_json(self, key: str):
        """读取 JSON 对象"""
        data = self.client.get(key)
        if data is None:
            return None
        return json.loads(data)

    def hset_json(self, name: str, mapping: dict):
        """Hash 字段使用 JSON 序列化"""
        serialized = {k: json.dumps(v, ensure_ascii=False) for k, v in mapping.items()}
        return self.client.hset(name, mapping=serialized)

    def hget_json(self, name: str, key: str):
        """读取 Hash 字段并反序列化"""
        data = self.client.hget(name, key)
        if data is None:
            return None
        return json.loads(data)


# 使用
r = redis.Redis(decode_responses=True)
jr = JSONRedis(r)

jr.set_json("user:1", {"name": "张三", "age": 30, "tags": ["python", "redis"]})
user = jr.get_json("user:1")
print(user)  # {'name': '张三', 'age': 30, 'tags': ['python', 'redis']}

MessagePack 序列化

import msgpack

class MsgPackRedis:
    def __init__(self, client: redis.Redis):
        self.client = client

    def set_msgpack(self, key: str, value, **kwargs):
        return self.client.set(key, msgpack.packb(value, use_bin_type=True), **kwargs)

    def get_msgpack(self, key: str):
        data = self.client.get(key)
        if data is None:
            return None
        return msgpack.unpackb(data, raw=False)

序列化大小对比

import json
import msgpack

data = {
    "user_id": 12345,
    "username": "alice_wonderland",
    "email": "[email protected]",
    "scores": [95, 87, 92, 88],
    "active": True
}

json_size = len(json.dumps(data).encode())
msgpack_size = len(msgpack.packb(data))

print(f"JSON:     {json_size} bytes")
print(f"MsgPack:  {msgpack_size} bytes")
print(f"压缩比:   {json_size / msgpack_size:.2f}x")

12.4 安全最佳实践

认证

# 配置密码(Redis < 6.0)
requirepass your_password

# ACL 用户(Redis 6.0+)
ACL SETUSER alice on >password ~* &* +@all
ACL SETUSER bob on >password ~cache:* &* +@read +@write
# 客户端认证
r = redis.Redis(
    host="127.0.0.1",
    port=6379,
    username="alice",
    password="your_password"
)

TLS 加密

import ssl

r = redis.Redis(
    host="127.0.0.1",
    port=6380,
    ssl=True,
    ssl_certfile="/path/to/cert.pem",
    ssl_keyfile="/path/to/key.pem",
    ssl_ca_certs="/path/to/ca.pem",
    ssl_cert_reqs=ssl.CERT_REQUIRED
)

网络安全

# 绑定内网地址
bind 127.0.0.1 10.0.0.1

# 禁用危险命令
rename-command FLUSHDB ""
rename-command FLUSHALL ""
rename-command CONFIG ""
rename-command DEBUG ""
rename-command KEYS ""

# 禁用 Lua 脚本的危险操作
# (通过 ACL 限制)
ACL SETUSER restricted on >password ~* &* +@all -@admin -SCRIPT

ACL 详细配置

# 创建只读用户
ACL SETUSER readonly on >password ~* &* +@read -@write -@admin -@dangerous

# 创建缓存专用用户
ACL SETUSER cache_user on >password ~cache:* &* +get +set +del +expire +ttl

# 创建管理用户
ACL SETUSER admin on >strongpassword ~* &* +@all

# 查看用户权限
ACL GETUSER readonly

输入验证

def sanitize_key(key: str) -> str:
    """验证和清理 key"""
    if not key:
        raise ValueError("Key cannot be empty")
    if len(key) > 512:
        raise ValueError("Key too long (max 512 bytes)")
    # 可选:限制字符集
    if any(c in key for c in ['\r', '\n', '\x00']):
        raise ValueError("Key contains invalid characters")
    return key

12.5 性能优化清单

命令层面

优化说明
✅ 使用 MSET/MGET批量操作减少 RTT
✅ 使用 Pipeline批量发送命令
✅ 使用 Lua 脚本服务端原子操作
✅ 使用 Hash Tag集群中相关 key 同槽
❌ 避免 KEYS *使用 SCAN 替代
❌ 避免大 Hash/Set考虑分片
❌ 避免长时间脚本限制执行时间

连接层面

# ✅ 正确:使用连接池
pool = redis.ConnectionPool(max_connections=50)
r = redis.Redis(connection_pool=pool)

# ❌ 错误:每次创建新连接
r = redis.Redis(host="127.0.0.1", port=6379)

数据结构选择

场景推荐数据结构不推荐
缓存String / Hash-
对象存储(多字段)HashString (JSON)
计数器String (INCR)-
队列List (LPUSH/RPOP)-
去重Set / Bloom Filter-
排行榜Sorted Set-
限流Sorted Set + LuaString (计数)
分布式锁String (SET NX EX)-
时间序列Sorted Set / RedisTimeSeriesList

12.6 监控与告警

关键指标

# 获取所有指标
redis-cli INFO all

# 关键指标分类
redis-cli INFO memory      # 内存使用
redis-cli INFO stats       # 命令统计
redis-cli INFO clients     # 连接信息
redis-cli INFO replication # 复制状态
指标说明告警阈值
used_memory已用内存> maxmemory 的 80%
used_memory_rssRSS 内存> maxmemory 的 1.5x
connected_clients连接数> max_clients 的 80%
instantaneous_ops_per_secQPS根据容量规划
keyspace_hits命中数计算命中率
keyspace_misses未命中数命中率 < 90%
latest_fork_usec最近 fork 耗时> 500ms
rejected_connections拒绝连接数> 0
expired_keys过期 key 数监控趋势

慢查询日志

# 配置慢查询阈值
CONFIG SET slowlog-log-slower-than 10000  # 10ms
CONFIG SET slowlog-max-len 128

# 查看慢查询
SLOWLOG GET 10

# 输出格式:
# 1) 1) (integer) 1           # ID
#    2) (integer) 1619787123  # 时间戳
#    3) (integer) 15000       # 耗时(微秒)
#    4) 1) "KEYS"             # 命令
#       2) "*"
#    5) "127.0.0.1:52345"    # 客户端地址
#    6) ""                    # 客户端名称

12.7 常见问题排查

问题一:连接超时

原因分析:
- 网络不通
- Redis 拒绝连接(max_clients)
- Redis 阻塞(慢命令、fork、AOF fsync)

排查步骤:
1. telnet <host> <port>
2. 检查 max_clients 配置
3. 检查 slowlog
4. 检查 info clients

问题二:内存增长

原因分析:
- Key 未设置过期时间
- 大 Key 存在
- 内存碎片

排查步骤:
1. INFO memory
2. redis-cli --bigkeys
3. MEMORY USAGE <key>
4. CONFIG SET activedefrag yes

问题三:主从延迟

原因分析:
- 网络带宽不足
- 从节点负载过高
- 大 Key 传输

排查步骤:
1. INFO replication
2. 检查 master_repl_offset vs slave_repl_offset
3. 检查 repl_backlog_size

12.8 代码示例汇总

完整的 Redis 工具类

import redis
import json
import hashlib
import time
from typing import Any, Optional, List, Dict
from functools import wraps

class RedisToolkit:
    """生产级 Redis 工具类"""

    def __init__(self, host="127.0.0.1", port=6379, db=0, password=None):
        self.pool = redis.ConnectionPool(
            host=host, port=port, db=db, password=password,
            max_connections=50, decode_responses=True,
            socket_timeout=2.0, socket_connect_timeout=1.0,
            health_check_interval=30, retry_on_timeout=True
        )
        self.client = redis.Redis(connection_pool=self.pool)
        self._scripts = {}

    # === 基本操作 ===
    def get(self, key: str) -> Optional[str]:
        return self.client.get(key)

    def set(self, key: str, value: str, ex: int = None) -> bool:
        return self.client.set(key, value, ex=ex)

    def delete(self, *keys: str) -> int:
        return self.client.delete(*keys)

    # === JSON 操作 ===
    def set_json(self, key: str, value: Any, ex: int = None) -> bool:
        return self.client.set(key, json.dumps(value, ensure_ascii=False), ex=ex)

    def get_json(self, key: str) -> Optional[Any]:
        data = self.client.get(key)
        return json.loads(data) if data else None

    # === Pipeline ===
    def pipeline(self, transaction=False):
        return self.client.pipeline(transaction=transaction)

    # === 分布式锁 ===
    def acquire_lock(self, lock_name: str, owner: str, ttl: int = 30) -> bool:
        return bool(self.client.set(f"lock:{lock_name}", owner, nx=True, ex=ttl))

    def release_lock(self, lock_name: str, owner: str) -> bool:
        lua = """
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        end
        return 0
        """
        return bool(self.client.eval(lua, 1, f"lock:{lock_name}", owner))

    # === 限流器 ===
    def is_rate_limited(self, key: str, limit: int, window: int) -> bool:
        lua = """
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
        local count = redis.call('ZCARD', key)

        if count < limit then
            redis.call('ZADD', key, now, now .. math.random())
            redis.call('EXPIRE', key, window)
            return 0
        end
        return 1
        """
        return bool(self.client.eval(lua, 1, key, limit, window, int(time.time())))

    # === 缓存装饰器 ===
    def cached(self, key_prefix: str, ttl: int = 300):
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                cache_key = f"{key_prefix}:{hashlib.md5(str(args).encode()).hexdigest()}"
                result = self.get_json(cache_key)
                if result is not None:
                    return result
                result = func(*args, **kwargs)
                self.set_json(cache_key, result, ex=ttl)
                return result
            return wrapper
        return decorator

    # === 健康检查 ===
    def health_check(self) -> Dict[str, Any]:
        try:
            start = time.time()
            self.client.ping()
            latency = time.time() - start
            info = self.client.info("memory")
            return {
                "status": "healthy",
                "latency_ms": round(latency * 1000, 2),
                "used_memory_mb": round(info["used_memory"] / 1024 / 1024, 2),
                "connected_clients": self.client.info("clients")["connected_clients"],
            }
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    def close(self):
        self.pool.disconnect()


# 使用示例
toolkit = RedisToolkit()

# 基本操作
toolkit.set("greeting", "hello", ex=60)
print(toolkit.get("greeting"))

# JSON 操作
toolkit.set_json("user:1", {"name": "张三", "age": 30})
user = toolkit.get_json("user:1")

# 分布式锁
if toolkit.acquire_lock("mylock", "process-1", ttl=30):
    try:
        # 执行受保护的操作
        pass
    finally:
        toolkit.release_lock("mylock", "process-1")

# 限流
if not toolkit.is_rate_limited("api:user:123", limit=100, window=60):
    # 允许请求
    pass

# 缓存装饰器
@toolkit.cached("user_info", ttl=300)
def get_user_info(user_id):
    # 耗时操作
    return {"id": user_id, "name": "Alice"}

# 健康检查
health = toolkit.health_check()
print(health)

toolkit.close()

12.9 注意事项

⚠️ 不要在循环中创建连接 始终使用连接池复用连接。

⚠️ 注意序列化开销 JSON 序列化可能成为瓶颈。对于高频操作,考虑 MessagePack 或直接存储二进制。

⚠️ 设置合理的超时 没有超时的连接可能永远阻塞。始终设置 socket_timeout

⚠️ 监控连接池状态 定期检查连接池使用率,避免连接耗尽。

⚠️ 处理连接断开 实现重连逻辑和重试策略,应对网络波动。


12.10 扩展阅读

资源说明
Redis 最佳实践官方设计模式
redis-py 文档Python 客户端
Jedis 文档Java 客户端
go-redis 文档Go 客户端
Redis 安全指南官方安全文档
Redis 内存优化内存优化技巧

上一章:代理实现 | 返回:目录